Merge pull request #118 from gabriel-samfira/add-logging
Add logging and fix backoff loop
Commit b506df20da
2 changed files with 115 additions and 108 deletions
@@ -59,12 +59,15 @@ func (k *keyMutex) TryLock(key string) bool {
    return keyMux.TryLock()
}

func (k *keyMutex) Unlock(key string) {
func (k *keyMutex) Unlock(key string, remove bool) {
    mux, ok := k.muxes.Load(key)
    if !ok {
        return
    }
    keyMux := mux.(*sync.Mutex)
    if remove {
        k.Delete(key)
    }
    keyMux.Unlock()
}

@@ -72,11 +75,6 @@ func (k *keyMutex) Delete(key string) {
    k.muxes.Delete(key)
}

func (k *keyMutex) UnlockAndDelete(key string) {
    k.Unlock(key)
    k.Delete(key)
}

type basePoolManager struct {
    ctx          context.Context
    controllerID string
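The hunks above fold mutex-entry removal into Unlock via a remove flag and drop the separate UnlockAndDelete helper (its call sites are updated in later hunks). A minimal, self-contained sketch of the resulting type, assuming the muxes field is a sync.Map (consistent with the mux.(*sync.Mutex) assertion in the diff) and that TryLock lazily creates the per-key mutex:

```go
// Sketch of the per-key mutex used by the pool manager. Anything beyond what
// the diff shows (the sync.Map field, the LoadOrStore in TryLock) is an assumption.
package pool

import "sync"

type keyMutex struct {
	muxes sync.Map
}

// TryLock reports whether the lock for key was acquired.
func (k *keyMutex) TryLock(key string) bool {
	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
	keyMux := mux.(*sync.Mutex)
	return keyMux.TryLock()
}

// Unlock releases the lock for key. When remove is true, the mutex entry is
// also dropped from the map, so it does not linger after the instance it
// guards has been deleted.
func (k *keyMutex) Unlock(key string, remove bool) {
	mux, ok := k.muxes.Load(key)
	if !ok {
		return
	}
	keyMux := mux.(*sync.Mutex)
	if remove {
		k.Delete(key)
	}
	keyMux.Unlock()
}

func (k *keyMutex) Delete(key string) {
	k.muxes.Delete(key)
}
```

Callers that unlock in a defer now pass false and request removal only once the instance row has actually been deleted, as the later hunks show.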
@@ -118,7 +116,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error)
        // exclude that runner from available runners and attempt to ensure
        // the needed number of runners.
        if err := r.acquireNewInstance(job); err != nil {
            log.Printf("failed to add instance: %s", err)
            r.log("failed to add instance: %s", err)
        }
    case "completed":
        // ignore the error here. A completed job may not have a runner name set
@@ -139,15 +137,15 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error)
            if errors.Is(err, runnerErrors.ErrNotFound) {
                return nil
            }
            log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
            r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
            return errors.Wrap(err, "updating runner")
        }
        log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name))
        r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name))
        if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil {
            if errors.Is(err, runnerErrors.ErrNotFound) {
                return nil
            }
            log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
            r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
            return errors.Wrap(err, "updating runner")
        }
    case "in_progress":
@@ -171,31 +169,35 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error)
            if errors.Is(err, runnerErrors.ErrNotFound) {
                return nil
            }
            log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
            r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
            return errors.Wrap(err, "updating runner")
        }
    }
    return nil
}

func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string) {
    log.Printf("starting %s loop for %s", name, r.helper.String())
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
    r.log("starting %s loop for %s", name, r.helper.String())
    ticker := time.NewTicker(interval)
    r.wg.Add(1)

    defer func() {
        log.Printf("%s loop exited for pool %s", name, r.helper.String())
        r.log("%s loop exited for pool %s", name, r.helper.String())
        ticker.Stop()
        r.wg.Done()
    }()

    for {
        switch r.managerIsRunning {
        shouldRun := r.managerIsRunning
        if alwaysRun {
            shouldRun = true
        }
        switch shouldRun {
        case true:
            select {
            case <-ticker.C:
                if err := f(); err != nil {
                    log.Printf("%s: %q", name, err)
                    r.log("error in loop %s: %q", name, err)
                    if errors.Is(err, runnerErrors.ErrUnauthorized) {
                        r.setPoolRunningState(false, err.Error())
                    }
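The reworked loop computes a shouldRun flag instead of switching directly on r.managerIsRunning, so loops registered with alwaysRun=true (the tools update in Start(), later in this diff) keep ticking even while the manager is marked as not running. A condensed view of how the new function reads; the parts not visible in the hunk above (context cancellation and the branch taken when shouldRun is false) are filled in as clearly marked assumptions, not as the actual implementation:

```go
// Condensed sketch of the new loop. The default branch and the ctx.Done
// handling are assumptions; only the true branch appears in the diff.
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
	r.log("starting %s loop for %s", name, r.helper.String())
	ticker := time.NewTicker(interval)
	r.wg.Add(1)

	defer func() {
		r.log("%s loop exited for pool %s", name, r.helper.String())
		ticker.Stop()
		r.wg.Done()
	}()

	for {
		shouldRun := r.managerIsRunning
		if alwaysRun {
			shouldRun = true
		}
		switch shouldRun {
		case true:
			select {
			case <-ticker.C:
				if err := f(); err != nil {
					r.log("error in loop %s: %q", name, err)
					if errors.Is(err, runnerErrors.ErrUnauthorized) {
						r.setPoolRunningState(false, err.Error())
					}
				}
			case <-r.ctx.Done():
				return
			}
		default:
			// Manager is not running and this loop is not alwaysRun: wait for
			// the next tick (or cancellation) without calling f. Assumed, not
			// shown in the hunk.
			select {
			case <-ticker.C:
			case <-r.ctx.Done():
				return
			}
		}
	}
}
```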
@@ -224,17 +226,6 @@ func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Dur

func (r *basePoolManager) updateTools() error {
    // Update tools cache.
    tools, err := r.helper.FetchTools()
    if err != nil {
        return fmt.Errorf("failed to update tools for repo %s: %w", r.helper.String(), err)
    }
    r.mux.Lock()
    r.tools = tools
    r.mux.Unlock()
    return nil
}

func (r *basePoolManager) checkCanAuthenticateToGithub() error {
    tools, err := r.helper.FetchTools()
    if err != nil {
        r.setPoolRunningState(false, err.Error())
@@ -249,17 +240,6 @@ func (r *basePoolManager) checkCanAuthenticateToGithub() error {
    r.tools = tools
    r.mux.Unlock()

    err = r.runnerCleanup()
    if err != nil {
        if errors.Is(err, runnerErrors.ErrUnauthorized) {
            r.setPoolRunningState(false, err.Error())
            r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
            return fmt.Errorf("failed to clean runners for %s: %w", r.helper.String(), err)
        }
    }
    // We still set the pool as running, even if we failed to clean up runners.
    // We only set the pool as not running if we fail to authenticate to github. This is done
    // to avoid being rate limited by github when we have a bad token.
    r.setPoolRunningState(true, "")
    return err
}
@@ -311,7 +291,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
    runnerNames := map[string]bool{}
    for _, run := range runners {
        if !r.isManagedRunner(labelsFromRunner(run)) {
            log.Printf("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
            r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
            continue
        }
        runnerNames[*run.Name] = true
@@ -320,10 +300,10 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
    for _, instance := range dbInstances {
        lockAcquired := r.keyMux.TryLock(instance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instance.Name)
            r.log("failed to acquire lock for instance %s", instance.Name)
            continue
        }
        defer r.keyMux.Unlock(instance.Name)
        defer r.keyMux.Unlock(instance.Name, false)

        switch providerCommon.InstanceStatus(instance.Status) {
        case providerCommon.InstancePendingCreate,
@@ -337,20 +317,20 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
        switch instance.RunnerStatus {
        case providerCommon.RunnerPending, providerCommon.RunnerInstalling:
            // runner is still installing. We give it a chance to finish.
            log.Printf("runner %s is still installing, give it a chance to finish", instance.Name)
            r.log("runner %s is still installing, give it a chance to finish", instance.Name)
            continue
        }

        if time.Since(instance.UpdatedAt).Minutes() < 5 {
            // instance was updated recently. We give it a chance to register itself in github.
            log.Printf("instance %s was updated recently, skipping check", instance.Name)
            r.log("instance %s was updated recently, skipping check", instance.Name)
            continue
        }

        if ok := runnerNames[instance.Name]; !ok {
            // Set pending_delete on DB field. Allow consolidate() to remove it.
            if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
                log.Printf("failed to update runner %s status", instance.Name)
                r.log("failed to update runner %s status: %s", instance.Name, err)
                return errors.Wrap(err, "updating runner")
            }
        }
@@ -370,19 +350,20 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
    runnersByName := map[string]*github.Runner{}
    for _, run := range runners {
        if !r.isManagedRunner(labelsFromRunner(run)) {
            log.Printf("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
            r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
            continue
        }
        runnersByName[*run.Name] = run
    }

    for _, instance := range dbInstances {
        r.log("attempting to lock instance %s", instance.Name)
        lockAcquired := r.keyMux.TryLock(instance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instance.Name)
            r.log("failed to acquire lock for instance %s", instance.Name)
            continue
        }
        defer r.keyMux.Unlock(instance.Name)
        defer r.keyMux.Unlock(instance.Name, false)

        pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
        if err != nil {
@@ -410,9 +391,9 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
        // both the runner status as reported by GitHub and the runner status as reported by the provider.
        // If the runner is "offline" and marked as "failed", it should be safe to reap it.
        if runner, ok := runnersByName[instance.Name]; !ok || (runner.GetStatus() == "offline" && instance.RunnerStatus == providerCommon.RunnerFailed) {
            log.Printf("reaping timed-out/failed runner %s", instance.Name)
            r.log("reaping timed-out/failed runner %s", instance.Name)
            if err := r.ForceDeleteRunner(instance); err != nil {
                log.Printf("failed to update runner %s status", instance.Name)
                r.log("failed to update runner %s status: %s", instance.Name, err)
                return errors.Wrap(err, "updating runner")
            }
        }
@@ -438,7 +419,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
    g, ctx := errgroup.WithContext(r.ctx)
    for _, runner := range runners {
        if !r.isManagedRunner(labelsFromRunner(runner)) {
            log.Printf("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String())
            r.log("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String())
            continue
        }

@@ -455,7 +436,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
        }
        // We no longer have a DB entry for this instance, and the runner appears offline in github.
        // Previous forceful removal may have failed?
        log.Printf("Runner %s has no database entry in garm, removing from github", *runner.Name)
        r.log("Runner %s has no database entry in garm, removing from github", *runner.Name)
        resp, err := r.helper.RemoveGithubRunner(*runner.ID)
        if err != nil {
            // Removed in the meantime?
@@ -488,7 +469,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
        var poolInstances []params.Instance
        poolInstances, ok = poolInstanceCache[pool.ID]
        if !ok {
            log.Printf("updating instances cache for pool %s", pool.ID)
            r.log("updating instances cache for pool %s", pool.ID)
            poolInstances, err = provider.ListInstances(r.ctx, pool.ID)
            if err != nil {
                return errors.Wrapf(err, "fetching instances for pool %s", pool.ID)
@@ -498,34 +479,37 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)

        lockAcquired := r.keyMux.TryLock(dbInstance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
            r.log("failed to acquire lock for instance %s", dbInstance.Name)
            continue
        }
        defer r.keyMux.Unlock(dbInstance.Name)

        // See: https://golang.org/doc/faq#closures_and_goroutines
        runner := runner
        g.Go(func() error {
            deleteMux := false
            defer func() {
                r.keyMux.Unlock(dbInstance.Name, deleteMux)
            }()
            providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
            if !ok {
                // The runner instance is no longer on the provider, and it appears offline in github.
                // It should be safe to force remove it.
                log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name)
                r.log("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name)
                resp, err := r.helper.RemoveGithubRunner(*runner.ID)
                if err != nil {
                    // Removed in the meantime?
                    if resp != nil && resp.StatusCode == http.StatusNotFound {
                        log.Printf("runner dissapeared from github")
                        r.log("runner dissapeared from github")
                    } else {
                        return errors.Wrap(err, "removing runner from github")
                    }
                }
                // Remove the database entry for the runner.
                log.Printf("Removing %s from database", dbInstance.Name)
                r.log("Removing %s from database", dbInstance.Name)
                if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
                    return errors.Wrap(err, "removing runner from database")
                }
                defer r.keyMux.UnlockAndDelete(dbInstance.Name)
                deleteMux = true
                return nil
            }

@@ -533,10 +517,10 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
                // instance is running, but github reports runner as offline. Log the event.
                // This scenario may require manual intervention.
                // Perhaps it just came online and github did not yet change it's status?
                log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name)
                r.log("instance %s is online but github reports runner as offline", dbInstance.Name)
                return nil
            } else {
                log.Printf("instance %s was found in stopped state; starting", dbInstance.Name)
                r.log("instance %s was found in stopped state; starting", dbInstance.Name)
                //start the instance
                if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
                    return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
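The goroutine above replaces the old defer r.keyMux.UnlockAndDelete(dbInstance.Name) with a deleteMux flag that is flipped to true only once the database record is gone, so the deferred Unlock(dbInstance.Name, deleteMux) removes the per-key mutex entry only on success. The pattern in isolation, a sketch built from the calls visible in the diff rather than the complete function body:

```go
g.Go(func() error {
	// Unlock is always deferred; whether the mutex entry is also deleted
	// depends on deleteMux, which only becomes true after the DB row is removed.
	deleteMux := false
	defer func() {
		r.keyMux.Unlock(dbInstance.Name, deleteMux)
	}()

	if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
		// On failure, keep the mutex entry so later loops can lock the same key again.
		return errors.Wrap(err, "removing runner from database")
	}
	deleteMux = true
	return nil
})
```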
@@ -629,15 +613,15 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error {
    pool, err := r.helper.FindPoolByTags(requestedLabels)
    if err != nil {
        if errors.Is(err, runnerErrors.ErrNotFound) {
            log.Printf("failed to find an enabled pool with required labels: %s", strings.Join(requestedLabels, ", "))
            r.log("failed to find an enabled pool with required labels: %s", strings.Join(requestedLabels, ", "))
            return nil
        }
        return errors.Wrap(err, "fetching suitable pool")
    }
    log.Printf("adding new runner with requested tags %s in pool %s", util.SanitizeLogEntry(strings.Join(job.WorkflowJob.Labels, ", ")), util.SanitizeLogEntry(pool.ID))
    r.log("adding new runner with requested tags %s in pool %s", util.SanitizeLogEntry(strings.Join(job.WorkflowJob.Labels, ", ")), util.SanitizeLogEntry(pool.ID))

    if !pool.Enabled {
        log.Printf("selected pool (%s) is disabled", pool.ID)
        r.log("selected pool (%s) is disabled", pool.ID)
        return nil
    }

@@ -647,7 +631,7 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error {
    }

    if poolInstances >= int64(pool.MaxRunners) {
        log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
        r.log("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
        return nil
    }

@@ -667,12 +651,12 @@ func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error {
    // Skip creating a new runner if we have at least one idle runner and the minimum is already satisfied.
    // This should work even for pools that define a MinIdleRunner of 0.
    if int64(idleWorkers) > 0 && int64(idleWorkers) >= int64(pool.MinIdleRunners) {
        log.Printf("we have enough min_idle_runners (%d) for pool %s, skipping...", pool.MinIdleRunners, pool.ID)
        r.log("we have enough min_idle_runners (%d) for pool %s, skipping...", pool.MinIdleRunners, pool.ID)
        return nil
    }

    if err := r.AddRunner(r.ctx, pool.ID); err != nil {
        log.Printf("failed to add runner to pool %s", pool.ID)
        r.log("failed to add runner to pool %s", pool.ID)
        return errors.Wrap(err, "adding runner")
    }
    return nil
@@ -716,7 +700,7 @@ func (r *basePoolManager) Status() params.PoolManagerStatus {
}

func (r *basePoolManager) waitForTimeoutOrCanceled(timeout time.Duration) {
    log.Printf("sleeping for %.2f minutes", timeout.Minutes())
    r.log("sleeping for %.2f minutes", timeout.Minutes())
    select {
    case <-time.After(timeout):
    case <-r.ctx.Done():
@@ -781,7 +765,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
    if instanceIDToDelete != "" {
        if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil {
            if !errors.Is(err, runnerErrors.ErrNotFound) {
                log.Printf("failed to cleanup instance: %s", instanceIDToDelete)
                r.log("failed to cleanup instance: %s", instanceIDToDelete)
            }
        }
    }
@@ -821,7 +805,7 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param
    }
    // Runner name was not set in WorkflowJob by github. We can still attempt to
    // fetch the info we need, using the workflow run ID, from the API.
    log.Printf("runner name not found in workflow job, attempting to fetch from API")
    r.log("runner name not found in workflow job, attempting to fetch from API")
    runnerInfo, err = r.helper.GetRunnerInfoFromWorkflow(job)
    if err != nil {
        return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API")
@@ -830,12 +814,12 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param

    runnerDetails, err := r.store.GetInstanceByName(context.Background(), runnerInfo.Name)
    if err != nil {
        log.Printf("could not find runner details for %s", util.SanitizeLogEntry(runnerInfo.Name))
        r.log("could not find runner details for %s", util.SanitizeLogEntry(runnerInfo.Name))
        return params.RunnerInfo{}, errors.Wrap(err, "fetching runner details")
    }

    if _, err := r.helper.GetPoolByID(runnerDetails.PoolID); err != nil {
        log.Printf("runner %s (pool ID: %s) does not belong to any pool we manage: %s", runnerDetails.Name, runnerDetails.PoolID, err)
        r.log("runner %s (pool ID: %s) does not belong to any pool we manage: %s", runnerDetails.Name, runnerDetails.PoolID, err)
        return params.RunnerInfo{}, errors.Wrap(err, "fetching pool for instance")
    }
    return runnerInfo, nil
@@ -907,13 +891,13 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool

        lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
            r.log("failed to acquire lock for instance %s", instanceToDelete.Name)
            continue
        }
        defer r.keyMux.Unlock(instanceToDelete.Name)
        defer r.keyMux.Unlock(instanceToDelete.Name, false)

        g.Go(func() error {
            log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
            r.log("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
            if err := r.ForceDeleteRunner(instanceToDelete); err != nil {
                return fmt.Errorf("failed to delete instance %s: %w", instanceToDelete.ID, err)
            }
@@ -937,7 +921,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
    }

    if uint(len(existingInstances)) >= pool.MaxRunners {
        log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
        r.log("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
        return nil
    }

@@ -962,7 +946,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
    }

    for i := 0; i < required; i++ {
        log.Printf("adding new idle worker to pool %s", pool.ID)
        r.log("adding new idle worker to pool %s", pool.ID)
        if err := r.AddRunner(r.ctx, pool.ID); err != nil {
            return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err)
        }
@@ -974,6 +958,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
    if !pool.Enabled {
        return nil
    }
    r.log("running retry failed instances for pool %s", pool.ID)

    existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
    if err != nil {
@@ -989,20 +974,21 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
            continue
        }

        r.log("attempting to retry failed instance %s", instance.Name)
        lockAcquired := r.keyMux.TryLock(instance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instance.Name)
            r.log("failed to acquire lock for instance %s", instance.Name)
            continue
        }

        instance := instance
        g.Go(func() error {
            defer r.keyMux.Unlock(instance.Name)
            defer r.keyMux.Unlock(instance.Name, false)
            // NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
            // this has the potential to create many API requests to the target provider.
            // TODO(gabriel-samfira): implement request throttling.
            if err := r.deleteInstanceFromProvider(errCtx, instance); err != nil {
                log.Printf("failed to delete instance %s from provider: %s", instance.Name, err)
                r.log("failed to delete instance %s from provider: %s", instance.Name, err)
                // Bail here, otherwise we risk creating multiple failing instances, and losing track
                // of them. If Create instance failed to return a proper provider ID, we rely on the
                // name to delete the instance. If we don't bail here, and end up with multiple
@@ -1022,10 +1008,10 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
                TokenFetched: &tokenFetched,
                Status: providerCommon.InstancePendingCreate,
            }
            log.Printf("queueing previously failed instance %s for retry", instance.Name)
            r.log("queueing previously failed instance %s for retry", instance.Name)
            // Set instance to pending create and wait for retry.
            if err := r.updateInstance(instance.Name, updateParams); err != nil {
                log.Printf("failed to update runner %s status", instance.Name)
                r.log("failed to update runner %s status: %s", instance.Name, err)
            }
            return nil
        })
@@ -1068,6 +1054,7 @@ func (r *basePoolManager) scaleDown() error {
    for _, pool := range pools {
        pool := pool
        g.Go(func() error {
            r.log("running scale down for pool %s", pool.ID)
            return r.scaleDownOnePool(ctx, pool)
        })
    }
@@ -1128,47 +1115,55 @@ func (r *basePoolManager) deletePendingInstances() error {
        return fmt.Errorf("failed to fetch instances from store: %w", err)
    }

    r.log("removing instances in pending_delete")
    for _, instance := range instances {
        if instance.Status != providerCommon.InstancePendingDelete {
            // not in pending_delete status. Skip.
            continue
        }

        r.log("removing instance %s in pool %s", instance.Name, instance.PoolID)
        lockAcquired := r.keyMux.TryLock(instance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instance.Name)
            r.log("failed to acquire lock for instance %s", instance.Name)
            continue
        }

        // Set the status to deleting before launching the goroutine that removes
        // the runner from the provider (which can take a long time).
        if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
            log.Printf("failed to update runner %s status", instance.Name)
            r.keyMux.Unlock(instance.Name)
            r.log("failed to update runner %s status: %q", instance.Name, err)
            r.keyMux.Unlock(instance.Name, false)
            continue
        }

        go func(instance params.Instance) (err error) {
            defer r.keyMux.Unlock(instance.Name)
            deleteMux := false
            defer func() {
                r.keyMux.Unlock(instance.Name, deleteMux)
            }()
            defer func(instance params.Instance) {
                if err != nil {
                    r.log("failed to remove instance %s: %s", instance.Name, err)
                    // failed to remove from provider. Set the status back to pending_delete, which
                    // will retry the operation.
                    if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
                        log.Printf("failed to update runner %s status", instance.Name)
                        r.log("failed to update runner %s status: %s", instance.Name, err)
                    }
                }
            }(instance)

            r.log("removing instance %s from provider", instance.Name)
            err = r.deleteInstanceFromProvider(r.ctx, instance)
            if err != nil {
                return fmt.Errorf("failed to remove instance from provider: %w", err)
            }

            r.log("removing instance %s from database", instance.Name)
            if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
                return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
            }
            r.keyMux.UnlockAndDelete(instance.Name)
            deleteMux = true
            r.log("instance %s was successfully removed", instance.Name)
            return nil
        }(instance) //nolint
    }
@@ -1188,32 +1183,33 @@ func (r *basePoolManager) addPendingInstances() error {
            continue
        }

        r.log("attempting to acquire lock for instance %s (create)", instance.Name)
        lockAcquired := r.keyMux.TryLock(instance.Name)
        if !lockAcquired {
            log.Printf("failed to acquire lock for instance %s", instance.Name)
            r.log("failed to acquire lock for instance %s", instance.Name)
            continue
        }

        // Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
        // won't attempt to create the runner a second time.
        if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
            log.Printf("failed to update runner %s status: %s", instance.Name, err)
            r.keyMux.Unlock(instance.Name)
            r.log("failed to update runner %s status: %s", instance.Name, err)
            r.keyMux.Unlock(instance.Name, false)
            // We failed to transition the instance to Creating. This means that garm will retry to create this instance
            // when the loop runs again and we end up with multiple instances.
            continue
        }

        go func(instance params.Instance) {
            defer r.keyMux.Unlock(instance.Name)
            log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
            defer r.keyMux.Unlock(instance.Name, false)
            r.log("creating instance %s in pool %s", instance.Name, instance.PoolID)
            if err := r.addInstanceToProvider(instance); err != nil {
                log.Printf("failed to add instance to provider: %s", err)
                r.log("failed to add instance to provider: %s", err)
                errAsBytes := []byte(err.Error())
                if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
                    log.Printf("failed to update runner %s status: %s", instance.Name, err)
                    r.log("failed to update runner %s status: %s", instance.Name, err)
                }
                log.Printf("failed to create instance in provider: %s", err)
                r.log("failed to create instance in provider: %s", err)
            }
        }(instance)
    }
@@ -1236,6 +1232,7 @@ func (r *basePoolManager) Wait() error {
}

func (r *basePoolManager) runnerCleanup() (err error) {
    r.log("running runner cleanup")
    runners, err := r.helper.GetGithubRunners()
    if err != nil {
        return fmt.Errorf("failed to fetch github runners: %w", err)
@@ -1269,16 +1266,15 @@ func (r *basePoolManager) cleanupOrphanedRunners() error {
}

func (r *basePoolManager) Start() error {
    r.checkCanAuthenticateToGithub() //nolint
    r.updateTools() //nolint

    go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper")
    go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down")
    go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]")
    go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]")
    go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]")
    go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]")
    go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools")
    go r.startLoopForFunction(r.checkCanAuthenticateToGithub, common.UnauthorizedBackoffTimer, "bad_auth_backoff")
    go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper", false)
    go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down", false)
    go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]", false)
    go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]", false)
    go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]", false)
    go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]", false)
    go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools", true)
    return nil
}
@@ -1316,7 +1312,7 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error {
            return errors.Wrapf(runnerErrors.ErrBadRequest, "removing runner: %q", err)
        case http.StatusNotFound:
            // Runner may have been deleted by a finished job, or manually by the user.
            log.Printf("runner with agent id %d was not found in github", runner.AgentID)
            r.log("runner with agent id %d was not found in github", runner.AgentID)
        case http.StatusUnauthorized:
            // Mark the pool as offline from this point forward
            failureReason := fmt.Sprintf("failed to remove runner: %q", err)
@@ -1333,10 +1329,10 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error {
            }
        }
    }
    log.Printf("setting instance status for: %v", runner.Name)
    r.log("setting instance status for: %v", runner.Name)

    if err := r.setInstanceStatus(runner.Name, providerCommon.InstancePendingDelete, nil); err != nil {
        log.Printf("failed to update runner %s status", runner.Name)
        r.log("failed to update runner %s status: %s", runner.Name, err)
        return errors.Wrap(err, "updating runner")
    }
    return nil
runner/pool/util.go (new file, 11 additions)
@@ -0,0 +1,11 @@
package pool

import "log"

func (r *basePoolManager) log(msg string, args ...interface{}) {
    msgArgs := []interface{}{
        r.helper.String(),
    }
    msgArgs = append(msgArgs, args...)
    log.Printf("[Pool mgr %s] "+msg, msgArgs...)
}
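The new helper prepends the pool manager's identity to every message: the format string gains a "[Pool mgr %s] " prefix and r.helper.String() becomes the first argument, ahead of the caller's own arguments. An illustrative call site (the instance name and the pool identity shown here are made up; the actual prefix depends on what r.helper.String() returns):

```go
// Hypothetical call and the kind of line the default log package settings produce.
r.log("failed to acquire lock for instance %s", "runner-01")
// 2023/05/01 10:00:00 [Pool mgr example-org/example-repo] failed to acquire lock for instance runner-01
```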