From 884be62a4d0232970f1fcd38549cabd9afa8a943 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sun, 27 Apr 2025 20:28:06 +0000 Subject: [PATCH] Fix lint errors Signed-off-by: Gabriel Adrian Samfira --- cache/cache.go | 1 - cmd/garm-cli/cmd/pool.go | 12 +++---- cmd/garm-cli/cmd/root.go | 6 ++++ cmd/garm-cli/cmd/scalesets.go | 12 +++---- cmd/garm/main.go | 1 + database/common/store.go | 2 +- database/sql/pools.go | 6 ++-- database/sql/scaleset_instances.go | 3 +- database/sql/scalesets.go | 21 +++++------ database/sql/sql.go | 6 ++++ database/sql/util.go | 3 +- database/watcher/filters.go | 1 - locking/interface.go | 1 - locking/locking.go | 1 + params/github.go | 8 ++--- runner/metadata.go | 7 ++-- runner/runner.go | 16 +++++---- runner/scalesets.go | 21 +++++------ util/github/scalesets/runners.go | 4 +-- workers/entity/controller.go | 1 - workers/entity/worker.go | 3 -- workers/provider/errors.go | 4 +-- workers/provider/instance_manager.go | 4 +-- workers/provider/provider.go | 30 ++++++++-------- workers/provider/provider_helper.go | 10 +++--- workers/provider/util.go | 1 - workers/scaleset/controller.go | 48 ++++---------------------- workers/scaleset/controller_watcher.go | 6 ++-- workers/scaleset/scaleset.go | 18 ++++++---- workers/scaleset/scaleset_helper.go | 1 - workers/scaleset/scaleset_listener.go | 17 ++++----- workers/scaleset/status.go | 13 ------- 32 files changed, 127 insertions(+), 161 deletions(-) delete mode 100644 workers/scaleset/status.go diff --git a/cache/cache.go b/cache/cache.go index 2fa52456..1960de38 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -5,7 +5,6 @@ import ( "time" commonParams "github.com/cloudbase/garm-provider-common/params" - "github.com/cloudbase/garm/params" ) diff --git a/cmd/garm-cli/cmd/pool.go b/cmd/garm-cli/cmd/pool.go index a4eee742..0b891e96 100644 --- a/cmd/garm-cli/cmd/pool.go +++ b/cmd/garm-cli/cmd/pool.go @@ -493,13 +493,13 @@ func formatPools(pools []params.Pool) { switch { case pool.RepoID != "" && pool.RepoName != "": belongsTo = pool.RepoName - level = "repo" + level = entityTypeRepo case pool.OrgID != "" && pool.OrgName != "": belongsTo = pool.OrgName - level = "org" + level = entityTypeOrg case pool.EnterpriseID != "" && pool.EnterpriseName != "": belongsTo = pool.EnterpriseName - level = "enterprise" + level = entityTypeEnterprise } row := table.Row{pool.ID, pool.Image, pool.Flavor, strings.Join(tags, " "), belongsTo, pool.Enabled} if long { @@ -532,13 +532,13 @@ func formatOnePool(pool params.Pool) { switch { case pool.RepoID != "" && pool.RepoName != "": belongsTo = pool.RepoName - level = "repo" + level = entityTypeRepo case pool.OrgID != "" && pool.OrgName != "": belongsTo = pool.OrgName - level = "org" + level = entityTypeOrg case pool.EnterpriseID != "" && pool.EnterpriseName != "": belongsTo = pool.EnterpriseName - level = "enterprise" + level = entityTypeEnterprise } t.AppendHeader(header) diff --git a/cmd/garm-cli/cmd/root.go b/cmd/garm-cli/cmd/root.go index d1370567..df3ef11b 100644 --- a/cmd/garm-cli/cmd/root.go +++ b/cmd/garm-cli/cmd/root.go @@ -31,6 +31,12 @@ import ( "github.com/cloudbase/garm/params" ) +const ( + entityTypeOrg string = "org" + entityTypeRepo string = "repo" + entityTypeEnterprise string = "enterprise" +) + var ( cfg *config.Config mgr config.Manager diff --git a/cmd/garm-cli/cmd/scalesets.go b/cmd/garm-cli/cmd/scalesets.go index 04c537ee..79486a0e 100644 --- a/cmd/garm-cli/cmd/scalesets.go +++ b/cmd/garm-cli/cmd/scalesets.go @@ -446,13 +446,13 @@ func formatScaleSets(scaleSets []params.ScaleSet) { switch { case scaleSet.RepoID != "" && scaleSet.RepoName != "": belongsTo = scaleSet.RepoName - level = "repo" + level = entityTypeRepo case scaleSet.OrgID != "" && scaleSet.OrgName != "": belongsTo = scaleSet.OrgName - level = "org" + level = entityTypeOrg case scaleSet.EnterpriseID != "" && scaleSet.EnterpriseName != "": belongsTo = scaleSet.EnterpriseName - level = "enterprise" + level = entityTypeEnterprise } t.AppendRow(table.Row{scaleSet.ID, scaleSet.Name, scaleSet.Image, scaleSet.Flavor, belongsTo, level, scaleSet.Enabled, scaleSet.GetRunnerPrefix(), scaleSet.ProviderName}) t.AppendSeparator() @@ -476,13 +476,13 @@ func formatOneScaleSet(scaleSet params.ScaleSet) { switch { case scaleSet.RepoID != "" && scaleSet.RepoName != "": belongsTo = scaleSet.RepoName - level = "repo" + level = entityTypeRepo case scaleSet.OrgID != "" && scaleSet.OrgName != "": belongsTo = scaleSet.OrgName - level = "org" + level = entityTypeOrg case scaleSet.EnterpriseID != "" && scaleSet.EnterpriseName != "": belongsTo = scaleSet.EnterpriseName - level = "enterprise" + level = entityTypeEnterprise } t.AppendHeader(header) diff --git a/cmd/garm/main.go b/cmd/garm/main.go index d117dc6a..c43e3c93 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -180,6 +180,7 @@ func maybeUpdateURLsFromConfig(cfg config.Config, store common.Store) error { return nil } +//gocyclo:ignore func main() { flag.Parse() if *version { diff --git a/database/common/store.go b/database/common/store.go index 87804281..65fd1343 100644 --- a/database/common/store.go +++ b/database/common/store.go @@ -140,7 +140,7 @@ type ScaleSetsStore interface { ListAllScaleSets(ctx context.Context) ([]params.ScaleSet, error) CreateEntityScaleSet(_ context.Context, entity params.GithubEntity, param params.CreateScaleSetParams) (scaleSet params.ScaleSet, err error) ListEntityScaleSets(_ context.Context, entity params.GithubEntity) ([]params.ScaleSet, error) - UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, new params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) + UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, newSet params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) GetScaleSetByID(ctx context.Context, scaleSet uint) (params.ScaleSet, error) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error) SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) error diff --git a/database/sql/pools.go b/database/sql/pools.go index 7454b1ef..5cb6d136 100644 --- a/database/sql/pools.go +++ b/database/sql/pools.go @@ -101,13 +101,13 @@ func (s *sqlDatabase) getEntityPool(tx *gorm.DB, entityType params.GithubEntityT switch entityType { case params.GithubEntityTypeRepository: fieldName = entityTypeRepoName - entityField = "Repository" + entityField = repositoryFieldName case params.GithubEntityTypeOrganization: fieldName = entityTypeOrgName - entityField = "Organization" + entityField = organizationFieldName case params.GithubEntityTypeEnterprise: fieldName = entityTypeEnterpriseName - entityField = "Enterprise" + entityField = enterpriseFieldName default: return Pool{}, fmt.Errorf("invalid entityType: %v", entityType) } diff --git a/database/sql/scaleset_instances.go b/database/sql/scaleset_instances.go index 106df956..fcb9e1f2 100644 --- a/database/sql/scaleset_instances.go +++ b/database/sql/scaleset_instances.go @@ -3,9 +3,10 @@ package sql import ( "context" + "github.com/pkg/errors" + "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" - "github.com/pkg/errors" ) func (s *sqlDatabase) CreateScaleSetInstance(_ context.Context, scaleSetID uint, param params.CreateInstanceParams) (instance params.Instance, err error) { diff --git a/database/sql/scalesets.go b/database/sql/scalesets.go index f168813b..ea4878bf 100644 --- a/database/sql/scalesets.go +++ b/database/sql/scalesets.go @@ -18,13 +18,14 @@ import ( "context" "fmt" - runnerErrors "github.com/cloudbase/garm-provider-common/errors" - "github.com/cloudbase/garm/database/common" - "github.com/cloudbase/garm/params" "github.com/google/uuid" "github.com/pkg/errors" "gorm.io/datatypes" "gorm.io/gorm" + + runnerErrors "github.com/cloudbase/garm-provider-common/errors" + "github.com/cloudbase/garm/database/common" + "github.com/cloudbase/garm/params" ) func (s *sqlDatabase) ListAllScaleSets(_ context.Context) ([]params.ScaleSet, error) { @@ -136,13 +137,13 @@ func (s *sqlDatabase) listEntityScaleSets(tx *gorm.DB, entityType params.GithubE switch entityType { case params.GithubEntityTypeRepository: fieldName = entityTypeRepoName - preloadEntity = "Repository" + preloadEntity = repositoryFieldName case params.GithubEntityTypeOrganization: fieldName = entityTypeOrgName - preloadEntity = "Organization" + preloadEntity = organizationFieldName case params.GithubEntityTypeEnterprise: fieldName = entityTypeEnterpriseName - preloadEntity = "Enterprise" + preloadEntity = enterpriseFieldName default: return nil, fmt.Errorf("invalid entityType: %v", entityType) } @@ -189,7 +190,7 @@ func (s *sqlDatabase) ListEntityScaleSets(_ context.Context, entity params.Githu return ret, nil } -func (s *sqlDatabase) UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, new params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) { +func (s *sqlDatabase) UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, newSet params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) { defer func() { if err == nil { s.sendNotify(common.ScaleSetEntityType, common.UpdateOperation, updatedScaleSet) @@ -348,7 +349,7 @@ func (s *sqlDatabase) GetScaleSetByID(_ context.Context, scaleSet uint) (params. return s.sqlToCommonScaleSet(set) } -func (s *sqlDatabase) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error) { +func (s *sqlDatabase) DeleteScaleSetByID(_ context.Context, scaleSetID uint) (err error) { var scaleSet params.ScaleSet defer func() { if err == nil && scaleSet.ID != 0 { @@ -380,7 +381,7 @@ func (s *sqlDatabase) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) ( return nil } -func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) (err error) { +func (s *sqlDatabase) SetScaleSetLastMessageID(_ context.Context, scaleSetID uint, lastMessageID int64) (err error) { var scaleSet params.ScaleSet defer func() { if err == nil && scaleSet.ID != 0 { @@ -407,7 +408,7 @@ func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID u return nil } -func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) (err error) { +func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(_ context.Context, scaleSetID uint, desiredRunnerCount int) (err error) { var scaleSet params.ScaleSet defer func() { if err == nil && scaleSet.ID != 0 { diff --git a/database/sql/sql.go b/database/sql/sql.go index a704d9c3..76495732 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -36,6 +36,12 @@ import ( "github.com/cloudbase/garm/util/appdefaults" ) +const ( + repositoryFieldName string = "Repository" + organizationFieldName string = "Organization" + enterpriseFieldName string = "Enterprise" +) + // newDBConn returns a new gorm db connection, given the config func newDBConn(dbCfg config.Database) (conn *gorm.DB, err error) { dbType, connURI, err := dbCfg.GormParams() diff --git a/database/sql/util.go b/database/sql/util.go index 112d0a76..12513ede 100644 --- a/database/sql/util.go +++ b/database/sql/util.go @@ -753,8 +753,7 @@ func (s *sqlDatabase) AddEntityEvent(ctx context.Context, entity params.GithubEn if maxEvents == 0 { return errors.Wrap(runnerErrors.ErrBadRequest, "max events cannot be 0") } - // TODO(gabriel-samfira): Should we send watcher notifications for events? - // Not sure it's of any value. + switch entity.EntityType { case params.GithubEntityTypeRepository: return s.addRepositoryEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents) diff --git a/database/watcher/filters.go b/database/watcher/filters.go index 6a7e8abf..251a6bc6 100644 --- a/database/watcher/filters.go +++ b/database/watcher/filters.go @@ -2,7 +2,6 @@ package watcher import ( commonParams "github.com/cloudbase/garm-provider-common/params" - dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" ) diff --git a/locking/interface.go b/locking/interface.go index d6a0b62d..7750167b 100644 --- a/locking/interface.go +++ b/locking/interface.go @@ -2,7 +2,6 @@ package locking import "time" -// TODO(gabriel-samfira): needs owner attribute. type Locker interface { TryLock(key, identifier string) bool Lock(key, identifier string) diff --git a/locking/locking.go b/locking/locking.go index c7d99b1d..c7ad89a3 100644 --- a/locking/locking.go +++ b/locking/locking.go @@ -8,6 +8,7 @@ import ( ) var locker Locker + var lockerMux = sync.Mutex{} func TryLock(key, identifier string) (ok bool, err error) { diff --git a/params/github.go b/params/github.go index e0ad0452..7f99750f 100644 --- a/params/github.go +++ b/params/github.go @@ -488,12 +488,12 @@ type RunnerGroupList struct { type ScaleSetJobMessage struct { MessageType string `json:"messageType,omitempty"` - RunnerRequestId int64 `json:"runnerRequestId,omitempty"` + RunnerRequestID int64 `json:"runnerRequestId,omitempty"` RepositoryName string `json:"repositoryName,omitempty"` OwnerName string `json:"ownerName,omitempty"` JobWorkflowRef string `json:"jobWorkflowRef,omitempty"` JobDisplayName string `json:"jobDisplayName,omitempty"` - WorkflowRunId int64 `json:"workflowRunId,omitempty"` + WorkflowRunID int64 `json:"workflowRunId,omitempty"` EventName string `json:"eventName,omitempty"` RequestLabels []string `json:"requestLabels,omitempty"` QueueTime time.Time `json:"queueTime,omitempty"` @@ -501,7 +501,7 @@ type ScaleSetJobMessage struct { RunnerAssignTime time.Time `json:"runnerAssignTime,omitempty"` FinishTime time.Time `json:"finishTime,omitempty"` Result string `json:"result,omitempty"` - RunnerId int `json:"runnerId,omitempty"` + RunnerID int `json:"runnerId,omitempty"` RunnerName string `json:"runnerName,omitempty"` - AcquireJobUrl string `json:"acquireJobUrl,omitempty"` + AcquireJobURL string `json:"acquireJobUrl,omitempty"` } diff --git a/runner/metadata.go b/runner/metadata.go index 1d75fba4..3892d350 100644 --- a/runner/metadata.go +++ b/runner/metadata.go @@ -58,7 +58,8 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) { } var entity params.GithubEntity - if instance.PoolID != "" { + switch { + case instance.PoolID != "": pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) if err != nil { slog.With(slog.Any("error", err)).ErrorContext( @@ -73,7 +74,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) { "pool_id", instance.PoolID) return "", errors.Wrap(err, "fetching pool entity") } - } else if instance.ScaleSetID != 0 { + case instance.ScaleSetID != 0: scaleSet, err := r.store.GetScaleSetByID(r.ctx, instance.ScaleSetID) if err != nil { slog.With(slog.Any("error", err)).ErrorContext( @@ -88,7 +89,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) { "scale_set_id", instance.ScaleSetID) return "", errors.Wrap(err, "fetching scale set entity") } - } else { + default: return "", errors.New("instance not associated with a pool or scale set") } diff --git a/runner/runner.go b/runner/runner.go index 4032a94c..42a955fc 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -857,11 +857,12 @@ func (r *Runner) DeleteRunner(ctx context.Context, instanceName string, forceDel } if instance.AgentID != 0 { - if instance.ScaleSetID != 0 { + switch { + case instance.ScaleSetID != 0: err = ssCli.RemoveRunner(ctx, instance.AgentID) - } else if instance.PoolID != "" { + case instance.PoolID != "": err = ghCli.RemoveEntityRunner(ctx, instance.AgentID) - } else { + default: return errors.New("instance does not have a pool or scale set") } @@ -901,20 +902,23 @@ func (r *Runner) DeleteRunner(ctx context.Context, instanceName string, forceDel } func (r *Runner) getGHCliFromInstance(ctx context.Context, instance params.Instance) (common.GithubClient, *scalesets.ScaleSetClient, error) { + // nolint:golangci-lint,godox // TODO(gabriel-samfira): We can probably cache the entity. var entityGetter params.EntityGetter var err error - if instance.PoolID != "" { + + switch { + case instance.PoolID != "": entityGetter, err = r.store.GetPoolByID(ctx, instance.PoolID) if err != nil { return nil, nil, errors.Wrap(err, "fetching pool") } - } else if instance.ScaleSetID != 0 { + case instance.ScaleSetID != 0: entityGetter, err = r.store.GetScaleSetByID(ctx, instance.ScaleSetID) if err != nil { return nil, nil, errors.Wrap(err, "fetching scale set") } - } else { + default: return nil, nil, errors.New("instance does not have a pool or scale set") } diff --git a/runner/scalesets.go b/runner/scalesets.go index 7b93a662..f55b5dca 100644 --- a/runner/scalesets.go +++ b/runner/scalesets.go @@ -20,13 +20,14 @@ import ( "fmt" "log/slog" + "github.com/pkg/errors" + runnerErrors "github.com/cloudbase/garm-provider-common/errors" "github.com/cloudbase/garm/auth" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/util/appdefaults" "github.com/cloudbase/garm/util/github" "github.com/cloudbase/garm/util/github/scalesets" - "github.com/pkg/errors" ) func (r *Runner) ListAllScaleSets(ctx context.Context) ([]params.ScaleSet, error) { @@ -152,7 +153,7 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param return params.ScaleSet{}, errors.Wrap(err, "creating github client") } - callback := func(old, new params.ScaleSet) error { + callback := func(old, newSet params.ScaleSet) error { scalesetCli, err := scalesets.NewClient(ghCli) if err != nil { return errors.Wrap(err, "getting scaleset client") @@ -160,13 +161,13 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param updateParams := params.RunnerScaleSet{} hasUpdates := false - if old.Name != new.Name { - updateParams.Name = new.Name + if old.Name != newSet.Name { + updateParams.Name = newSet.Name hasUpdates = true } - if old.GitHubRunnerGroup != new.GitHubRunnerGroup { - runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, new.GitHubRunnerGroup) + if old.GitHubRunnerGroup != newSet.GitHubRunnerGroup { + runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, newSet.GitHubRunnerGroup) if err != nil { return fmt.Errorf("error fetching runner group from github: %w", err) } @@ -174,13 +175,13 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param hasUpdates = true } - if old.DisableUpdate != new.DisableUpdate { - updateParams.RunnerSetting.DisableUpdate = new.DisableUpdate + if old.DisableUpdate != newSet.DisableUpdate { + updateParams.RunnerSetting.DisableUpdate = newSet.DisableUpdate hasUpdates = true } if hasUpdates { - result, err := scalesetCli.UpdateRunnerScaleSet(ctx, new.ScaleSetID, updateParams) + result, err := scalesetCli.UpdateRunnerScaleSet(ctx, newSet.ScaleSetID, updateParams) if err != nil { return fmt.Errorf("failed to update scaleset in github: %w", err) } @@ -224,7 +225,7 @@ func (r *Runner) CreateEntityScaleSet(ctx context.Context, entityType params.Git if err != nil { return params.ScaleSet{}, errors.Wrap(err, "getting scaleset client") } - var runnerGroupID int = 1 + runnerGroupID := 1 if param.GitHubRunnerGroup != "Default" { runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, param.GitHubRunnerGroup) if err != nil { diff --git a/util/github/scalesets/runners.go b/util/github/scalesets/runners.go index 4d6434eb..178361a1 100644 --- a/util/github/scalesets/runners.go +++ b/util/github/scalesets/runners.go @@ -41,12 +41,12 @@ func (s *ScaleSetClient) GenerateJitRunnerConfig(ctx context.Context, runnerName return params.RunnerScaleSetJitRunnerConfig{}, err } - serviceUrl, err := s.actionsServiceInfo.GetURL() + serviceURL, err := s.actionsServiceInfo.GetURL() if err != nil { return params.RunnerScaleSetJitRunnerConfig{}, fmt.Errorf("failed to get pipeline URL: %w", err) } jitConfigPath := fmt.Sprintf("/%s/%d/generatejitconfig", scaleSetEndpoint, scaleSetID) - jitConfigURL := serviceUrl.JoinPath(jitConfigPath) + jitConfigURL := serviceURL.JoinPath(jitConfigPath) req, err := s.newActionsRequest(ctx, http.MethodPost, jitConfigURL.String(), bytes.NewBuffer(body)) if err != nil { diff --git a/workers/entity/controller.go b/workers/entity/controller.go index 424f9099..b0adcb36 100644 --- a/workers/entity/controller.go +++ b/workers/entity/controller.go @@ -140,7 +140,6 @@ func (c *Controller) Start() error { c.ctx, c.consumerID, composeControllerWatcherFilters(), ) - if err != nil { return fmt.Errorf("failed to create consumer for entity controller: %w", err) } diff --git a/workers/entity/worker.go b/workers/entity/worker.go index 070a9711..95026c73 100644 --- a/workers/entity/worker.go +++ b/workers/entity/worker.go @@ -40,9 +40,6 @@ type Worker struct { Entity params.GithubEntity providers map[string]common.Provider scaleSetController *scaleset.Controller - // TODO(gabriel-samfira): replace current pool manager with something similar - // to the scale set controller. - // poolManager *pool.Controller mux sync.Mutex running bool diff --git a/workers/provider/errors.go b/workers/provider/errors.go index d46a721b..40cfc9a8 100644 --- a/workers/provider/errors.go +++ b/workers/provider/errors.go @@ -2,6 +2,4 @@ package provider import "fmt" -var ( - ErrInstanceDeleted = fmt.Errorf("instance deleted") -) +var ErrInstanceDeleted = fmt.Errorf("instance deleted") diff --git a/workers/provider/instance_manager.go b/workers/provider/instance_manager.go index c784b41f..506e6ef1 100644 --- a/workers/provider/instance_manager.go +++ b/workers/provider/instance_manager.go @@ -10,7 +10,6 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" commonParams "github.com/cloudbase/garm-provider-common/params" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" @@ -18,7 +17,7 @@ import ( garmUtil "github.com/cloudbase/garm/util" ) -func NewInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) { +func newInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) { ctx = garmUtil.WithSlogContext(ctx, slog.Any("instance", instance.Name)) githubEntity, err := scaleSet.GetEntity() @@ -146,7 +145,6 @@ func (i *instanceManager) pseudoPoolID() string { } func (i *instanceManager) handleCreateInstanceInProvider(instance params.Instance) error { - // TODO(gabriel-samfira): implement the creation of the instance in the provider. entity, err := i.getEntity() if err != nil { return fmt.Errorf("getting entity: %w", err) diff --git a/workers/provider/provider.go b/workers/provider/provider.go index 0c2cf4df..7d648bd7 100644 --- a/workers/provider/provider.go +++ b/workers/provider/provider.go @@ -7,7 +7,6 @@ import ( "sync" commonParams "github.com/cloudbase/garm-provider-common/params" - "github.com/cloudbase/garm/auth" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" @@ -15,9 +14,9 @@ import ( "github.com/cloudbase/garm/runner/common" ) -func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*provider, error) { +func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*Provider, error) { consumerID := "provider-worker" - return &provider{ + return &Provider{ ctx: context.Background(), store: store, consumerID: consumerID, @@ -28,11 +27,12 @@ func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]c }, nil } -type provider struct { +type Provider struct { ctx context.Context consumerID string consumer dbCommon.Consumer + // nolint:golangci-lint,godox // TODO: not all workers should have access to the store. // We need to implement way to RPC from workers to controllers // and abstract that into something we can use to eventually @@ -51,7 +51,7 @@ type provider struct { quit chan struct{} } -func (p *provider) loadAllScaleSets() error { +func (p *Provider) loadAllScaleSets() error { scaleSets, err := p.store.ListAllScaleSets(p.ctx) if err != nil { return fmt.Errorf("fetching scale sets: %w", err) @@ -67,7 +67,7 @@ func (p *provider) loadAllScaleSets() error { // loadAllRunners loads all runners from the database. At this stage we only // care about runners created by scale sets, but in the future, we will migrate // the pool manager to the same model. -func (p *provider) loadAllRunners() error { +func (p *Provider) loadAllRunners() error { runners, err := p.store.ListAllInstances(p.ctx) if err != nil { return fmt.Errorf("fetching runners: %w", err) @@ -102,7 +102,7 @@ func (p *provider) loadAllRunners() error { slog.ErrorContext(p.ctx, "provider not found", "provider_name", runner.ProviderName) continue } - instanceManager, err := NewInstanceManager( + instanceManager, err := newInstanceManager( p.ctx, runner, scaleSet, provider, p) if err != nil { return fmt.Errorf("creating instance manager: %w", err) @@ -117,7 +117,7 @@ func (p *provider) loadAllRunners() error { return nil } -func (p *provider) Start() error { +func (p *Provider) Start() error { p.mux.Lock() defer p.mux.Unlock() @@ -147,7 +147,7 @@ func (p *provider) Start() error { return nil } -func (p *provider) Stop() error { +func (p *Provider) Stop() error { p.mux.Lock() defer p.mux.Unlock() @@ -161,7 +161,7 @@ func (p *provider) Stop() error { return nil } -func (p *provider) loop() { +func (p *Provider) loop() { defer p.Stop() for { select { @@ -180,7 +180,7 @@ func (p *provider) loop() { } } -func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) { +func (p *Provider) handleWatcherEvent(payload dbCommon.ChangePayload) { switch payload.EntityType { case dbCommon.ScaleSetEntityType: p.handleScaleSetEvent(payload) @@ -191,7 +191,7 @@ func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) { } } -func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) { +func (p *Provider) handleScaleSetEvent(event dbCommon.ChangePayload) { p.mux.Lock() defer p.mux.Unlock() @@ -214,12 +214,12 @@ func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) { } } -func (p *provider) handleInstanceAdded(instance params.Instance) error { +func (p *Provider) handleInstanceAdded(instance params.Instance) error { scaleSet, ok := p.scaleSets[instance.ScaleSetID] if !ok { return fmt.Errorf("scale set not found for instance %s", instance.Name) } - instanceManager, err := NewInstanceManager( + instanceManager, err := newInstanceManager( p.ctx, instance, scaleSet, p.providers[instance.ProviderName], p) if err != nil { return fmt.Errorf("creating instance manager: %w", err) @@ -231,7 +231,7 @@ func (p *provider) handleInstanceAdded(instance params.Instance) error { return nil } -func (p *provider) handleInstanceEvent(event dbCommon.ChangePayload) { +func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { p.mux.Lock() defer p.mux.Unlock() diff --git a/workers/provider/provider_helper.go b/workers/provider/provider_helper.go index d420cdad..6a53bab3 100644 --- a/workers/provider/provider_helper.go +++ b/workers/provider/provider_helper.go @@ -17,7 +17,7 @@ type providerHelper interface { GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error) } -func (p *provider) updateArgsFromProviderInstance(instanceName string, providerInstance commonParams.ProviderInstance) (params.Instance, error) { +func (p *Provider) updateArgsFromProviderInstance(instanceName string, providerInstance commonParams.ProviderInstance) (params.Instance, error) { updateParams := params.UpdateInstanceParams{ ProviderID: providerInstance.ProviderID, OSName: providerInstance.OSName, @@ -34,7 +34,7 @@ func (p *provider) updateArgsFromProviderInstance(instanceName string, providerI return updated, nil } -func (p *provider) GetControllerInfo() (params.ControllerInfo, error) { +func (p *Provider) GetControllerInfo() (params.ControllerInfo, error) { p.mux.Lock() defer p.mux.Unlock() @@ -46,7 +46,7 @@ func (p *provider) GetControllerInfo() (params.ControllerInfo, error) { return info, nil } -func (p *provider) SetInstanceStatus(instanceName string, status commonParams.InstanceStatus, providerFault []byte) error { +func (p *Provider) SetInstanceStatus(instanceName string, status commonParams.InstanceStatus, providerFault []byte) error { p.mux.Lock() defer p.mux.Unlock() @@ -67,11 +67,11 @@ func (p *provider) SetInstanceStatus(instanceName string, status commonParams.In return nil } -func (p *provider) InstanceTokenGetter() auth.InstanceTokenGetter { +func (p *Provider) InstanceTokenGetter() auth.InstanceTokenGetter { return p.tokenGetter } -func (p *provider) GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error) { +func (p *Provider) GetGithubEntity(entity params.GithubEntity) (params.GithubEntity, error) { ghEntity, err := p.store.GetGithubEntity(p.ctx, entity.EntityType, entity.ID) if err != nil { return params.GithubEntity{}, fmt.Errorf("getting github entity: %w", err) diff --git a/workers/provider/util.go b/workers/provider/util.go index 2d84e25e..1868611e 100644 --- a/workers/provider/util.go +++ b/workers/provider/util.go @@ -2,7 +2,6 @@ package provider import ( commonParams "github.com/cloudbase/garm-provider-common/params" - dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" ) diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index 24d1aad3..a5c198e7 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -9,7 +9,6 @@ import ( "time" runnerErrors "github.com/cloudbase/garm-provider-common/errors" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" @@ -20,7 +19,6 @@ import ( ) func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) { - consumerID := fmt.Sprintf("scaleset-worker-%s", entity.String()) ctx = garmUtil.WithSlogContext( @@ -28,31 +26,22 @@ func NewController(ctx context.Context, store dbCommon.Store, entity params.Gith slog.Any("worker", consumerID)) return &Controller{ - ctx: ctx, - consumerID: consumerID, - ScaleSets: make(map[uint]*scaleSet), - Entity: entity, - providers: providers, - store: store, - statusUpdates: make(chan scaleSetStatus, 10), + ctx: ctx, + consumerID: consumerID, + ScaleSets: make(map[uint]*scaleSet), + Entity: entity, + providers: providers, + store: store, }, nil } type scaleSet struct { scaleSet params.ScaleSet - status scaleSetStatus worker *Worker mux sync.Mutex } -func (s *scaleSet) updateStatus(status scaleSetStatus) { - s.mux.Lock() - defer s.mux.Unlock() - - s.status = status -} - func (s *scaleSet) Stop() error { s.mux.Lock() defer s.mux.Unlock() @@ -80,8 +69,6 @@ type Controller struct { ghCli common.GithubClient forgeCredsAreValid bool - statusUpdates chan scaleSetStatus - mux sync.Mutex running bool quit chan struct{} @@ -162,8 +149,6 @@ func (c *Controller) Stop() error { c.running = false close(c.quit) c.quit = nil - close(c.statusUpdates) - c.statusUpdates = nil c.consumer.Close() return nil @@ -180,6 +165,7 @@ func (c *Controller) updateTools() error { slog.With(slog.Any("error", err)).ErrorContext( c.ctx, "failed to update tools for entity", "entity", c.Entity.String()) if errors.Is(err, runnerErrors.ErrUnauthorized) { + // nolint:golangci-lint,godox // TODO: block all scale sets c.forgeCredsAreValid = false } @@ -191,21 +177,6 @@ func (c *Controller) updateTools() error { return nil } -func (c *Controller) handleScaleSetStatusUpdates(status scaleSetStatus) { - if status.scaleSet.ID == 0 { - slog.DebugContext(c.ctx, "invalid scale set ID; ignoring") - return - } - - scaleSet, ok := c.ScaleSets[status.scaleSet.ID] - if !ok { - slog.DebugContext(c.ctx, "scale set not found; ignoring") - return - } - - scaleSet.updateStatus(status) -} - func (c *Controller) loop() { defer c.Stop() updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval) @@ -231,11 +202,6 @@ func (c *Controller) loop() { case <-c.ctx.Done(): return case <-initialToolUpdate: - case update, ok := <-c.statusUpdates: - if !ok { - return - } - go c.handleScaleSetStatusUpdates(update) case _, ok := <-updateToolsTicker.C: if !ok { slog.InfoContext(c.ctx, "update tools ticker closed") diff --git a/workers/scaleset/controller_watcher.go b/workers/scaleset/controller_watcher.go index 591e768e..04cfe1cd 100644 --- a/workers/scaleset/controller_watcher.go +++ b/workers/scaleset/controller_watcher.go @@ -88,8 +88,8 @@ func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli c } c.ScaleSets[sSet.ID] = &scaleSet{ scaleSet: sSet, - status: scaleSetStatus{}, - worker: worker, + // status: scaleSetStatus{}, + worker: worker, } return nil } @@ -117,7 +117,7 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error { defer c.mux.Unlock() if _, ok := c.ScaleSets[sSet.ID]; !ok { - // Some error may have occured when the scale set was first created, so we + // Some error may have occurred when the scale set was first created, so we // attempt to create it after the user updated the scale set, hopefully // fixing the reason for the failure. return c.handleScaleSetCreateOperation(sSet, c.ghCli) diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index a0a5d657..a2ca2515 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -10,7 +10,6 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" commonParams "github.com/cloudbase/garm-provider-common/params" - "github.com/cloudbase/garm-provider-common/util" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" @@ -127,8 +126,9 @@ func (w *Worker) Start() (err error) { if !errors.Is(err, runnerErrors.ErrNotFound) { if errors.Is(err, runnerErrors.ErrUnauthorized) { // we don't have access to remove the runner. This implies that our - // credentials may have expired or ar incorect. + // credentials may have expired or ar incorrect. // + // nolint:golangci-lint,godox // TODO(gabriel-samfira): we need to set the scale set as inactive and stop the listener (if any). slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err) w.runners[instance.ID] = instance @@ -282,6 +282,7 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) { slog.ErrorContext(w.ctx, "error stopping listener", "error", err) } } + // nolint:golangci-lint,godox // TODO: should we kick off auto-scaling if desired runner count changes? w.scaleSet = scaleSet w.mux.Unlock() @@ -345,6 +346,7 @@ func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) { return } if status != string(params.RunnerIdle) && status != string(params.RunnerActive) { + // nolint:golangci-lint,godox // TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as // pending_delete if the runner never comes online. w.mux.Unlock() @@ -440,7 +442,7 @@ func (w *Worker) keepListenerAlive() { w.mux.Unlock() for { w.mux.Lock() - w.listener.Stop() //cleanup + w.listener.Stop() // cleanup if !w.scaleSet.Enabled { w.mux.Unlock() break @@ -449,12 +451,13 @@ func (w *Worker) keepListenerAlive() { if err := w.listener.Start(); err != nil { w.mux.Unlock() slog.ErrorContext(w.ctx, "error restarting listener", "error", err) - if backoff > 60*time.Second { + switch { + case backoff > 60*time.Second: backoff = 60 * time.Second - } else if backoff == 0 { + case backoff == 0: backoff = 5 * time.Second slog.InfoContext(w.ctx, "backing off restart attempt", "backoff", backoff) - } else { + default: backoff *= 2 } slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff) @@ -512,7 +515,7 @@ func (w *Worker) handleScaleUp(target, current uint) { CreateAttempt: 1, GitHubRunnerGroup: w.scaleSet.GitHubRunnerGroup, JitConfiguration: decodedJit, - AgentID: int64(jitConfig.Runner.ID), + AgentID: jitConfig.Runner.ID, } if _, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams); err != nil { @@ -618,6 +621,7 @@ func (w *Worker) handleScaleDown(target, current uint) { locking.Unlock(runner.Name, true) continue } + // nolint:golangci-lint,godox // TODO: This should not happen, unless there is some issue with the database. // The UpdateInstance() function should add tenacity, but even in that case, if it // still errors out, we need to handle it somehow. diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go index 0cf01025..b83351f2 100644 --- a/workers/scaleset/scaleset_helper.go +++ b/workers/scaleset/scaleset_helper.go @@ -7,7 +7,6 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" commonParams "github.com/cloudbase/garm-provider-common/params" - "github.com/cloudbase/garm/locking" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/util/github/scalesets" diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 9fbf9a7e..77f4077b 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -123,15 +123,15 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage for _, job := range body { switch job.MessageType { case params.MessageTypeJobAssigned: - slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName) + slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName) case params.MessageTypeJobStarted: - slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) + slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) startedJobs = append(startedJobs, job) case params.MessageTypeJobCompleted: - slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) + slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) completedJobs = append(completedJobs, job) case params.MessageTypeJobAvailable: - slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestId, "job_name", job.JobDisplayName) + slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName) availableJobs = append(availableJobs, job) default: slog.DebugContext(l.ctx, "unknown message type", "message_type", job.MessageType) @@ -139,13 +139,13 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage } if len(availableJobs) > 0 { - jobIds := make([]int64, len(availableJobs)) + jobIDs := make([]int64, len(availableJobs)) for idx, job := range availableJobs { - jobIds[idx] = job.RunnerRequestId + jobIDs[idx] = job.RunnerRequestID } idsAcquired, err := l.scaleSetHelper.ScaleSetCLI().AcquireJobs( l.listenerCtx, l.scaleSetHelper.GetScaleSet().ScaleSetID, - l.messageSession.MessageQueueAccessToken(), jobIds) + l.messageSession.MessageQueueAccessToken(), jobIDs) if err != nil { // don't mark message as processed. It will be requeued. slog.ErrorContext(l.ctx, "acquiring jobs", "error", err) @@ -201,7 +201,8 @@ func (l *scaleSetListener) loop() { return default: slog.DebugContext(l.ctx, "getting message", "last_message_id", l.lastMessageID, "max_runners", l.scaleSetHelper.GetScaleSet().MaxRunners) - // TODO: consume initial message on startup and consolidate. + // nolint:golangci-lint,godox + // TODO(gabriel-samfira): consume initial message on startup and consolidate. // The scale set may have undergone several messages while GARM was // down. msg, err := l.messageSession.GetMessage( diff --git a/workers/scaleset/status.go b/workers/scaleset/status.go deleted file mode 100644 index 29d9ae4f..00000000 --- a/workers/scaleset/status.go +++ /dev/null @@ -1,13 +0,0 @@ -package scaleset - -import ( - "time" - - "github.com/cloudbase/garm/params" -) - -type scaleSetStatus struct { - err error - heartbeat time.Time - scaleSet params.ScaleSet -}