Add per-instance mux

Lock operations per instance name. This should prevent multiple goroutines
from trying to update the same instance while operations on it are slow.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Gabriel Adrian Samfira 2023-06-23 10:18:56 +00:00
parent a9cf5127a9
commit 1edb9247a8
5 changed files with 105 additions and 28 deletions
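
For context, the sketch below (not part of the commit) shows how the per-instance locking introduced here is meant to be used: a sync.Map-backed keyMutex hands out one lazily created mutex per instance name, goroutines that lose TryLock skip the instance instead of blocking, and the lock is released (and its entry optionally deleted) once the slow operation finishes. The keyMutex methods mirror the ones added in this commit; the worker loop and the instance name "runner-01" are made up for illustration, and sync.Mutex.TryLock requires Go 1.18+.

package main

import (
    "log"
    "sync"
)

// keyMutex mirrors the type added in this commit: one lazily created
// mutex per key, stored in a sync.Map.
type keyMutex struct {
    muxes sync.Map
}

func (k *keyMutex) TryLock(key string) bool {
    mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
    return mux.(*sync.Mutex).TryLock()
}

func (k *keyMutex) Unlock(key string) {
    if mux, ok := k.muxes.Load(key); ok {
        mux.(*sync.Mutex).Unlock()
    }
}

func (k *keyMutex) UnlockAndDelete(key string) {
    k.Unlock(key)
    k.muxes.Delete(key)
}

func main() {
    km := &keyMutex{}
    var wg sync.WaitGroup

    // Two goroutines race for the same (hypothetical) instance; the loser
    // of TryLock skips the instance rather than blocking, just like the
    // pool loops in this commit do.
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            if !km.TryLock("runner-01") {
                log.Printf("worker %d: failed to acquire lock for instance runner-01", id)
                return
            }
            // Release the lock (and its map entry) once the slow operation is done.
            defer km.UnlockAndDelete("runner-01")
            log.Printf("worker %d: updating instance runner-01", id)
        }(i)
    }
    wg.Wait()
}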


@@ -27,6 +27,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
helper := &enterprise{
cfg: cfg,
@@ -47,6 +48,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}


@@ -41,6 +41,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
helper := &organization{
cfg: cfg,
@@ -60,6 +61,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}


@@ -49,6 +49,34 @@ const (
maxCreateAttempts = 5
)
type keyMutex struct {
muxes sync.Map
}
func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}
func (k *keyMutex) Unlock(key string) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
keyMux.Unlock()
}
func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}
func (k *keyMutex) UnlockAndDelete(key string) {
k.Unlock(key)
k.Delete(key)
}
type basePoolManager struct {
ctx context.Context
controllerID string
@@ -65,9 +93,9 @@ type basePoolManager struct {
managerIsRunning bool
managerErrorReason string
mux sync.Mutex
wg *sync.WaitGroup
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
}
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) {
@@ -290,6 +318,13 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
}
for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name)
switch providerCommon.InstanceStatus(instance.Status) {
case providerCommon.InstancePendingCreate,
providerCommon.InstancePendingDelete:
@@ -342,6 +377,13 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
}
for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name)
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
@@ -453,6 +495,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
}
poolInstanceCache[pool.ID] = poolInstances
}
lockAcquired := r.keyMux.TryLock(dbInstance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
continue
}
defer r.keyMux.Unlock(dbInstance.Name)
// See: https://golang.org/doc/faq#closures_and_goroutines
runner := runner
g.Go(func() error {
@@ -475,6 +525,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
defer r.keyMux.UnlockAndDelete(dbInstance.Name)
return nil
}
@@ -853,6 +904,14 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
for _, instanceToDelete := range idleWorkers[:numScaleDown] {
instanceToDelete := instanceToDelete
lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
continue
}
defer r.keyMux.Unlock(instanceToDelete.Name)
g.Go(func() error {
log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
if err := r.ForceDeleteRunner(instanceToDelete); err != nil {
@@ -929,8 +988,16 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
if instance.CreateAttempt >= maxCreateAttempts {
continue
}
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
instance := instance
g.Go(func() error {
defer r.keyMux.Unlock(instance.Name)
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
@@ -1060,22 +1127,29 @@ func (r *basePoolManager) deletePendingInstances() error {
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
g, ctx := errgroup.WithContext(r.ctx)
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
r.keyMux.Unlock(instance.Name)
continue
}
instance := instance
g.Go(func() (err error) {
go func(instance params.Instance) (err error) {
defer r.keyMux.Unlock(instance.Name)
defer func(instance params.Instance) {
if err != nil {
// failed to remove from provider. Set the status back to pending_delete, which
@@ -1086,19 +1160,17 @@ func (r *basePoolManager) deletePendingInstances() error {
}
}(instance)
err = r.deleteInstanceFromProvider(ctx, instance)
err = r.deleteInstanceFromProvider(r.ctx, instance)
if err != nil {
return fmt.Errorf("failed to remove instance from provider: %w", err)
}
if deleteErr := r.store.DeleteInstance(ctx, instance.PoolID, instance.Name); deleteErr != nil {
if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
}
r.keyMux.UnlockAndDelete(instance.Name)
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to delete pending instances: %w", err)
}(instance) //nolint
}
return nil
@@ -1110,36 +1182,40 @@ func (r *basePoolManager) addPendingInstances() error {
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
g, _ := errgroup.WithContext(r.ctx)
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
// won't attempt to create the runner a second time.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
log.Printf("failed to update runner %s status: %s", instance.Name, err)
r.keyMux.Unlock(instance.Name)
// We failed to transition the instance to Creating. This means that garm will retry to create this instance
// when the loop runs again and we end up with multiple instances.
continue
}
instance := instance
g.Go(func() error {
go func(instance params.Instance) {
defer r.keyMux.Unlock(instance.Name)
log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to add instance to provider: %s", err)
errAsBytes := []byte(err.Error())
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
return fmt.Errorf("failed to update runner %s status: %w", instance.Name, err)
log.Printf("failed to update runner %s status: %s", instance.Name, err)
}
return fmt.Errorf("failed to create instance in provider: %w", err)
log.Printf("failed to create instance in provider: %s", err)
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to add pending instances: %w", err)
}(instance)
}
return nil
@@ -1193,7 +1269,7 @@ func (r *basePoolManager) cleanupOrphanedRunners() error {
}
func (r *basePoolManager) Start() error {
r.checkCanAuthenticateToGithub()
r.checkCanAuthenticateToGithub() //nolint
go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper")
go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down")
@@ -1203,7 +1279,6 @@ func (r *basePoolManager) Start() error {
go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]")
go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools")
go r.startLoopForFunction(r.checkCanAuthenticateToGithub, common.UnauthorizedBackoffTimer, "bad_auth_backoff")
// go r.loop()
return nil
}


@@ -41,6 +41,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
helper := &repository{
cfg: cfg,
@@ -60,6 +61,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}


@@ -636,10 +636,6 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err)
}
asJs, _ := json.MarshalIndent(job, "", " ")
log.Printf("got workflow job: %s", string(asJs))
log.Printf("got workflow job for %s", string(jobData))
var poolManager common.PoolManager
var err error