From cb4d56773f4d85d9f2ef028d47dbd24a0a0e1ae7 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Mon, 1 Apr 2024 14:48:31 +0000 Subject: [PATCH] Remove some code, move some code around Remove code that was just wrapping other functions at this point, and move some code around. We need to get a better idea what is actually still needed in the pool manager, to begin to refactor it into something that can scale out. Signed-off-by: Gabriel Adrian Samfira --- runner/pool/locking.go | 29 +++++++++ runner/pool/pool.go | 143 +++++++---------------------------------- runner/pool/util.go | 43 +++++++++++++ 3 files changed, 96 insertions(+), 119 deletions(-) create mode 100644 runner/pool/locking.go diff --git a/runner/pool/locking.go b/runner/pool/locking.go new file mode 100644 index 00000000..7f6c7bc6 --- /dev/null +++ b/runner/pool/locking.go @@ -0,0 +1,29 @@ +package pool + +import "sync" + +type keyMutex struct { + muxes sync.Map +} + +func (k *keyMutex) TryLock(key string) bool { + mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{}) + keyMux := mux.(*sync.Mutex) + return keyMux.TryLock() +} + +func (k *keyMutex) Unlock(key string, remove bool) { + mux, ok := k.muxes.Load(key) + if !ok { + return + } + keyMux := mux.(*sync.Mutex) + if remove { + k.Delete(key) + } + keyMux.Unlock() +} + +func (k *keyMutex) Delete(key string) { + k.muxes.Delete(key) +} diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 5d7e17a1..4508f55e 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -62,32 +62,6 @@ const ( maxCreateAttempts = 5 ) -type keyMutex struct { - muxes sync.Map -} - -func (k *keyMutex) TryLock(key string) bool { - mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{}) - keyMux := mux.(*sync.Mutex) - return keyMux.TryLock() -} - -func (k *keyMutex) Unlock(key string, remove bool) { - mux, ok := k.muxes.Load(key) - if !ok { - return - } - keyMux := mux.(*sync.Mutex) - if remove { - k.Delete(key) - } - keyMux.Unlock() -} - -func (k *keyMutex) Delete(key string) { - k.muxes.Delete(key) -} - type urls struct { callbackURL string metadataURL string @@ -288,7 +262,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { // A runner has picked up the job, and is now running it. It may need to be replaced if the pool has // a minimum number of idle runners configured. - pool, err := r.GetPoolByID(instance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID) if err != nil { return errors.Wrap(err, "getting pool") } @@ -386,37 +360,6 @@ func (r *basePoolManager) updateTools() error { return err } -func controllerIDFromLabels(labels []string) string { - for _, lbl := range labels { - if strings.HasPrefix(lbl, controllerLabelPrefix) { - return lbl[len(controllerLabelPrefix):] - } - } - return "" -} - -func labelsFromRunner(runner *github.Runner) []string { - if runner == nil || runner.Labels == nil { - return []string{} - } - - var labels []string - for _, val := range runner.Labels { - if val == nil { - continue - } - labels = append(labels, val.GetName()) - } - return labels -} - -// isManagedRunner returns true if labels indicate the runner belongs to a pool -// this manager is responsible for. -func (r *basePoolManager) isManagedRunner(labels []string) bool { - runnerControllerID := controllerIDFromLabels(labels) - return runnerControllerID == r.controllerID -} - // cleanupOrphanedProviderRunners compares runners in github with local runners and removes // any local runners that are not present in Github. Runners that are "idle" in our // provider, but do not exist in github, will be removed. This can happen if the @@ -425,14 +368,14 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool { // If we were offline and did not process the webhook, the instance will linger. // We need to remove it from the provider and database. func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error { - dbInstances, err := r.FetchDbInstances() + dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return errors.Wrap(err, "fetching instances from db") } runnerNames := map[string]bool{} for _, run := range runners { - if !r.isManagedRunner(labelsFromRunner(run)) { + if !isManagedRunner(labelsFromRunner(run), r.controllerID) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", "runner_name", run.GetName()) @@ -460,7 +403,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne continue } - pool, err := r.GetPoolByID(instance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID) if err != nil { return errors.Wrap(err, "fetching instance pool info") } @@ -501,14 +444,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne // of "running" in the provider, but that has not registered with Github, and has // received no new updates in the configured timeout interval. func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { - dbInstances, err := r.FetchDbInstances() + dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return errors.Wrap(err, "fetching instances from db") } runnersByName := map[string]*github.Runner{} for _, run := range runners { - if !r.isManagedRunner(labelsFromRunner(run)) { + if !isManagedRunner(labelsFromRunner(run), r.controllerID) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", "runner_name", run.GetName()) @@ -530,7 +473,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { } defer r.keyMux.Unlock(instance.Name, false) - pool, err := r.GetPoolByID(instance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID) if err != nil { return errors.Wrap(err, "fetching instance pool info") } @@ -558,15 +501,6 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { return nil } -func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) { - for _, val := range instances { - if val.Name == instanceName { - return val, true - } - } - return commonParams.ProviderInstance{}, false -} - // cleanupOrphanedGithubRunners will forcefully remove any github runners that appear // as offline and for which we no longer have a local instance. // This may happen if someone manually deletes the instance in the provider. We need to @@ -575,7 +509,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) poolInstanceCache := map[string][]commonParams.ProviderInstance{} g, ctx := errgroup.WithContext(r.ctx) for _, runner := range runners { - if !r.isManagedRunner(labelsFromRunner(runner)) { + if !isManagedRunner(labelsFromRunner(runner), r.controllerID) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", "runner_name", runner.GetName()) @@ -598,7 +532,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) slog.InfoContext( r.ctx, "Runner has no database entry in garm, removing from github", "runner_name", runner.GetName()) - resp, err := r.RemoveGithubRunner(*runner.ID) + resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) if err != nil { // Removed in the meantime? if resp != nil && resp.StatusCode == http.StatusNotFound { @@ -632,7 +566,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) } } - pool, err := r.GetPoolByID(dbInstance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -678,7 +612,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) slog.InfoContext( r.ctx, "Runner instance is no longer on the provider, removing from github", "runner_name", dbInstance.Name) - resp, err := r.RemoveGithubRunner(*runner.ID) + resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) if err != nil { // Removed in the meantime? if resp != nil && resp.StatusCode == http.StatusNotFound { @@ -749,16 +683,7 @@ func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status para updateParams := params.UpdateInstanceParams{ RunnerStatus: status, } - - instance, err := r.updateInstance(runnerName, updateParams) - if err != nil { - return params.Instance{}, errors.Wrap(err, "updating runner state") - } - return instance, nil -} - -func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) { - instance, err := r.store.UpdateInstance(r.ctx, runnerName, update) + instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams) if err != nil { return params.Instance{}, errors.Wrap(err, "updating runner state") } @@ -771,7 +696,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara ProviderFault: providerFault, } - instance, err := r.updateInstance(runnerName, updateParams) + instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams) if err != nil { return params.Instance{}, errors.Wrap(err, "updating runner state") } @@ -779,7 +704,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara } func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) { - pool, err := r.GetPoolByID(poolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -838,7 +763,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona } if runner != nil { - _, runnerCleanupErr := r.RemoveGithubRunner(runner.GetID()) + _, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) if err != nil { slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext( ctx, "failed to remove runner", @@ -888,7 +813,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string { } func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error { - pool, err := r.GetPoolByID(instance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -1332,7 +1257,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po ctx, "queueing previously failed instance for retry", "runner_name", instance.Name) // Set instance to pending create and wait for retry. - if _, err := r.updateInstance(instance.Name, updateParams); err != nil { + if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateParams); err != nil { slog.With(slog.Any("error", err)).ErrorContext( ctx, "failed to update runner status", "runner_name", instance.Name) @@ -1347,7 +1272,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po } func (r *basePoolManager) retryFailedInstances() error { - pools, err := r.ListPools() + pools, err := r.store.ListEntityPools(r.ctx, r.entity) if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1370,7 +1295,7 @@ func (r *basePoolManager) retryFailedInstances() error { } func (r *basePoolManager) scaleDown() error { - pools, err := r.ListPools() + pools, err := r.store.ListEntityPools(r.ctx, r.entity) if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1391,7 +1316,7 @@ func (r *basePoolManager) scaleDown() error { } func (r *basePoolManager) ensureMinIdleRunners() error { - pools, err := r.ListPools() + pools, err := r.store.ListEntityPools(r.ctx, r.entity) if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1411,7 +1336,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error { } func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error { - pool, err := r.GetPoolByID(instance.PoolID) + pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -1441,7 +1366,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan } func (r *basePoolManager) deletePendingInstances() error { - instances, err := r.FetchDbInstances() + instances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return fmt.Errorf("failed to fetch instances from store: %w", err) } @@ -1529,7 +1454,7 @@ func (r *basePoolManager) deletePendingInstances() error { func (r *basePoolManager) addPendingInstances() error { // nolint:golangci-lint,godox // TODO: filter instances by status. - instances, err := r.FetchDbInstances() + instances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return fmt.Errorf("failed to fetch instances from store: %w", err) } @@ -1717,7 +1642,7 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String()) } if runner.AgentID != 0 { - resp, err := r.RemoveGithubRunner(runner.AgentID) + resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID) if err != nil { if resp != nil { switch resp.StatusCode { @@ -2083,14 +2008,6 @@ func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) { return *tk.Token, nil } -func (r *basePoolManager) RemoveGithubRunner(runnerID int64) (*github.Response, error) { - ghResp, err := r.ghcli.RemoveEntityRunner(r.ctx, runnerID) - if err != nil { - return nil, fmt.Errorf("removing runner: %w", err) - } - return ghResp, nil -} - func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload, error) { tools, ghResp, err := r.ghcli.ListEntityRunnerApplicationDownloads(r.ctx) if err != nil { @@ -2153,18 +2070,6 @@ func (r *basePoolManager) GithubURL() string { return "" } -func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) { - return r.store.ListEntityInstances(r.ctx, r.entity) -} - -func (r *basePoolManager) ListPools() ([]params.Pool, error) { - return r.store.ListEntityPools(r.ctx, r.entity) -} - -func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) { - return r.store.GetEntityPool(r.ctx, r.entity, poolID) -} - func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) { allHooks, err := r.listHooks(ctx) if err != nil { diff --git a/runner/pool/util.go b/runner/pool/util.go index 5a3a3c8c..8ceea49d 100644 --- a/runner/pool/util.go +++ b/runner/pool/util.go @@ -6,7 +6,10 @@ import ( "sync" "sync/atomic" + "github.com/google/go-github/v57/github" + runnerErrors "github.com/cloudbase/garm-provider-common/errors" + commonParams "github.com/cloudbase/garm-provider-common/params" "github.com/cloudbase/garm/params" ) @@ -73,3 +76,43 @@ func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore { v, _ := p.pools.LoadOrStore(key, poolRR) return v.(*poolRoundRobin) } + +func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) { + for _, val := range instances { + if val.Name == instanceName { + return val, true + } + } + return commonParams.ProviderInstance{}, false +} + +func controllerIDFromLabels(labels []string) string { + for _, lbl := range labels { + if strings.HasPrefix(lbl, controllerLabelPrefix) { + return lbl[len(controllerLabelPrefix):] + } + } + return "" +} + +func labelsFromRunner(runner *github.Runner) []string { + if runner == nil || runner.Labels == nil { + return []string{} + } + + var labels []string + for _, val := range runner.Labels { + if val == nil { + continue + } + labels = append(labels, val.GetName()) + } + return labels +} + +// isManagedRunner returns true if labels indicate the runner belongs to a pool +// this manager is responsible for. +func isManagedRunner(labels []string, controllerID string) bool { + runnerControllerID := controllerIDFromLabels(labels) + return runnerControllerID == controllerID +}