Merge Unlock() and UnlockAndDelete()

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Gabriel Adrian Samfira 2023-06-30 08:48:29 +03:00
parent aa44bf4f98
commit 7358beb2b9

@@ -59,12 +59,15 @@ func (k *keyMutex) TryLock(key string) bool {
 	return keyMux.TryLock()
 }
 
-func (k *keyMutex) Unlock(key string) {
+func (k *keyMutex) Unlock(key string, remove bool) {
 	mux, ok := k.muxes.Load(key)
 	if !ok {
 		return
 	}
 	keyMux := mux.(*sync.Mutex)
+	if remove {
+		k.Delete(key)
+	}
 	keyMux.Unlock()
 }
@@ -72,11 +75,6 @@ func (k *keyMutex) Delete(key string) {
 	k.muxes.Delete(key)
 }
 
-func (k *keyMutex) UnlockAndDelete(key string) {
-	k.Unlock(key)
-	k.Delete(key)
-}
-
 type basePoolManager struct {
 	ctx          context.Context
 	controllerID string
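
For reference, the merged locking helper after this commit reads roughly as follows. This is a minimal sketch reassembled from the two hunks above; the sync.Map-backed muxes field and the LoadOrStore body of TryLock are inferred from the surrounding context lines, not copied verbatim from the file:

package pool

import "sync"

// keyMutex provides one mutex per key, backed by a sync.Map.
type keyMutex struct {
	muxes sync.Map
}

// TryLock attempts to grab the mutex for key, creating it on first use.
func (k *keyMutex) TryLock(key string) bool {
	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
	return mux.(*sync.Mutex).TryLock()
}

// Unlock releases the mutex for key. With remove set, the entry is also
// dropped from the map, so a fully deleted instance leaves no stale mutex.
func (k *keyMutex) Unlock(key string, remove bool) {
	mux, ok := k.muxes.Load(key)
	if !ok {
		return
	}
	keyMux := mux.(*sync.Mutex)
	if remove {
		k.Delete(key)
	}
	keyMux.Unlock()
}

// Delete drops the mutex stored for key.
func (k *keyMutex) Delete(key string) {
	k.muxes.Delete(key)
}

As the call sites below show, callers only pass remove=true once the instance is gone from both the provider and the database; every other path unlocks with remove=false and keeps the entry for the next reconciliation pass.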
@@ -323,7 +321,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
 			log.Printf("failed to acquire lock for instance %s", instance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name)
+		defer r.keyMux.Unlock(instance.Name, false)
 
 		switch providerCommon.InstanceStatus(instance.Status) {
 		case providerCommon.InstancePendingCreate,
@@ -382,7 +380,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
 			log.Printf("failed to acquire lock for instance %s", instance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name)
+		defer r.keyMux.Unlock(instance.Name, false)
 
 		pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
 		if err != nil {
@@ -501,11 +499,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
 			log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(dbInstance.Name)
 
 		// See: https://golang.org/doc/faq#closures_and_goroutines
 		runner := runner
 		g.Go(func() error {
+			deleteMux := false
+			defer func() {
+				r.keyMux.Unlock(dbInstance.Name, deleteMux)
+			}()
 			providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
 			if !ok {
 				// The runner instance is no longer on the provider, and it appears offline in github.
@@ -525,7 +526,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
 				if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
 					return errors.Wrap(err, "removing runner from database")
 				}
-				defer r.keyMux.UnlockAndDelete(dbInstance.Name)
+				deleteMux = true
 				return nil
 			}
 
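The two hunks above are the heart of the change: the old code registered a success-path defer r.keyMux.UnlockAndDelete(...) inside the goroutine, while the new code sets a deleteMux flag and lets a single deferred call, which runs on every return path, decide whether the mutex entry is also removed. Reusing the keyMutex sketch above, the shape of the pattern is roughly this (removeRunner and handleRunner are hypothetical stand-ins for the real provider and database calls):

import "fmt"

// removeRunner is a hypothetical stand-in for the provider and database
// delete calls performed inside the goroutine.
func removeRunner(name string) error { return nil }

func handleRunner(keyMux *keyMutex, name string) error {
	if !keyMux.TryLock(name) {
		return fmt.Errorf("failed to acquire lock for instance %s", name)
	}
	deleteMux := false
	defer func() {
		// Runs on every return path; the mutex entry is removed from
		// the map only if the success path flipped deleteMux to true.
		keyMux.Unlock(name, deleteMux)
	}()

	if err := removeRunner(name); err != nil {
		// Unlock without deleting: the instance still exists, so its
		// mutex must survive for the next reconciliation pass.
		return err
	}
	deleteMux = true // instance fully removed; retire its mutex as well
	return nil
}

Folding the decision into one deferred call keeps the unlock guaranteed on error paths while still cleaning up the map exactly once, instead of stacking a second defer that only sometimes gets registered.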
@@ -910,7 +911,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool) error {
 			log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
 			continue
 		}
-		defer r.keyMux.Unlock(instanceToDelete.Name)
+		defer r.keyMux.Unlock(instanceToDelete.Name, false)
 
 		g.Go(func() error {
 			log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
@@ -997,7 +998,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, pool params.Pool) error {
 
 		instance := instance
 		g.Go(func() error {
-			defer r.keyMux.Unlock(instance.Name)
+			defer r.keyMux.Unlock(instance.Name, false)
 			// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
 			// this has the potential to create many API requests to the target provider.
 			// TODO(gabriel-samfira): implement request throttling.
@@ -1143,13 +1144,16 @@ func (r *basePoolManager) deletePendingInstances() error {
 		// Set the status to deleting before launching the goroutine that removes
 		// the runner from the provider (which can take a long time).
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
-			log.Printf("failed to update runner %s status", instance.Name)
-			r.keyMux.Unlock(instance.Name)
+			log.Printf("failed to update runner %s status: %s", instance.Name, err)
+			r.keyMux.Unlock(instance.Name, false)
 			continue
 		}
 
 		go func(instance params.Instance) (err error) {
-			defer r.keyMux.Unlock(instance.Name)
+			deleteMux := false
+			defer func() {
+				r.keyMux.Unlock(instance.Name, deleteMux)
+			}()
 			defer func(instance params.Instance) {
 				if err != nil {
 					// failed to remove from provider. Set the status back to pending_delete, which
@@ -1168,7 +1172,7 @@ func (r *basePoolManager) deletePendingInstances() error {
 			if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
 				return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
 			}
-			r.keyMux.UnlockAndDelete(instance.Name)
+			deleteMux = true
 			return nil
 		}(instance) //nolint
 	}
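One detail worth noting in the two hunks above: the goroutine is declared as func(instance params.Instance) (err error) with a named return, which is what lets the earlier deferred closure (the one that resets the status to pending_delete on failure) observe the final outcome; the //nolint marks the deliberately discarded return value. A standalone illustration of that mechanic, using no garm types:

package main

import (
	"errors"
	"fmt"
)

func work(fail bool) (err error) {
	defer func() {
		// A deferred closure sees the final value of the named return,
		// so it can roll back state only when the function failed.
		if err != nil {
			fmt.Println("resetting status after failure:", err)
		}
	}()
	if fail {
		return errors.New("failed to remove instance from provider")
	}
	return nil
}

func main() {
	fmt.Println(work(true))  // prints the rollback message, then the error
	fmt.Println(work(false)) // <nil>, no rollback
}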
@@ -1198,14 +1202,14 @@ func (r *basePoolManager) addPendingInstances() error {
 		// won't attempt to create the runner a second time.
 		if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
 			log.Printf("failed to update runner %s status: %s", instance.Name, err)
-			r.keyMux.Unlock(instance.Name)
+			r.keyMux.Unlock(instance.Name, false)
 			// We failed to transition the instance to Creating. This means that garm will retry to create this instance
 			// when the loop runs again and we end up with multiple instances.
 			continue
 		}
 
 		go func(instance params.Instance) {
-			defer r.keyMux.Unlock(instance.Name)
+			defer r.keyMux.Unlock(instance.Name, false)
 			log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
 			if err := r.addInstanceToProvider(instance); err != nil {
 				log.Printf("failed to add instance to provider: %s", err)