Fix lint errors

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-27 20:28:06 +00:00
parent a4ac85aa4a
commit 884be62a4d
32 changed files with 127 additions and 161 deletions

1
cache/cache.go vendored
View file

@ -5,7 +5,6 @@ import (
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/params"
)

View file

@ -493,13 +493,13 @@ func formatPools(pools []params.Pool) {
switch {
case pool.RepoID != "" && pool.RepoName != "":
belongsTo = pool.RepoName
level = "repo"
level = entityTypeRepo
case pool.OrgID != "" && pool.OrgName != "":
belongsTo = pool.OrgName
level = "org"
level = entityTypeOrg
case pool.EnterpriseID != "" && pool.EnterpriseName != "":
belongsTo = pool.EnterpriseName
level = "enterprise"
level = entityTypeEnterprise
}
row := table.Row{pool.ID, pool.Image, pool.Flavor, strings.Join(tags, " "), belongsTo, pool.Enabled}
if long {
@ -532,13 +532,13 @@ func formatOnePool(pool params.Pool) {
switch {
case pool.RepoID != "" && pool.RepoName != "":
belongsTo = pool.RepoName
level = "repo"
level = entityTypeRepo
case pool.OrgID != "" && pool.OrgName != "":
belongsTo = pool.OrgName
level = "org"
level = entityTypeOrg
case pool.EnterpriseID != "" && pool.EnterpriseName != "":
belongsTo = pool.EnterpriseName
level = "enterprise"
level = entityTypeEnterprise
}
t.AppendHeader(header)

View file

@ -31,6 +31,12 @@ import (
"github.com/cloudbase/garm/params"
)
const (
entityTypeOrg string = "org"
entityTypeRepo string = "repo"
entityTypeEnterprise string = "enterprise"
)
var (
cfg *config.Config
mgr config.Manager

View file

@ -446,13 +446,13 @@ func formatScaleSets(scaleSets []params.ScaleSet) {
switch {
case scaleSet.RepoID != "" && scaleSet.RepoName != "":
belongsTo = scaleSet.RepoName
level = "repo"
level = entityTypeRepo
case scaleSet.OrgID != "" && scaleSet.OrgName != "":
belongsTo = scaleSet.OrgName
level = "org"
level = entityTypeOrg
case scaleSet.EnterpriseID != "" && scaleSet.EnterpriseName != "":
belongsTo = scaleSet.EnterpriseName
level = "enterprise"
level = entityTypeEnterprise
}
t.AppendRow(table.Row{scaleSet.ID, scaleSet.Name, scaleSet.Image, scaleSet.Flavor, belongsTo, level, scaleSet.Enabled, scaleSet.GetRunnerPrefix(), scaleSet.ProviderName})
t.AppendSeparator()
@ -476,13 +476,13 @@ func formatOneScaleSet(scaleSet params.ScaleSet) {
switch {
case scaleSet.RepoID != "" && scaleSet.RepoName != "":
belongsTo = scaleSet.RepoName
level = "repo"
level = entityTypeRepo
case scaleSet.OrgID != "" && scaleSet.OrgName != "":
belongsTo = scaleSet.OrgName
level = "org"
level = entityTypeOrg
case scaleSet.EnterpriseID != "" && scaleSet.EnterpriseName != "":
belongsTo = scaleSet.EnterpriseName
level = "enterprise"
level = entityTypeEnterprise
}
t.AppendHeader(header)

View file

@ -180,6 +180,7 @@ func maybeUpdateURLsFromConfig(cfg config.Config, store common.Store) error {
return nil
}
//gocyclo:ignore
func main() {
flag.Parse()
if *version {

View file

@ -140,7 +140,7 @@ type ScaleSetsStore interface {
ListAllScaleSets(ctx context.Context) ([]params.ScaleSet, error)
CreateEntityScaleSet(_ context.Context, entity params.GithubEntity, param params.CreateScaleSetParams) (scaleSet params.ScaleSet, err error)
ListEntityScaleSets(_ context.Context, entity params.GithubEntity) ([]params.ScaleSet, error)
UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, new params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error)
UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, newSet params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error)
GetScaleSetByID(ctx context.Context, scaleSet uint) (params.ScaleSet, error)
DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error)
SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) error

View file

@ -101,13 +101,13 @@ func (s *sqlDatabase) getEntityPool(tx *gorm.DB, entityType params.GithubEntityT
switch entityType {
case params.GithubEntityTypeRepository:
fieldName = entityTypeRepoName
entityField = "Repository"
entityField = repositoryFieldName
case params.GithubEntityTypeOrganization:
fieldName = entityTypeOrgName
entityField = "Organization"
entityField = organizationFieldName
case params.GithubEntityTypeEnterprise:
fieldName = entityTypeEnterpriseName
entityField = "Enterprise"
entityField = enterpriseFieldName
default:
return Pool{}, fmt.Errorf("invalid entityType: %v", entityType)
}

View file

@ -3,9 +3,10 @@ package sql
import (
"context"
"github.com/pkg/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/pkg/errors"
)
func (s *sqlDatabase) CreateScaleSetInstance(_ context.Context, scaleSetID uint, param params.CreateInstanceParams) (instance params.Instance, err error) {

View file

@ -18,13 +18,14 @@ import (
"context"
"fmt"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
"gorm.io/datatypes"
"gorm.io/gorm"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)
func (s *sqlDatabase) ListAllScaleSets(_ context.Context) ([]params.ScaleSet, error) {
@ -136,13 +137,13 @@ func (s *sqlDatabase) listEntityScaleSets(tx *gorm.DB, entityType params.GithubE
switch entityType {
case params.GithubEntityTypeRepository:
fieldName = entityTypeRepoName
preloadEntity = "Repository"
preloadEntity = repositoryFieldName
case params.GithubEntityTypeOrganization:
fieldName = entityTypeOrgName
preloadEntity = "Organization"
preloadEntity = organizationFieldName
case params.GithubEntityTypeEnterprise:
fieldName = entityTypeEnterpriseName
preloadEntity = "Enterprise"
preloadEntity = enterpriseFieldName
default:
return nil, fmt.Errorf("invalid entityType: %v", entityType)
}
@ -189,7 +190,7 @@ func (s *sqlDatabase) ListEntityScaleSets(_ context.Context, entity params.Githu
return ret, nil
}
func (s *sqlDatabase) UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, new params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) {
func (s *sqlDatabase) UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, newSet params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) {
defer func() {
if err == nil {
s.sendNotify(common.ScaleSetEntityType, common.UpdateOperation, updatedScaleSet)
@ -348,7 +349,7 @@ func (s *sqlDatabase) GetScaleSetByID(_ context.Context, scaleSet uint) (params.
return s.sqlToCommonScaleSet(set)
}
func (s *sqlDatabase) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error) {
func (s *sqlDatabase) DeleteScaleSetByID(_ context.Context, scaleSetID uint) (err error) {
var scaleSet params.ScaleSet
defer func() {
if err == nil && scaleSet.ID != 0 {
@ -380,7 +381,7 @@ func (s *sqlDatabase) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (
return nil
}
func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) (err error) {
func (s *sqlDatabase) SetScaleSetLastMessageID(_ context.Context, scaleSetID uint, lastMessageID int64) (err error) {
var scaleSet params.ScaleSet
defer func() {
if err == nil && scaleSet.ID != 0 {
@ -407,7 +408,7 @@ func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID u
return nil
}
func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) (err error) {
func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(_ context.Context, scaleSetID uint, desiredRunnerCount int) (err error) {
var scaleSet params.ScaleSet
defer func() {
if err == nil && scaleSet.ID != 0 {

View file

@ -36,6 +36,12 @@ import (
"github.com/cloudbase/garm/util/appdefaults"
)
const (
repositoryFieldName string = "Repository"
organizationFieldName string = "Organization"
enterpriseFieldName string = "Enterprise"
)
// newDBConn returns a new gorm db connection, given the config
func newDBConn(dbCfg config.Database) (conn *gorm.DB, err error) {
dbType, connURI, err := dbCfg.GormParams()

View file

@ -753,8 +753,7 @@ func (s *sqlDatabase) AddEntityEvent(ctx context.Context, entity params.GithubEn
if maxEvents == 0 {
return errors.Wrap(runnerErrors.ErrBadRequest, "max events cannot be 0")
}
// TODO(gabriel-samfira): Should we send watcher notifications for events?
// Not sure it's of any value.
switch entity.EntityType {
case params.GithubEntityTypeRepository:
return s.addRepositoryEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents)

View file

@ -2,7 +2,6 @@ package watcher
import (
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)

View file

@ -2,7 +2,6 @@ package locking
import "time"
// TODO(gabriel-samfira): needs owner attribute.
type Locker interface {
TryLock(key, identifier string) bool
Lock(key, identifier string)

View file

@ -8,6 +8,7 @@ import (
)
var locker Locker
var lockerMux = sync.Mutex{}
func TryLock(key, identifier string) (ok bool, err error) {

View file

@ -488,12 +488,12 @@ type RunnerGroupList struct {
type ScaleSetJobMessage struct {
MessageType string `json:"messageType,omitempty"`
RunnerRequestId int64 `json:"runnerRequestId,omitempty"`
RunnerRequestID int64 `json:"runnerRequestId,omitempty"`
RepositoryName string `json:"repositoryName,omitempty"`
OwnerName string `json:"ownerName,omitempty"`
JobWorkflowRef string `json:"jobWorkflowRef,omitempty"`
JobDisplayName string `json:"jobDisplayName,omitempty"`
WorkflowRunId int64 `json:"workflowRunId,omitempty"`
WorkflowRunID int64 `json:"workflowRunId,omitempty"`
EventName string `json:"eventName,omitempty"`
RequestLabels []string `json:"requestLabels,omitempty"`
QueueTime time.Time `json:"queueTime,omitempty"`
@ -501,7 +501,7 @@ type ScaleSetJobMessage struct {
RunnerAssignTime time.Time `json:"runnerAssignTime,omitempty"`
FinishTime time.Time `json:"finishTime,omitempty"`
Result string `json:"result,omitempty"`
RunnerId int `json:"runnerId,omitempty"`
RunnerID int `json:"runnerId,omitempty"`
RunnerName string `json:"runnerName,omitempty"`
AcquireJobUrl string `json:"acquireJobUrl,omitempty"`
AcquireJobURL string `json:"acquireJobUrl,omitempty"`
}

View file

@ -58,7 +58,8 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) {
}
var entity params.GithubEntity
if instance.PoolID != "" {
switch {
case instance.PoolID != "":
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
@ -73,7 +74,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) {
"pool_id", instance.PoolID)
return "", errors.Wrap(err, "fetching pool entity")
}
} else if instance.ScaleSetID != 0 {
case instance.ScaleSetID != 0:
scaleSet, err := r.store.GetScaleSetByID(r.ctx, instance.ScaleSetID)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
@ -88,7 +89,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) {
"scale_set_id", instance.ScaleSetID)
return "", errors.Wrap(err, "fetching scale set entity")
}
} else {
default:
return "", errors.New("instance not associated with a pool or scale set")
}

View file

@ -857,11 +857,12 @@ func (r *Runner) DeleteRunner(ctx context.Context, instanceName string, forceDel
}
if instance.AgentID != 0 {
if instance.ScaleSetID != 0 {
switch {
case instance.ScaleSetID != 0:
err = ssCli.RemoveRunner(ctx, instance.AgentID)
} else if instance.PoolID != "" {
case instance.PoolID != "":
err = ghCli.RemoveEntityRunner(ctx, instance.AgentID)
} else {
default:
return errors.New("instance does not have a pool or scale set")
}
@ -901,20 +902,23 @@ func (r *Runner) DeleteRunner(ctx context.Context, instanceName string, forceDel
}
func (r *Runner) getGHCliFromInstance(ctx context.Context, instance params.Instance) (common.GithubClient, *scalesets.ScaleSetClient, error) {
// nolint:golangci-lint,godox
// TODO(gabriel-samfira): We can probably cache the entity.
var entityGetter params.EntityGetter
var err error
if instance.PoolID != "" {
switch {
case instance.PoolID != "":
entityGetter, err = r.store.GetPoolByID(ctx, instance.PoolID)
if err != nil {
return nil, nil, errors.Wrap(err, "fetching pool")
}
} else if instance.ScaleSetID != 0 {
case instance.ScaleSetID != 0:
entityGetter, err = r.store.GetScaleSetByID(ctx, instance.ScaleSetID)
if err != nil {
return nil, nil, errors.Wrap(err, "fetching scale set")
}
} else {
default:
return nil, nil, errors.New("instance does not have a pool or scale set")
}

View file

@ -20,13 +20,14 @@ import (
"fmt"
"log/slog"
"github.com/pkg/errors"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/appdefaults"
"github.com/cloudbase/garm/util/github"
"github.com/cloudbase/garm/util/github/scalesets"
"github.com/pkg/errors"
)
func (r *Runner) ListAllScaleSets(ctx context.Context) ([]params.ScaleSet, error) {
@ -152,7 +153,7 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param
return params.ScaleSet{}, errors.Wrap(err, "creating github client")
}
callback := func(old, new params.ScaleSet) error {
callback := func(old, newSet params.ScaleSet) error {
scalesetCli, err := scalesets.NewClient(ghCli)
if err != nil {
return errors.Wrap(err, "getting scaleset client")
@ -160,13 +161,13 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param
updateParams := params.RunnerScaleSet{}
hasUpdates := false
if old.Name != new.Name {
updateParams.Name = new.Name
if old.Name != newSet.Name {
updateParams.Name = newSet.Name
hasUpdates = true
}
if old.GitHubRunnerGroup != new.GitHubRunnerGroup {
runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, new.GitHubRunnerGroup)
if old.GitHubRunnerGroup != newSet.GitHubRunnerGroup {
runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, newSet.GitHubRunnerGroup)
if err != nil {
return fmt.Errorf("error fetching runner group from github: %w", err)
}
@ -174,13 +175,13 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param
hasUpdates = true
}
if old.DisableUpdate != new.DisableUpdate {
updateParams.RunnerSetting.DisableUpdate = new.DisableUpdate
if old.DisableUpdate != newSet.DisableUpdate {
updateParams.RunnerSetting.DisableUpdate = newSet.DisableUpdate
hasUpdates = true
}
if hasUpdates {
result, err := scalesetCli.UpdateRunnerScaleSet(ctx, new.ScaleSetID, updateParams)
result, err := scalesetCli.UpdateRunnerScaleSet(ctx, newSet.ScaleSetID, updateParams)
if err != nil {
return fmt.Errorf("failed to update scaleset in github: %w", err)
}
@ -224,7 +225,7 @@ func (r *Runner) CreateEntityScaleSet(ctx context.Context, entityType params.Git
if err != nil {
return params.ScaleSet{}, errors.Wrap(err, "getting scaleset client")
}
var runnerGroupID int = 1
runnerGroupID := 1
if param.GitHubRunnerGroup != "Default" {
runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, param.GitHubRunnerGroup)
if err != nil {

View file

@ -41,12 +41,12 @@ func (s *ScaleSetClient) GenerateJitRunnerConfig(ctx context.Context, runnerName
return params.RunnerScaleSetJitRunnerConfig{}, err
}
serviceUrl, err := s.actionsServiceInfo.GetURL()
serviceURL, err := s.actionsServiceInfo.GetURL()
if err != nil {
return params.RunnerScaleSetJitRunnerConfig{}, fmt.Errorf("failed to get pipeline URL: %w", err)
}
jitConfigPath := fmt.Sprintf("/%s/%d/generatejitconfig", scaleSetEndpoint, scaleSetID)
jitConfigURL := serviceUrl.JoinPath(jitConfigPath)
jitConfigURL := serviceURL.JoinPath(jitConfigPath)
req, err := s.newActionsRequest(ctx, http.MethodPost, jitConfigURL.String(), bytes.NewBuffer(body))
if err != nil {

View file

@ -140,7 +140,6 @@ func (c *Controller) Start() error {
c.ctx, c.consumerID,
composeControllerWatcherFilters(),
)
if err != nil {
return fmt.Errorf("failed to create consumer for entity controller: %w", err)
}

View file

@ -40,9 +40,6 @@ type Worker struct {
Entity params.GithubEntity
providers map[string]common.Provider
scaleSetController *scaleset.Controller
// TODO(gabriel-samfira): replace current pool manager with something similar
// to the scale set controller.
// poolManager *pool.Controller
mux sync.Mutex
running bool

View file

@ -2,6 +2,4 @@ package provider
import "fmt"
var (
ErrInstanceDeleted = fmt.Errorf("instance deleted")
)
var ErrInstanceDeleted = fmt.Errorf("instance deleted")

View file

@ -10,7 +10,6 @@ import (
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
@ -18,7 +17,7 @@ import (
garmUtil "github.com/cloudbase/garm/util"
)
func NewInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) {
func newInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) {
ctx = garmUtil.WithSlogContext(ctx, slog.Any("instance", instance.Name))
githubEntity, err := scaleSet.GetEntity()
@ -146,7 +145,6 @@ func (i *instanceManager) pseudoPoolID() string {
}
func (i *instanceManager) handleCreateInstanceInProvider(instance params.Instance) error {
// TODO(gabriel-samfira): implement the creation of the instance in the provider.
entity, err := i.getEntity()
if err != nil {
return fmt.Errorf("getting entity: %w", err)

View file

@ -7,7 +7,6 @@ import (
"sync"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
@ -15,9 +14,9 @@ import (
"github.com/cloudbase/garm/runner/common"
)
func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*provider, error) {
func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*Provider, error) {
consumerID := "provider-worker"
return &provider{
return &Provider{
ctx: context.Background(),
store: store,
consumerID: consumerID,
@ -28,11 +27,12 @@ func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]c
}, nil
}
type provider struct {
type Provider struct {
ctx context.Context
consumerID string
consumer dbCommon.Consumer
// nolint:golangci-lint,godox
// TODO: not all workers should have access to the store.
// We need to implement way to RPC from workers to controllers
// and abstract that into something we can use to eventually
@ -51,7 +51,7 @@ type provider struct {
quit chan struct{}
}
func (p *provider) loadAllScaleSets() error {
func (p *Provider) loadAllScaleSets() error {
scaleSets, err := p.store.ListAllScaleSets(p.ctx)
if err != nil {
return fmt.Errorf("fetching scale sets: %w", err)
@ -67,7 +67,7 @@ func (p *provider) loadAllScaleSets() error {
// loadAllRunners loads all runners from the database. At this stage we only
// care about runners created by scale sets, but in the future, we will migrate
// the pool manager to the same model.
func (p *provider) loadAllRunners() error {
func (p *Provider) loadAllRunners() error {
runners, err := p.store.ListAllInstances(p.ctx)
if err != nil {
return fmt.Errorf("fetching runners: %w", err)
@ -102,7 +102,7 @@ func (p *provider) loadAllRunners() error {
slog.ErrorContext(p.ctx, "provider not found", "provider_name", runner.ProviderName)
continue
}
instanceManager, err := NewInstanceManager(
instanceManager, err := newInstanceManager(
p.ctx, runner, scaleSet, provider, p)
if err != nil {
return fmt.Errorf("creating instance manager: %w", err)
@ -117,7 +117,7 @@ func (p *provider) loadAllRunners() error {
return nil
}
func (p *provider) Start() error {
func (p *Provider) Start() error {
p.mux.Lock()
defer p.mux.Unlock()
@ -147,7 +147,7 @@ func (p *provider) Start() error {
return nil
}
func (p *provider) Stop() error {
func (p *Provider) Stop() error {
p.mux.Lock()
defer p.mux.Unlock()
@ -161,7 +161,7 @@ func (p *provider) Stop() error {
return nil
}
func (p *provider) loop() {
func (p *Provider) loop() {
defer p.Stop()
for {
select {
@ -180,7 +180,7 @@ func (p *provider) loop() {
}
}
func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
func (p *Provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
switch payload.EntityType {
case dbCommon.ScaleSetEntityType:
p.handleScaleSetEvent(payload)
@ -191,7 +191,7 @@ func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
}
}
func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
func (p *Provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
p.mux.Lock()
defer p.mux.Unlock()
@ -214,12 +214,12 @@ func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
}
}
func (p *provider) handleInstanceAdded(instance params.Instance) error {
func (p *Provider) handleInstanceAdded(instance params.Instance) error {
scaleSet, ok := p.scaleSets[instance.ScaleSetID]
if !ok {
return fmt.Errorf("scale set not found for instance %s", instance.Name)
}
instanceManager, err := NewInstanceManager(
instanceManager, err := newInstanceManager(
p.ctx, instance, scaleSet, p.providers[instance.ProviderName], p)
if err != nil {
return fmt.Errorf("creating instance manager: %w", err)
@ -231,7 +231,7 @@ func (p *provider) handleInstanceAdded(instance params.Instance) error {
return nil
}
func (p *provider) handleInstanceEvent(event dbCommon.ChangePayload) {
func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
p.mux.Lock()
defer p.mux.Unlock()

View file

@ -17,7 +17,7 @@ type providerHelper interface {
GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error)
}
func (p *provider) updateArgsFromProviderInstance(instanceName string, providerInstance commonParams.ProviderInstance) (params.Instance, error) {
func (p *Provider) updateArgsFromProviderInstance(instanceName string, providerInstance commonParams.ProviderInstance) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
@ -34,7 +34,7 @@ func (p *provider) updateArgsFromProviderInstance(instanceName string, providerI
return updated, nil
}
func (p *provider) GetControllerInfo() (params.ControllerInfo, error) {
func (p *Provider) GetControllerInfo() (params.ControllerInfo, error) {
p.mux.Lock()
defer p.mux.Unlock()
@ -46,7 +46,7 @@ func (p *provider) GetControllerInfo() (params.ControllerInfo, error) {
return info, nil
}
func (p *provider) SetInstanceStatus(instanceName string, status commonParams.InstanceStatus, providerFault []byte) error {
func (p *Provider) SetInstanceStatus(instanceName string, status commonParams.InstanceStatus, providerFault []byte) error {
p.mux.Lock()
defer p.mux.Unlock()
@ -67,11 +67,11 @@ func (p *provider) SetInstanceStatus(instanceName string, status commonParams.In
return nil
}
func (p *provider) InstanceTokenGetter() auth.InstanceTokenGetter {
func (p *Provider) InstanceTokenGetter() auth.InstanceTokenGetter {
return p.tokenGetter
}
func (p *provider) GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error) {
func (p *Provider) GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error) {
ghEntity, err := p.store.GetGithubEntity(p.ctx, entity.EntityType, entity.ID)
if err != nil {
return params.GithubEntity{}, fmt.Errorf("getting github entity: %w", err)

View file

@ -2,7 +2,6 @@ package provider
import (
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
)

View file

@ -9,7 +9,6 @@ import (
"time"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
@ -20,7 +19,6 @@ import (
)
func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
consumerID := fmt.Sprintf("scaleset-worker-%s", entity.String())
ctx = garmUtil.WithSlogContext(
@ -28,31 +26,22 @@ func NewController(ctx context.Context, store dbCommon.Store, entity params.Gith
slog.Any("worker", consumerID))
return &Controller{
ctx: ctx,
consumerID: consumerID,
ScaleSets: make(map[uint]*scaleSet),
Entity: entity,
providers: providers,
store: store,
statusUpdates: make(chan scaleSetStatus, 10),
ctx: ctx,
consumerID: consumerID,
ScaleSets: make(map[uint]*scaleSet),
Entity: entity,
providers: providers,
store: store,
}, nil
}
type scaleSet struct {
scaleSet params.ScaleSet
status scaleSetStatus
worker *Worker
mux sync.Mutex
}
func (s *scaleSet) updateStatus(status scaleSetStatus) {
s.mux.Lock()
defer s.mux.Unlock()
s.status = status
}
func (s *scaleSet) Stop() error {
s.mux.Lock()
defer s.mux.Unlock()
@ -80,8 +69,6 @@ type Controller struct {
ghCli common.GithubClient
forgeCredsAreValid bool
statusUpdates chan scaleSetStatus
mux sync.Mutex
running bool
quit chan struct{}
@ -162,8 +149,6 @@ func (c *Controller) Stop() error {
c.running = false
close(c.quit)
c.quit = nil
close(c.statusUpdates)
c.statusUpdates = nil
c.consumer.Close()
return nil
@ -180,6 +165,7 @@ func (c *Controller) updateTools() error {
slog.With(slog.Any("error", err)).ErrorContext(
c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
if errors.Is(err, runnerErrors.ErrUnauthorized) {
// nolint:golangci-lint,godox
// TODO: block all scale sets
c.forgeCredsAreValid = false
}
@ -191,21 +177,6 @@ func (c *Controller) updateTools() error {
return nil
}
func (c *Controller) handleScaleSetStatusUpdates(status scaleSetStatus) {
if status.scaleSet.ID == 0 {
slog.DebugContext(c.ctx, "invalid scale set ID; ignoring")
return
}
scaleSet, ok := c.ScaleSets[status.scaleSet.ID]
if !ok {
slog.DebugContext(c.ctx, "scale set not found; ignoring")
return
}
scaleSet.updateStatus(status)
}
func (c *Controller) loop() {
defer c.Stop()
updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
@ -231,11 +202,6 @@ func (c *Controller) loop() {
case <-c.ctx.Done():
return
case <-initialToolUpdate:
case update, ok := <-c.statusUpdates:
if !ok {
return
}
go c.handleScaleSetStatusUpdates(update)
case _, ok := <-updateToolsTicker.C:
if !ok {
slog.InfoContext(c.ctx, "update tools ticker closed")

View file

@ -88,8 +88,8 @@ func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli c
}
c.ScaleSets[sSet.ID] = &scaleSet{
scaleSet: sSet,
status: scaleSetStatus{},
worker: worker,
// status: scaleSetStatus{},
worker: worker,
}
return nil
}
@ -117,7 +117,7 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
defer c.mux.Unlock()
if _, ok := c.ScaleSets[sSet.ID]; !ok {
// Some error may have occured when the scale set was first created, so we
// Some error may have occurred when the scale set was first created, so we
// attempt to create it after the user updated the scale set, hopefully
// fixing the reason for the failure.
return c.handleScaleSetCreateOperation(sSet, c.ghCli)

View file

@ -10,7 +10,6 @@ import (
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm-provider-common/util"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
@ -127,8 +126,9 @@ func (w *Worker) Start() (err error) {
if !errors.Is(err, runnerErrors.ErrNotFound) {
if errors.Is(err, runnerErrors.ErrUnauthorized) {
// we don't have access to remove the runner. This implies that our
// credentials may have expired or ar incorect.
// credentials may have expired or ar incorrect.
//
// nolint:golangci-lint,godox
// TODO(gabriel-samfira): we need to set the scale set as inactive and stop the listener (if any).
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err)
w.runners[instance.ID] = instance
@ -282,6 +282,7 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
}
}
// nolint:golangci-lint,godox
// TODO: should we kick off auto-scaling if desired runner count changes?
w.scaleSet = scaleSet
w.mux.Unlock()
@ -345,6 +346,7 @@ func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
return
}
if status != string(params.RunnerIdle) && status != string(params.RunnerActive) {
// nolint:golangci-lint,godox
// TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as
// pending_delete if the runner never comes online.
w.mux.Unlock()
@ -440,7 +442,7 @@ func (w *Worker) keepListenerAlive() {
w.mux.Unlock()
for {
w.mux.Lock()
w.listener.Stop() //cleanup
w.listener.Stop() // cleanup
if !w.scaleSet.Enabled {
w.mux.Unlock()
break
@ -449,12 +451,13 @@ func (w *Worker) keepListenerAlive() {
if err := w.listener.Start(); err != nil {
w.mux.Unlock()
slog.ErrorContext(w.ctx, "error restarting listener", "error", err)
if backoff > 60*time.Second {
switch {
case backoff > 60*time.Second:
backoff = 60 * time.Second
} else if backoff == 0 {
case backoff == 0:
backoff = 5 * time.Second
slog.InfoContext(w.ctx, "backing off restart attempt", "backoff", backoff)
} else {
default:
backoff *= 2
}
slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff)
@ -512,7 +515,7 @@ func (w *Worker) handleScaleUp(target, current uint) {
CreateAttempt: 1,
GitHubRunnerGroup: w.scaleSet.GitHubRunnerGroup,
JitConfiguration: decodedJit,
AgentID: int64(jitConfig.Runner.ID),
AgentID: jitConfig.Runner.ID,
}
if _, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams); err != nil {
@ -618,6 +621,7 @@ func (w *Worker) handleScaleDown(target, current uint) {
locking.Unlock(runner.Name, true)
continue
}
// nolint:golangci-lint,godox
// TODO: This should not happen, unless there is some issue with the database.
// The UpdateInstance() function should add tenacity, but even in that case, if it
// still errors out, we need to handle it somehow.

View file

@ -7,7 +7,6 @@ import (
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/locking"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github/scalesets"

View file

@ -123,15 +123,15 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
for _, job := range body {
switch job.MessageType {
case params.MessageTypeJobAssigned:
slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName)
slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName)
case params.MessageTypeJobStarted:
slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
startedJobs = append(startedJobs, job)
case params.MessageTypeJobCompleted:
slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
completedJobs = append(completedJobs, job)
case params.MessageTypeJobAvailable:
slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName)
slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName)
availableJobs = append(availableJobs, job)
default:
slog.DebugContext(l.ctx, "unknown message type", "message_type", job.MessageType)
@ -139,13 +139,13 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
}
if len(availableJobs) > 0 {
jobIds := make([]int64, len(availableJobs))
jobIDs := make([]int64, len(availableJobs))
for idx, job := range availableJobs {
jobIds[idx] = job.RunnerRequestId
jobIDs[idx] = job.RunnerRequestID
}
idsAcquired, err := l.scaleSetHelper.ScaleSetCLI().AcquireJobs(
l.listenerCtx, l.scaleSetHelper.GetScaleSet().ScaleSetID,
l.messageSession.MessageQueueAccessToken(), jobIds)
l.messageSession.MessageQueueAccessToken(), jobIDs)
if err != nil {
// don't mark message as processed. It will be requeued.
slog.ErrorContext(l.ctx, "acquiring jobs", "error", err)
@ -201,7 +201,8 @@ func (l *scaleSetListener) loop() {
return
default:
slog.DebugContext(l.ctx, "getting message", "last_message_id", l.lastMessageID, "max_runners", l.scaleSetHelper.GetScaleSet().MaxRunners)
// TODO: consume initial message on startup and consolidate.
// nolint:golangci-lint,godox
// TODO(gabriel-samfira): consume initial message on startup and consolidate.
// The scale set may have undergone several messages while GARM was
// down.
msg, err := l.messageSession.GetMessage(

View file

@ -1,13 +0,0 @@
package scaleset
import (
"time"
"github.com/cloudbase/garm/params"
)
type scaleSetStatus struct {
err error
heartbeat time.Time
scaleSet params.ScaleSet
}