From 1edb9247a8080f7bd111b2a66cde1d46f1623240 Mon Sep 17 00:00:00 2001
From: Gabriel Adrian Samfira
Date: Fri, 23 Jun 2023 10:18:56 +0000
Subject: [PATCH] Add per instance mux

Lock operations per instance name. This should prevent goroutines from
racing to update the same instance when operations are slow.

Signed-off-by: Gabriel Adrian Samfira
---
 runner/pool/enterprise.go   |   2 +
 runner/pool/organization.go |   2 +
 runner/pool/pool.go         | 123 +++++++++++++++++++++++++++++-------
 runner/pool/repository.go   |   2 +
 runner/runner.go            |   4 --
 5 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/runner/pool/enterprise.go b/runner/pool/enterprise.go
index 91043c67..ab9ed7db 100644
--- a/runner/pool/enterprise.go
+++ b/runner/pool/enterprise.go
@@ -27,6 +27,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
 	}
 
 	wg := &sync.WaitGroup{}
+	keyMuxes := &keyMutex{}
 
 	helper := &enterprise{
 		cfg: cfg,
@@ -47,6 +48,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
 		helper:       helper,
 		credsDetails: cfgInternal.GithubCredentialsDetails,
 		wg:           wg,
+		keyMux:       keyMuxes,
 	}
 	return repo, nil
 }
diff --git a/runner/pool/organization.go b/runner/pool/organization.go
index 9f64f2e9..5caa83c4 100644
--- a/runner/pool/organization.go
+++ b/runner/pool/organization.go
@@ -41,6 +41,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
 	}
 
 	wg := &sync.WaitGroup{}
+	keyMuxes := &keyMutex{}
 
 	helper := &organization{
 		cfg: cfg,
@@ -60,6 +61,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
 		helper:       helper,
 		credsDetails: cfgInternal.GithubCredentialsDetails,
 		wg:           wg,
+		keyMux:       keyMuxes,
 	}
 	return repo, nil
 }
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index 00be158b..4a677fb1 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -49,6 +49,34 @@ const (
 	maxCreateAttempts = 5
 )
 
+type keyMutex struct {
+	muxes sync.Map
+}
+
+func (k *keyMutex) TryLock(key string) bool {
+	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
+	keyMux := mux.(*sync.Mutex)
+	return keyMux.TryLock()
+}
+
+func (k *keyMutex) Unlock(key string) {
+	mux, ok := k.muxes.Load(key)
+	if !ok {
+		return
+	}
+	keyMux := mux.(*sync.Mutex)
+	keyMux.Unlock()
+}
+
+func (k *keyMutex) Delete(key string) {
+	k.muxes.Delete(key)
+}
+
+func (k *keyMutex) UnlockAndDelete(key string) {
+	k.Unlock(key)
+	k.Delete(key)
+}
+
 type basePoolManager struct {
 	ctx          context.Context
 	controllerID string
@@ -65,9 +93,9 @@ type basePoolManager struct {
 	managerIsRunning   bool
 	managerErrorReason string
 
-	mux sync.Mutex
-
-	wg *sync.WaitGroup
+	mux    sync.Mutex
+	wg     *sync.WaitGroup
+	keyMux *keyMutex
 }
 
 func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) {
@@ -290,6 +318,13 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
 	}
 
 	for _, instance := range dbInstances {
+		lockAcquired := r.keyMux.TryLock(instance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instance.Name)
+			continue
+		}
+		defer r.keyMux.Unlock(instance.Name)
+
 		switch providerCommon.InstanceStatus(instance.Status) {
 		case providerCommon.InstancePendingCreate,
 			providerCommon.InstancePendingDelete:
@@ -342,6 +377,13 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
 	}
 
 	for _, instance := range dbInstances {
+		lockAcquired := r.keyMux.TryLock(instance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instance.Name)
+			continue
+		}
+		defer r.keyMux.Unlock(instance.Name)
+
 		pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
 		if err != nil {
 			return errors.Wrap(err, "fetching instance pool info")
@@ -453,6 +495,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 			}
 			poolInstanceCache[pool.ID] = poolInstances
 		}
+
+		lockAcquired := r.keyMux.TryLock(dbInstance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
+			continue
+		}
+		defer r.keyMux.Unlock(dbInstance.Name)
+
 		// See: https://golang.org/doc/faq#closures_and_goroutines
 		runner := runner
 		g.Go(func() error {
@@ -475,6 +525,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 			if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
 				return errors.Wrap(err, "removing runner from database")
 			}
+			defer r.keyMux.UnlockAndDelete(dbInstance.Name)
 
 			return nil
 		}
@@ -853,6 +904,14 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
 
 	for _, instanceToDelete := range idleWorkers[:numScaleDown] {
 		instanceToDelete := instanceToDelete
+
+		lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
+			continue
+		}
+		defer r.keyMux.Unlock(instanceToDelete.Name)
+
 		g.Go(func() error {
 			log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
 			if err := r.ForceDeleteRunner(instanceToDelete); err != nil {
@@ -929,8 +988,16 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
 		if instance.CreateAttempt >= maxCreateAttempts {
 			continue
 		}
+
+		lockAcquired := r.keyMux.TryLock(instance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instance.Name)
+			continue
+		}
+
 		instance := instance
 		g.Go(func() error {
+			defer r.keyMux.Unlock(instance.Name)
 			// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
 			// this has the potential to create many API requests to the target provider.
 			// TODO(gabriel-samfira): implement request throttling.
@@ -1060,22 +1127,29 @@ func (r *basePoolManager) deletePendingInstances() error {
 	if err != nil {
 		return fmt.Errorf("failed to fetch instances from store: %w", err)
 	}
-	g, ctx := errgroup.WithContext(r.ctx)
+
 	for _, instance := range instances {
 		if instance.Status != providerCommon.InstancePendingDelete {
 			// not in pending_delete status. Skip.
 			continue
 		}
 
+		lockAcquired := r.keyMux.TryLock(instance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instance.Name)
+			continue
+		}
+
 		// Set the status to deleting before launching the goroutine that removes
 		// the runner from the provider (which can take a long time).
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
 			log.Printf("failed to update runner %s status", instance.Name)
+			r.keyMux.Unlock(instance.Name)
 			continue
 		}
 
-		instance := instance
-		g.Go(func() (err error) {
+		go func(instance params.Instance) (err error) {
+			defer r.keyMux.Unlock(instance.Name)
 			defer func(instance params.Instance) {
 				if err != nil {
 					// failed to remove from provider. Set the status back to pending_delete, which
@@ -1086,19 +1160,17 @@ func (r *basePoolManager) deletePendingInstances() error {
 				}
 			}(instance)
 
-			err = r.deleteInstanceFromProvider(ctx, instance)
+			err = r.deleteInstanceFromProvider(r.ctx, instance)
 			if err != nil {
 				return fmt.Errorf("failed to remove instance from provider: %w", err)
 			}
 
-			if deleteErr := r.store.DeleteInstance(ctx, instance.PoolID, instance.Name); deleteErr != nil {
+			if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
 				return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
 			}
+			r.keyMux.UnlockAndDelete(instance.Name)
 			return nil
-		})
-	}
-	if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
-		return fmt.Errorf("failed to delete pending instances: %w", err)
+		}(instance) //nolint
 	}
 
 	return nil
@@ -1110,36 +1182,40 @@ func (r *basePoolManager) addPendingInstances() error {
 	if err != nil {
 		return fmt.Errorf("failed to fetch instances from store: %w", err)
 	}
-	g, _ := errgroup.WithContext(r.ctx)
 	for _, instance := range instances {
 		if instance.Status != providerCommon.InstancePendingCreate {
 			// not in pending_create status. Skip.
 			continue
 		}
+
+		lockAcquired := r.keyMux.TryLock(instance.Name)
+		if !lockAcquired {
+			log.Printf("failed to acquire lock for instance %s", instance.Name)
+			continue
+		}
+
 		// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
 		// won't attempt to create the runner a second time.
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
 			log.Printf("failed to update runner %s status: %s", instance.Name, err)
+			r.keyMux.Unlock(instance.Name)
 			// We failed to transition the instance to Creating. This means that garm will retry to create this instance
 			// when the loop runs again and we end up with multiple instances.
 			continue
 		}
-		instance := instance
-		g.Go(func() error {
+
+		go func(instance params.Instance) {
+			defer r.keyMux.Unlock(instance.Name)
 			log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
 			if err := r.addInstanceToProvider(instance); err != nil {
 				log.Printf("failed to add instance to provider: %s", err)
 				errAsBytes := []byte(err.Error())
 				if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
-					return fmt.Errorf("failed to update runner %s status: %w", instance.Name, err)
+					log.Printf("failed to update runner %s status: %s", instance.Name, err)
 				}
-				return fmt.Errorf("failed to create instance in provider: %w", err)
+				log.Printf("failed to create instance in provider: %s", err)
 			}
-			return nil
-		})
-	}
-	if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
-		return fmt.Errorf("failed to add pending instances: %w", err)
+		}(instance)
 	}
 
 	return nil
@@ -1193,7 +1269,7 @@ func (r *basePoolManager) cleanupOrphanedRunners() error {
 }
 
 func (r *basePoolManager) Start() error {
-	r.checkCanAuthenticateToGithub()
+	r.checkCanAuthenticateToGithub() //nolint
 
 	go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper")
 	go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down")
@@ -1203,7 +1279,6 @@ func (r *basePoolManager) Start() error {
 	go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]")
 	go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools")
 	go r.startLoopForFunction(r.checkCanAuthenticateToGithub, common.UnauthorizedBackoffTimer, "bad_auth_backoff")
-	// go r.loop()
 	return nil
 }
 
diff --git a/runner/pool/repository.go b/runner/pool/repository.go
index 306ea566..05ea4200 100644
--- a/runner/pool/repository.go
+++ b/runner/pool/repository.go
@@ -41,6 +41,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
 	}
 
 	wg := &sync.WaitGroup{}
+	keyMuxes := &keyMutex{}
 
 	helper := &repository{
 		cfg: cfg,
@@ -60,6 +61,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
 		helper:       helper,
 		credsDetails: cfgInternal.GithubCredentialsDetails,
 		wg:           wg,
+		keyMux:       keyMuxes,
 	}
 	return repo, nil
 }
diff --git a/runner/runner.go b/runner/runner.go
index 3aa25b62..fa28ae66 100644
--- a/runner/runner.go
+++ b/runner/runner.go
@@ -636,10 +636,6 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
 		return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err)
 	}
 
-	asJs, _ := json.MarshalIndent(job, "", " ")
-	log.Printf("got workflow job: %s", string(asJs))
-	log.Printf("got workflow job for %s", string(jobData))
-
 	var poolManager common.PoolManager
 	var err error
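
Note (reviewer addendum, not part of the patch): the keyMutex type introduced in
runner/pool/pool.go above is a keyed try-lock built on sync.Map, and every
reconciliation loop in the patch follows the same pattern: TryLock(instance.Name),
skip the instance if the lock is busy, do the work, then Unlock (or
UnlockAndDelete once the instance is gone for good, so the map entry does not
leak). The sketch below is a minimal, self-contained illustration of that
pattern under stated assumptions: the keyMutex type is copied verbatim from the
patch, while main(), the worker IDs, and the instance name are hypothetical
scaffolding. sync.Mutex.TryLock requires Go 1.18 or newer.

	package main

	import (
		"fmt"
		"sync"
	)

	// keyMutex is copied verbatim from the patch (runner/pool/pool.go).
	type keyMutex struct {
		muxes sync.Map
	}

	func (k *keyMutex) TryLock(key string) bool {
		mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
		keyMux := mux.(*sync.Mutex)
		return keyMux.TryLock()
	}

	func (k *keyMutex) Unlock(key string) {
		mux, ok := k.muxes.Load(key)
		if !ok {
			return
		}
		keyMux := mux.(*sync.Mutex)
		keyMux.Unlock()
	}

	func (k *keyMutex) Delete(key string) {
		k.muxes.Delete(key)
	}

	func (k *keyMutex) UnlockAndDelete(key string) {
		k.Unlock(key)
		k.Delete(key)
	}

	func main() {
		var km keyMutex
		var wg sync.WaitGroup

		// Two workers target the same (hypothetical) instance name. If their
		// critical sections overlap, the second TryLock fails and that worker
		// skips instead of blocking, mirroring the "failed to acquire lock for
		// instance" path in the patch; a periodic loop would simply pick the
		// instance up again on its next tick.
		const instance = "garm-runner-example-01"
		for i := 0; i < 2; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				if !km.TryLock(instance) {
					fmt.Printf("worker %d: instance %s is busy, skipping\n", id, instance)
					return
				}
				// UnlockAndDelete also drops the map entry, which the patch
				// uses once an instance is removed for good; a plain Unlock
				// keeps the entry around for future lock attempts.
				defer km.UnlockAndDelete(instance)
				fmt.Printf("worker %d: updating instance %s\n", id, instance)
			}(i)
		}
		wg.Wait()
	}

The choice of TryLock over a blocking Lock fits how these loops run: they fire
periodically via startLoopForFunction, so skipping a busy instance and retrying
on the next tick is cheaper than parking one goroutine per contended instance.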