diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index 4a677fb1..2097af7a 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -59,12 +59,15 @@ func (k *keyMutex) TryLock(key string) bool {
 	return keyMux.TryLock()
 }
 
-func (k *keyMutex) Unlock(key string) {
+func (k *keyMutex) Unlock(key string, remove bool) {
 	mux, ok := k.muxes.Load(key)
 	if !ok {
 		return
 	}
 	keyMux := mux.(*sync.Mutex)
+	if remove {
+		k.Delete(key)
+	}
 	keyMux.Unlock()
 }
 
@@ -72,11 +75,6 @@ func (k *keyMutex) Delete(key string) {
 	k.muxes.Delete(key)
 }
 
-func (k *keyMutex) UnlockAndDelete(key string) {
-	k.Unlock(key)
-	k.Delete(key)
-}
-
 type basePoolManager struct {
 	ctx          context.Context
 	controllerID string
@@ -323,7 +321,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
 			log.Printf("failed to acquire lock for instance %s", instance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name)
+		defer r.keyMux.Unlock(instance.Name, false)
 
 		switch providerCommon.InstanceStatus(instance.Status) {
 		case providerCommon.InstancePendingCreate,
@@ -382,7 +380,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
 			log.Printf("failed to acquire lock for instance %s", instance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name)
+		defer r.keyMux.Unlock(instance.Name, false)
 
 		pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
 		if err != nil {
@@ -501,11 +499,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 			log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(dbInstance.Name)
 
 		// See: https://golang.org/doc/faq#closures_and_goroutines
 		runner := runner
 		g.Go(func() error {
+			deleteMux := false
+			defer func() {
+				r.keyMux.Unlock(dbInstance.Name, deleteMux)
+			}()
 			providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
 			if !ok {
 				// The runner instance is no longer on the provider, and it appears offline in github.
@@ -525,7 +526,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 				if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
 					return errors.Wrap(err, "removing runner from database")
 				}
-				defer r.keyMux.UnlockAndDelete(dbInstance.Name)
+				deleteMux = true
 				return nil
 			}
 
@@ -910,7 +911,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
 			log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instanceToDelete.Name)
+		defer r.keyMux.Unlock(instanceToDelete.Name, false)
 
 		g.Go(func() error {
 			log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
@@ -997,7 +998,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
 
 		instance := instance
 		g.Go(func() error {
-			defer r.keyMux.Unlock(instance.Name)
+			defer r.keyMux.Unlock(instance.Name, false)
 			// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
 			// this has the potential to create many API requests to the target provider.
 			// TODO(gabriel-samfira): implement request throttling.
@@ -1143,13 +1144,16 @@ func (r *basePoolManager) deletePendingInstances() error {
 		// Set the status to deleting before launching the goroutine that removes
 		// the runner from the provider (which can take a long time).
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
-			log.Printf("failed to update runner %s status", instance.Name)
-			r.keyMux.Unlock(instance.Name)
+			log.Printf("failed to update runner %s status: %s", instance.Name, err)
+			r.keyMux.Unlock(instance.Name, false)
 			continue
 		}
 
 		go func(instance params.Instance) (err error) {
-			defer r.keyMux.Unlock(instance.Name)
+			deleteMux := false
+			defer func() {
+				r.keyMux.Unlock(instance.Name, deleteMux)
+			}()
 			defer func(instance params.Instance) {
 				if err != nil {
 					// failed to remove from provider. Set the status back to pending_delete, which
@@ -1168,7 +1172,7 @@ func (r *basePoolManager) deletePendingInstances() error {
 			if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
 				return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
 			}
-			r.keyMux.UnlockAndDelete(instance.Name)
+			deleteMux = true
 			return nil
 		}(instance) //nolint
 	}
@@ -1198,14 +1202,14 @@ func (r *basePoolManager) addPendingInstances() error {
 		// won't attempt to create the runner a second time.
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
 			log.Printf("failed to update runner %s status: %s", instance.Name, err)
-			r.keyMux.Unlock(instance.Name)
+			r.keyMux.Unlock(instance.Name, false)
 			// We failed to transition the instance to Creating. This means that garm will retry to create this instance
 			// when the loop runs again and we end up with multiple instances.
 			continue
 		}
 
 		go func(instance params.Instance) {
-			defer r.keyMux.Unlock(instance.Name)
+			defer r.keyMux.Unlock(instance.Name, false)
 			log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
 			if err := r.addInstanceToProvider(instance); err != nil {
 				log.Printf("failed to add instance to provider: %s", err)
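
For context, a minimal, self-contained sketch of the locking pattern this patch converges on. It is not part of the change itself: keyMutex is re-created from the hunks above (the body of TryLock is an assumption, since the diff only shows its tail), and deleteInstance is a hypothetical caller standing in for the cleanup and delete goroutines. The idea is that Unlock(key, remove) drops the per-key mutex entry only when remove is true, and callers flip a deleteMux flag to true only after the instance has actually been removed, so any early return keeps the entry around for the next reconciliation loop.

package main

import (
	"fmt"
	"sync"
)

// keyMutex holds one mutex per key in a sync.Map, so operations on
// different instances never block each other.
type keyMutex struct {
	muxes sync.Map
}

// TryLock body is assumed; the patch only shows its final lines.
func (k *keyMutex) TryLock(key string) bool {
	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
	return mux.(*sync.Mutex).TryLock()
}

// Unlock releases the mutex for key; when remove is true it also drops
// the map entry, mirroring the new signature introduced by the patch.
func (k *keyMutex) Unlock(key string, remove bool) {
	mux, ok := k.muxes.Load(key)
	if !ok {
		return
	}
	if remove {
		k.muxes.Delete(key)
	}
	mux.(*sync.Mutex).Unlock()
}

// deleteInstance is a hypothetical caller illustrating the deferred-unlock
// pattern used in the patch: deleteMux becomes true only once the instance
// is really gone, so failures leave the mutex entry in place for a retry.
func deleteInstance(km *keyMutex, name string, removeFromStore func(string) error) error {
	if !km.TryLock(name) {
		return fmt.Errorf("failed to acquire lock for instance %s", name)
	}
	deleteMux := false
	defer func() {
		km.Unlock(name, deleteMux)
	}()

	if err := removeFromStore(name); err != nil {
		return err // mutex entry is kept; a later loop can retry
	}
	deleteMux = true // instance deleted; drop the mutex entry as well
	return nil
}

func main() {
	km := &keyMutex{}
	err := deleteInstance(km, "runner-01", func(string) error { return nil })
	fmt.Println("deleted:", err == nil)
}

Folding the removal of the map entry into Unlock replaces the old UnlockAndDelete helper with a per-call argument, which is what allows the deferred unlock inside the goroutines to decide at return time whether the key should survive.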