From 7358beb2b9dd400f9a93922f27fbe840a09c7e8d Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 30 Jun 2023 08:48:29 +0300 Subject: [PATCH 1/3] Merge Unlock() and UnlockAndDelete() Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 4a677fb1..2097af7a 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -59,12 +59,15 @@ func (k *keyMutex) TryLock(key string) bool { return keyMux.TryLock() } -func (k *keyMutex) Unlock(key string) { +func (k *keyMutex) Unlock(key string, remove bool) { mux, ok := k.muxes.Load(key) if !ok { return } keyMux := mux.(*sync.Mutex) + if remove { + k.Delete(key) + } keyMux.Unlock() } @@ -72,11 +75,6 @@ func (k *keyMutex) Delete(key string) { k.muxes.Delete(key) } -func (k *keyMutex) UnlockAndDelete(key string) { - k.Unlock(key) - k.Delete(key) -} - type basePoolManager struct { ctx context.Context controllerID string @@ -323,7 +321,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne log.Printf("failed to acquire lock for instance %s", instance.Name) continue } - defer r.keyMux.Unlock(instance.Name) + defer r.keyMux.Unlock(instance.Name, false) switch providerCommon.InstanceStatus(instance.Status) { case providerCommon.InstancePendingCreate, @@ -382,7 +380,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { log.Printf("failed to acquire lock for instance %s", instance.Name) continue } - defer r.keyMux.Unlock(instance.Name) + defer r.keyMux.Unlock(instance.Name, false) pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) if err != nil { @@ -501,11 +499,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) log.Printf("failed to acquire lock for instance %s", dbInstance.Name) continue } - defer r.keyMux.Unlock(dbInstance.Name) // See: https://golang.org/doc/faq#closures_and_goroutines runner := runner g.Go(func() error { + deleteMux := false + defer func() { + r.keyMux.Unlock(dbInstance.Name, deleteMux) + }() providerInstance, ok := instanceInList(dbInstance.Name, poolInstances) if !ok { // The runner instance is no longer on the provider, and it appears offline in github. @@ -525,7 +526,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil { return errors.Wrap(err, "removing runner from database") } - defer r.keyMux.UnlockAndDelete(dbInstance.Name) + deleteMux = true return nil } @@ -910,7 +911,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name) continue } - defer r.keyMux.Unlock(instanceToDelete.Name) + defer r.keyMux.Unlock(instanceToDelete.Name, false) g.Go(func() error { log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID) @@ -997,7 +998,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po instance := instance g.Go(func() error { - defer r.keyMux.Unlock(instance.Name) + defer r.keyMux.Unlock(instance.Name, false) // NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances // this has the potential to create many API requests to the target provider. // TODO(gabriel-samfira): implement request throttling. 
@@ -1143,13 +1144,16 @@ func (r *basePoolManager) deletePendingInstances() error { // Set the status to deleting before launching the goroutine that removes // the runner from the provider (which can take a long time). if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil { - log.Printf("failed to update runner %s status", instance.Name) - r.keyMux.Unlock(instance.Name) + log.Printf("failed to update runner %s status: %s", instance.Name, err) + r.keyMux.Unlock(instance.Name, false) continue } go func(instance params.Instance) (err error) { - defer r.keyMux.Unlock(instance.Name) + deleteMux := false + defer func() { + r.keyMux.Unlock(instance.Name, deleteMux) + }() defer func(instance params.Instance) { if err != nil { // failed to remove from provider. Set the status back to pending_delete, which @@ -1168,7 +1172,7 @@ func (r *basePoolManager) deletePendingInstances() error { if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil { return fmt.Errorf("failed to delete instance from database: %w", deleteErr) } - r.keyMux.UnlockAndDelete(instance.Name) + deleteMux = true return nil }(instance) //nolint } @@ -1198,14 +1202,14 @@ func (r *basePoolManager) addPendingInstances() error { // won't attempt to create the runner a second time. if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil { log.Printf("failed to update runner %s status: %s", instance.Name, err) - r.keyMux.Unlock(instance.Name) + r.keyMux.Unlock(instance.Name, false) // We failed to transition the instance to Creating. This means that garm will retry to create this instance // when the loop runs again and we end up with multiple instances. continue } go func(instance params.Instance) { - defer r.keyMux.Unlock(instance.Name) + defer r.keyMux.Unlock(instance.Name, false) log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID) if err := r.addInstanceToProvider(instance); err != nil { log.Printf("failed to add instance to provider: %s", err) From 0a27acd81843e752744c033063aa813da88c1c3f Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 30 Jun 2023 00:57:45 +0300 Subject: [PATCH 2/3] Remove extra loop and add logging * removes an extra loop. The fetch tools loop does the same job * add a lot of log messages Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 174 +++++++++++++++++++++----------------------- runner/pool/util.go | 11 +++ 2 files changed, 94 insertions(+), 91 deletions(-) create mode 100644 runner/pool/util.go diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 2097af7a..c27e5616 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -116,7 +116,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) // exclude that runner from available runners and attempt to ensure // the needed number of runners. if err := r.acquireNewInstance(job); err != nil { - log.Printf("failed to add instance: %s", err) + r.log("failed to add instance: %s", err) } case "completed": // ignore the error here. 
A completed job may not have a runner name set @@ -137,15 +137,15 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) return errors.Wrap(err, "updating runner") } - log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name)) if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil { if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) return errors.Wrap(err, "updating runner") } case "in_progress": @@ -169,31 +169,35 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) return errors.Wrap(err, "updating runner") } } return nil } -func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string) { - log.Printf("starting %s loop for %s", name, r.helper.String()) +func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) { + r.log("starting %s loop for %s", name, r.helper.String()) ticker := time.NewTicker(interval) r.wg.Add(1) defer func() { - log.Printf("%s loop exited for pool %s", name, r.helper.String()) + r.log("%s loop exited for pool %s", name, r.helper.String()) ticker.Stop() r.wg.Done() }() for { - switch r.managerIsRunning { + shouldRun := r.managerIsRunning + if alwaysRun { + shouldRun = true + } + switch shouldRun { case true: select { case <-ticker.C: if err := f(); err != nil { - log.Printf("%s: %q", name, err) + r.log("error in loop %s: %q", name, err) if errors.Is(err, runnerErrors.ErrUnauthorized) { r.setPoolRunningState(false, err.Error()) } @@ -222,17 +226,6 @@ func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Dur func (r *basePoolManager) updateTools() error { // Update tools cache. - tools, err := r.helper.FetchTools() - if err != nil { - return fmt.Errorf("failed to update tools for repo %s: %w", r.helper.String(), err) - } - r.mux.Lock() - r.tools = tools - r.mux.Unlock() - return nil -} - -func (r *basePoolManager) checkCanAuthenticateToGithub() error { tools, err := r.helper.FetchTools() if err != nil { r.setPoolRunningState(false, err.Error()) @@ -247,17 +240,6 @@ func (r *basePoolManager) checkCanAuthenticateToGithub() error { r.tools = tools r.mux.Unlock() - err = r.runnerCleanup() - if err != nil { - if errors.Is(err, runnerErrors.ErrUnauthorized) { - r.setPoolRunningState(false, err.Error()) - r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) - return fmt.Errorf("failed to clean runners for %s: %w", r.helper.String(), err) - } - } - // We still set the pool as running, even if we failed to clean up runners. - // We only set the pool as not running if we fail to authenticate to github. This is done - // to avoid being rate limited by github when we have a bad token. 
r.setPoolRunningState(true, "") return err } @@ -309,7 +291,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne runnerNames := map[string]bool{} for _, run := range runners { if !r.isManagedRunner(labelsFromRunner(run)) { - log.Printf("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String()) + r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String()) continue } runnerNames[*run.Name] = true @@ -318,7 +300,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne for _, instance := range dbInstances { lockAcquired := r.keyMux.TryLock(instance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instance.Name) + r.log("failed to acquire lock for instance %s", instance.Name) continue } defer r.keyMux.Unlock(instance.Name, false) @@ -335,20 +317,20 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne switch instance.RunnerStatus { case providerCommon.RunnerPending, providerCommon.RunnerInstalling: // runner is still installing. We give it a chance to finish. - log.Printf("runner %s is still installing, give it a chance to finish", instance.Name) + r.log("runner %s is still installing, give it a chance to finish", instance.Name) continue } if time.Since(instance.UpdatedAt).Minutes() < 5 { // instance was updated recently. We give it a chance to register itself in github. - log.Printf("instance %s was updated recently, skipping check", instance.Name) + r.log("instance %s was updated recently, skipping check", instance.Name) continue } if ok := runnerNames[instance.Name]; !ok { // Set pending_delete on DB field. Allow consolidate() to remove it. if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { - log.Printf("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status", instance.Name) return errors.Wrap(err, "updating runner") } } @@ -368,16 +350,17 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { runnersByName := map[string]*github.Runner{} for _, run := range runners { if !r.isManagedRunner(labelsFromRunner(run)) { - log.Printf("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String()) + r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String()) continue } runnersByName[*run.Name] = run } for _, instance := range dbInstances { + r.log("attempting to lock instance %s", instance.Name) lockAcquired := r.keyMux.TryLock(instance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instance.Name) + r.log("failed to acquire lock for instance %s", instance.Name) continue } defer r.keyMux.Unlock(instance.Name, false) @@ -408,9 +391,9 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { // both the runner status as reported by GitHub and the runner status as reported by the provider. // If the runner is "offline" and marked as "failed", it should be safe to reap it. 
if runner, ok := runnersByName[instance.Name]; !ok || (runner.GetStatus() == "offline" && instance.RunnerStatus == providerCommon.RunnerFailed) { - log.Printf("reaping timed-out/failed runner %s", instance.Name) + r.log("reaping timed-out/failed runner %s", instance.Name) if err := r.ForceDeleteRunner(instance); err != nil { - log.Printf("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status", instance.Name) return errors.Wrap(err, "updating runner") } } @@ -436,7 +419,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) g, ctx := errgroup.WithContext(r.ctx) for _, runner := range runners { if !r.isManagedRunner(labelsFromRunner(runner)) { - log.Printf("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String()) + r.log("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String()) continue } @@ -453,7 +436,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) } // We no longer have a DB entry for this instance, and the runner appears offline in github. // Previous forceful removal may have failed? - log.Printf("Runner %s has no database entry in garm, removing from github", *runner.Name) + r.log("Runner %s has no database entry in garm, removing from github", *runner.Name) resp, err := r.helper.RemoveGithubRunner(*runner.ID) if err != nil { // Removed in the meantime? @@ -486,7 +469,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) var poolInstances []params.Instance poolInstances, ok = poolInstanceCache[pool.ID] if !ok { - log.Printf("updating instances cache for pool %s", pool.ID) + r.log("updating instances cache for pool %s", pool.ID) poolInstances, err = provider.ListInstances(r.ctx, pool.ID) if err != nil { return errors.Wrapf(err, "fetching instances for pool %s", pool.ID) @@ -496,7 +479,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) lockAcquired := r.keyMux.TryLock(dbInstance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", dbInstance.Name) + r.log("failed to acquire lock for instance %s", dbInstance.Name) continue } @@ -511,18 +494,18 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) if !ok { // The runner instance is no longer on the provider, and it appears offline in github. // It should be safe to force remove it. - log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name) + r.log("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name) resp, err := r.helper.RemoveGithubRunner(*runner.ID) if err != nil { // Removed in the meantime? if resp != nil && resp.StatusCode == http.StatusNotFound { - log.Printf("runner dissapeared from github") + r.log("runner dissapeared from github") } else { return errors.Wrap(err, "removing runner from github") } } // Remove the database entry for the runner. - log.Printf("Removing %s from database", dbInstance.Name) + r.log("Removing %s from database", dbInstance.Name) if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil { return errors.Wrap(err, "removing runner from database") } @@ -534,10 +517,10 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) // instance is running, but github reports runner as offline. Log the event. // This scenario may require manual intervention. 
// Perhaps it just came online and github did not yet change it's status? - log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) + r.log("instance %s is online but github reports runner as offline", dbInstance.Name) return nil } else { - log.Printf("instance %s was found in stopped state; starting", dbInstance.Name) + r.log("instance %s was found in stopped state; starting", dbInstance.Name) //start the instance if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil { return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID) @@ -630,15 +613,15 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error { pool, err := r.helper.FindPoolByTags(requestedLabels) if err != nil { if errors.Is(err, runnerErrors.ErrNotFound) { - log.Printf("failed to find an enabled pool with required labels: %s", strings.Join(requestedLabels, ", ")) + r.log("failed to find an enabled pool with required labels: %s", strings.Join(requestedLabels, ", ")) return nil } return errors.Wrap(err, "fetching suitable pool") } - log.Printf("adding new runner with requested tags %s in pool %s", util.SanitizeLogEntry(strings.Join(job.WorkflowJob.Labels, ", ")), util.SanitizeLogEntry(pool.ID)) + r.log("adding new runner with requested tags %s in pool %s", util.SanitizeLogEntry(strings.Join(job.WorkflowJob.Labels, ", ")), util.SanitizeLogEntry(pool.ID)) if !pool.Enabled { - log.Printf("selected pool (%s) is disabled", pool.ID) + r.log("selected pool (%s) is disabled", pool.ID) return nil } @@ -648,7 +631,7 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error { } if poolInstances >= int64(pool.MaxRunners) { - log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID) + r.log("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID) return nil } @@ -668,12 +651,12 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error { // Skip creating a new runner if we have at least one idle runner and the minimum is already satisfied. // This should work even for pools that define a MinIdleRunner of 0. 
if int64(idleWorkers) > 0 && int64(idleWorkers) >= int64(pool.MinIdleRunners) { - log.Printf("we have enough min_idle_runners (%d) for pool %s, skipping...", pool.MinIdleRunners, pool.ID) + r.log("we have enough min_idle_runners (%d) for pool %s, skipping...", pool.MinIdleRunners, pool.ID) return nil } if err := r.AddRunner(r.ctx, pool.ID); err != nil { - log.Printf("failed to add runner to pool %s", pool.ID) + r.log("failed to add runner to pool %s", pool.ID) return errors.Wrap(err, "adding runner") } return nil @@ -717,7 +700,7 @@ func (r *basePoolManager) Status() params.PoolManagerStatus { } func (r *basePoolManager) waitForTimeoutOrCanceled(timeout time.Duration) { - log.Printf("sleeping for %.2f minutes", timeout.Minutes()) + r.log("sleeping for %.2f minutes", timeout.Minutes()) select { case <-time.After(timeout): case <-r.ctx.Done(): @@ -782,7 +765,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error if instanceIDToDelete != "" { if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil { if !errors.Is(err, runnerErrors.ErrNotFound) { - log.Printf("failed to cleanup instance: %s", instanceIDToDelete) + r.log("failed to cleanup instance: %s", instanceIDToDelete) } } } @@ -822,7 +805,7 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param } // Runner name was not set in WorkflowJob by github. We can still attempt to // fetch the info we need, using the workflow run ID, from the API. - log.Printf("runner name not found in workflow job, attempting to fetch from API") + r.log("runner name not found in workflow job, attempting to fetch from API") runnerInfo, err = r.helper.GetRunnerInfoFromWorkflow(job) if err != nil { return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API") @@ -831,12 +814,12 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param runnerDetails, err := r.store.GetInstanceByName(context.Background(), runnerInfo.Name) if err != nil { - log.Printf("could not find runner details for %s", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("could not find runner details for %s", util.SanitizeLogEntry(runnerInfo.Name)) return params.RunnerInfo{}, errors.Wrap(err, "fetching runner details") } if _, err := r.helper.GetPoolByID(runnerDetails.PoolID); err != nil { - log.Printf("runner %s (pool ID: %s) does not belong to any pool we manage: %s", runnerDetails.Name, runnerDetails.PoolID, err) + r.log("runner %s (pool ID: %s) does not belong to any pool we manage: %s", runnerDetails.Name, runnerDetails.PoolID, err) return params.RunnerInfo{}, errors.Wrap(err, "fetching pool for instance") } return runnerInfo, nil @@ -908,13 +891,13 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool lockAcquired := r.keyMux.TryLock(instanceToDelete.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name) + r.log("failed to acquire lock for instance %s", instanceToDelete.Name) continue } defer r.keyMux.Unlock(instanceToDelete.Name, false) g.Go(func() error { - log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID) + r.log("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID) if err := r.ForceDeleteRunner(instanceToDelete); err != nil { return fmt.Errorf("failed to delete instance %s: %w", instanceToDelete.ID, err) } @@ -938,7 +921,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error { } if uint(len(existingInstances)) 
>= pool.MaxRunners { - log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) + r.log("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) return nil } @@ -963,7 +946,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error { } for i := 0; i < required; i++ { - log.Printf("adding new idle worker to pool %s", pool.ID) + r.log("adding new idle worker to pool %s", pool.ID) if err := r.AddRunner(r.ctx, pool.ID); err != nil { return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err) } @@ -975,6 +958,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po if !pool.Enabled { return nil } + r.log("running retry failed instances for pool %s", pool.ID) existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID) if err != nil { @@ -990,9 +974,10 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po continue } + r.log("attempting to retry failed instance %s", instance.Name) lockAcquired := r.keyMux.TryLock(instance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instance.Name) + r.log("failed to acquire lock for instance %s", instance.Name) continue } @@ -1003,7 +988,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po // this has the potential to create many API requests to the target provider. // TODO(gabriel-samfira): implement request throttling. if err := r.deleteInstanceFromProvider(errCtx, instance); err != nil { - log.Printf("failed to delete instance %s from provider: %s", instance.Name, err) + r.log("failed to delete instance %s from provider: %s", instance.Name, err) // Bail here, otherwise we risk creating multiple failing instances, and losing track // of them. If Create instance failed to return a proper provider ID, we rely on the // name to delete the instance. If we don't bail here, and end up with multiple @@ -1023,10 +1008,10 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po TokenFetched: &tokenFetched, Status: providerCommon.InstancePendingCreate, } - log.Printf("queueing previously failed instance %s for retry", instance.Name) + r.log("queueing previously failed instance %s for retry", instance.Name) // Set instance to pending create and wait for retry. if err := r.updateInstance(instance.Name, updateParams); err != nil { - log.Printf("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status", instance.Name) } return nil }) @@ -1069,6 +1054,7 @@ func (r *basePoolManager) scaleDown() error { for _, pool := range pools { pool := pool g.Go(func() error { + r.log("running scale down for pool %s", pool.ID) return r.scaleDownOnePool(ctx, pool) }) } @@ -1129,22 +1115,24 @@ func (r *basePoolManager) deletePendingInstances() error { return fmt.Errorf("failed to fetch instances from store: %w", err) } + r.log("removing instances in pending_delete") for _, instance := range instances { if instance.Status != providerCommon.InstancePendingDelete { // not in pending_delete status. Skip. 
continue } + r.log("removing instance %s in pool %s", instance.Name, instance.PoolID) lockAcquired := r.keyMux.TryLock(instance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instance.Name) + r.log("failed to acquire lock for instance %s", instance.Name) continue } // Set the status to deleting before launching the goroutine that removes // the runner from the provider (which can take a long time). if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil { - log.Printf("failed to update runner %s status: %s", instance.Name, err) + r.log("failed to update runner %s status: %q", instance.Name, err) r.keyMux.Unlock(instance.Name, false) continue } @@ -1156,23 +1144,26 @@ func (r *basePoolManager) deletePendingInstances() error { }() defer func(instance params.Instance) { if err != nil { + r.log("failed to remove instance %s: %s", instance.Name, err) // failed to remove from provider. Set the status back to pending_delete, which // will retry the operation. if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { - log.Printf("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status", instance.Name) } } }(instance) + r.log("removing instance %s from provider", instance.Name) err = r.deleteInstanceFromProvider(r.ctx, instance) if err != nil { return fmt.Errorf("failed to remove instance from provider: %w", err) } - + r.log("removing instance %s from database", instance.Name) if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil { return fmt.Errorf("failed to delete instance from database: %w", deleteErr) } deleteMux = true + r.log("instance %s was successfully removed", instance.Name) return nil }(instance) //nolint } @@ -1192,16 +1183,17 @@ func (r *basePoolManager) addPendingInstances() error { continue } + r.log("attempting to acquire lock for instance %s (create)", instance.Name) lockAcquired := r.keyMux.TryLock(instance.Name) if !lockAcquired { - log.Printf("failed to acquire lock for instance %s", instance.Name) + r.log("failed to acquire lock for instance %s", instance.Name) continue } // Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances() // won't attempt to create the runner a second time. if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil { - log.Printf("failed to update runner %s status: %s", instance.Name, err) + r.log("failed to update runner %s status: %s", instance.Name, err) r.keyMux.Unlock(instance.Name, false) // We failed to transition the instance to Creating. This means that garm will retry to create this instance // when the loop runs again and we end up with multiple instances. 
@@ -1210,14 +1202,14 @@ func (r *basePoolManager) addPendingInstances() error { go func(instance params.Instance) { defer r.keyMux.Unlock(instance.Name, false) - log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID) + r.log("creating instance %s in pool %s", instance.Name, instance.PoolID) if err := r.addInstanceToProvider(instance); err != nil { - log.Printf("failed to add instance to provider: %s", err) + r.log("failed to add instance to provider: %s", err) errAsBytes := []byte(err.Error()) if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil { - log.Printf("failed to update runner %s status: %s", instance.Name, err) + r.log("failed to update runner %s status: %s", instance.Name, err) } - log.Printf("failed to create instance in provider: %s", err) + r.log("failed to create instance in provider: %s", err) } }(instance) } @@ -1240,6 +1232,7 @@ func (r *basePoolManager) Wait() error { } func (r *basePoolManager) runnerCleanup() (err error) { + r.log("running runner cleanup") runners, err := r.helper.GetGithubRunners() if err != nil { return fmt.Errorf("failed to fetch github runners: %w", err) @@ -1273,16 +1266,15 @@ func (r *basePoolManager) cleanupOrphanedRunners() error { } func (r *basePoolManager) Start() error { - r.checkCanAuthenticateToGithub() //nolint + r.updateTools() //nolint - go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper") - go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down") - go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]") - go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]") - go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]") - go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]") - go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools") - go r.startLoopForFunction(r.checkCanAuthenticateToGithub, common.UnauthorizedBackoffTimer, "bad_auth_backoff") + go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper", false) + go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down", false) + go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]", false) + go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]", false) + go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]", false) + go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]", false) + go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools", true) return nil } @@ -1320,7 +1312,7 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error { return errors.Wrapf(runnerErrors.ErrBadRequest, "removing runner: %q", err) case http.StatusNotFound: // Runner may have been deleted by a finished job, or manually by the user. 
- log.Printf("runner with agent id %d was not found in github", runner.AgentID) + r.log("runner with agent id %d was not found in github", runner.AgentID) case http.StatusUnauthorized: // Mark the pool as offline from this point forward failureReason := fmt.Sprintf("failed to remove runner: %q", err) @@ -1337,10 +1329,10 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error { } } } - log.Printf("setting instance status for: %v", runner.Name) + r.log("setting instance status for: %v", runner.Name) if err := r.setInstanceStatus(runner.Name, providerCommon.InstancePendingDelete, nil); err != nil { - log.Printf("failed to update runner %s status", runner.Name) + r.log("failed to update runner %s status", runner.Name) return errors.Wrap(err, "updating runner") } return nil diff --git a/runner/pool/util.go b/runner/pool/util.go new file mode 100644 index 00000000..c5f38473 --- /dev/null +++ b/runner/pool/util.go @@ -0,0 +1,11 @@ +package pool + +import "log" + +func (r *basePoolManager) log(msg string, args ...interface{}) { + msgArgs := []interface{}{ + r.helper.String(), + } + msgArgs = append(msgArgs, args...) + log.Printf("[Pool mgr %s] "+msg, msgArgs...) +} From 67b871488d15354783bb54512fa1fe1da6b81f4d Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 30 Jun 2023 09:14:10 +0300 Subject: [PATCH 3/3] Log the actual error Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index c27e5616..7883ad76 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -137,7 +137,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err) return errors.Wrap(err, "updating runner") } r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name)) @@ -145,7 +145,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err) return errors.Wrap(err, "updating runner") } case "in_progress": @@ -169,7 +169,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) if errors.Is(err, runnerErrors.ErrNotFound) { return nil } - r.log("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err) return errors.Wrap(err, "updating runner") } } @@ -330,7 +330,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne if ok := runnerNames[instance.Name]; !ok { // Set pending_delete on DB field. Allow consolidate() to remove it. 
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { - r.log("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status: %s", instance.Name, err) return errors.Wrap(err, "updating runner") } } @@ -393,7 +393,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { if runner, ok := runnersByName[instance.Name]; !ok || (runner.GetStatus() == "offline" && instance.RunnerStatus == providerCommon.RunnerFailed) { r.log("reaping timed-out/failed runner %s", instance.Name) if err := r.ForceDeleteRunner(instance); err != nil { - r.log("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status: %s", instance.Name, err) return errors.Wrap(err, "updating runner") } } @@ -1011,7 +1011,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po r.log("queueing previously failed instance %s for retry", instance.Name) // Set instance to pending create and wait for retry. if err := r.updateInstance(instance.Name, updateParams); err != nil { - r.log("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status: %s", instance.Name, err) } return nil }) @@ -1148,7 +1148,7 @@ func (r *basePoolManager) deletePendingInstances() error { // failed to remove from provider. Set the status back to pending_delete, which // will retry the operation. if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { - r.log("failed to update runner %s status", instance.Name) + r.log("failed to update runner %s status: %s", instance.Name, err) } } }(instance) @@ -1332,7 +1332,7 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error { r.log("setting instance status for: %v", runner.Name) if err := r.setInstanceStatus(runner.Name, providerCommon.InstancePendingDelete, nil); err != nil { - r.log("failed to update runner %s status", runner.Name) + r.log("failed to update runner %s status: %s", runner.Name, err) return errors.Wrap(err, "updating runner") } return nil
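
For reviewers who want the merged locking helper in one piece rather than scattered across hunks, below is a self-contained sketch of the keyMutex type as it looks after the first patch. The struct definition and the body of TryLock are not shown in full by the diff, so their reconstruction here (a sync.Map of *sync.Mutex populated via LoadOrStore) is an assumption based on the visible fragments; only Unlock and Delete are copied from the patch itself.

package pool

import "sync"

// keyMutex serializes operations on a single runner instance, keyed by
// instance name. Only Unlock and Delete appear verbatim in the patch; the
// struct layout and TryLock body are reconstructed here for illustration.
type keyMutex struct {
	muxes sync.Map // assumed to hold map[string]*sync.Mutex
}

// TryLock acquires the mutex for key, creating it on first use. It returns
// false if another goroutine already holds it. (Reconstructed; the diff only
// shows the trailing "return keyMux.TryLock()".)
func (k *keyMutex) TryLock(key string) bool {
	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
	keyMux := mux.(*sync.Mutex)
	return keyMux.TryLock()
}

// Unlock releases the mutex for key. When remove is true it also drops the
// key from the map, replacing the former UnlockAndDelete helper.
func (k *keyMutex) Unlock(key string, remove bool) {
	mux, ok := k.muxes.Load(key)
	if !ok {
		return
	}
	keyMux := mux.(*sync.Mutex)
	if remove {
		k.Delete(key)
	}
	keyMux.Unlock()
}

// Delete removes the mutex for key from the map.
func (k *keyMutex) Delete(key string) {
	k.muxes.Delete(key)
}

Call sites that only learn at the end whether the instance was actually deleted (cleanupOrphanedGithubRunners, deletePendingInstances) set a deleteMux flag and pass it to Unlock from a deferred closure, so the key is dropped only once the database record is gone; every other call site passes false. The second patch additionally folds checkCanAuthenticateToGithub into updateTools and starts the update_tools loop with alwaysRun set to true, so tool fetching (and with it the GitHub credential check that flips the pool running state) keeps running even while the manager is marked as not running, which is what makes the separate bad_auth_backoff loop unnecessary.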