diff --git a/cmd/garm/main.go b/cmd/garm/main.go
index 45f8fe82..ebb30d55 100644
--- a/cmd/garm/main.go
+++ b/cmd/garm/main.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cloudbase/garm/database"
 	"github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/locking"
 	"github.com/cloudbase/garm/metrics"
 	"github.com/cloudbase/garm/params"
 	"github.com/cloudbase/garm/runner" //nolint:typecheck
@@ -214,6 +215,17 @@ func main() {
 		log.Fatal(err)
 	}
 
+	// Local locker for now. Will be configurable in the future,
+	// as we add scale-out capability to GARM.
+	lock, err := locking.NewLocalLocker(ctx, db)
+	if err != nil {
+		log.Fatalf("failed to create locker: %q", err)
+	}
+
+	if err := locking.RegisterLocker(lock); err != nil {
+		log.Fatalf("failed to register locker: %q", err)
+	}
+
 	if err := maybeUpdateURLsFromConfig(*cfg, db); err != nil {
 		log.Fatal(err)
 	}
diff --git a/locking/interface.go b/locking/interface.go
new file mode 100644
index 00000000..fd547830
--- /dev/null
+++ b/locking/interface.go
@@ -0,0 +1,16 @@
+package locking
+
+import "time"
+
+// TODO(gabriel-samfira): needs owner attribute.
+type Locker interface {
+	TryLock(key string) bool
+	Unlock(key string, remove bool)
+	Delete(key string)
+}
+
+type InstanceDeleteBackoff interface {
+	ShouldProcess(key string) (bool, time.Time)
+	Delete(key string)
+	RecordFailure(key string)
+}
diff --git a/runner/pool/locking.go b/locking/local_locker.go
similarity index 84%
rename from runner/pool/locking.go
rename to locking/local_locker.go
index 70471f98..5298c9e7 100644
--- a/runner/pool/locking.go
+++ b/locking/local_locker.go
@@ -1,9 +1,11 @@
-package pool
+package locking
 
 import (
+	"context"
 	"sync"
 	"time"
 
+	dbCommon "github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/runner/common"
 )
 
@@ -11,10 +13,16 @@ const (
 	maxBackoffSeconds float64 = 1200 // 20 minutes
 )
 
+func NewLocalLocker(_ context.Context, _ dbCommon.Store) (Locker, error) {
+	return &keyMutex{}, nil
+}
+
 type keyMutex struct {
 	muxes sync.Map
 }
 
+var _ Locker = &keyMutex{}
+
 func (k *keyMutex) TryLock(key string) bool {
 	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
 	keyMux := mux.(*sync.Mutex)
@@ -37,6 +45,10 @@ func (k *keyMutex) Delete(key string) {
 	k.muxes.Delete(key)
 }
 
+func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) {
+	return &instanceDeleteBackoff{}, nil
+}
+
 type instanceBackOff struct {
 	backoffSeconds          float64
 	lastRecordedFailureTime time.Time
diff --git a/locking/locking.go b/locking/locking.go
new file mode 100644
index 00000000..793edb4e
--- /dev/null
+++ b/locking/locking.go
@@ -0,0 +1,46 @@
+package locking
+
+import (
+	"fmt"
+	"sync"
+)
+
+var locker Locker
+var lockerMux = sync.Mutex{}
+
+func TryLock(key string) (bool, error) {
+	if locker == nil {
+		return false, fmt.Errorf("no locker is registered")
+	}
+
+	return locker.TryLock(key), nil
+}
+func Unlock(key string, remove bool) error {
+	if locker == nil {
+		return fmt.Errorf("no locker is registered")
+	}
+
+	locker.Unlock(key, remove)
+	return nil
+}
+
+func Delete(key string) error {
+	if locker == nil {
+		return fmt.Errorf("no locker is registered")
+	}
+
+	locker.Delete(key)
+	return nil
+}
+
+func RegisterLocker(lock Locker) error {
+	lockerMux.Lock()
+	defer lockerMux.Unlock()
+
+	if locker != nil {
+		return fmt.Errorf("locker already registered")
+	}
+
+	locker = lock
+	return nil
+}
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index 18e2d0a4..f6c97633 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -38,6 +38,7 @@ import (
 	"github.com/cloudbase/garm/auth"
 	dbCommon "github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/locking"
 	"github.com/cloudbase/garm/params"
 	"github.com/cloudbase/garm/runner/common"
 	garmUtil "github.com/cloudbase/garm/util"
@@ -92,8 +93,10 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
 	}
 
 	wg := &sync.WaitGroup{}
-	keyMuxes := &keyMutex{}
-	backoff := &instanceDeleteBackoff{}
+	backoff, err := locking.NewInstanceDeleteBackoff(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "creating backoff")
+	}
 
 	repo := &basePoolManager{
 		ctx:          ctx,
@@ -106,7 +109,6 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
 		providers: providers,
 		quit:      make(chan struct{}),
 		wg:        wg,
-		keyMux:    keyMuxes,
 		backoff:   backoff,
 		consumer:  consumer,
 	}
@@ -132,8 +134,7 @@ type basePoolManager struct {
 	mux sync.Mutex
 	wg  *sync.WaitGroup
 
-	keyMux  *keyMutex
-	backoff *instanceDeleteBackoff
+	backoff locking.InstanceDeleteBackoff
 }
 
 func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
@@ -414,14 +415,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
 	}
 
 	for _, instance := range dbInstances {
-		lockAcquired := r.keyMux.TryLock(instance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instance.Name)
+		if !lockAcquired || err != nil {
 			slog.DebugContext(
 				r.ctx, "failed to acquire lock for instance",
-				"runner_name", instance.Name)
+				"runner_name", instance.Name, "error", err)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name, false)
+		defer locking.Unlock(instance.Name, false)
 
 		switch instance.Status {
 		case commonParams.InstancePendingCreate,
@@ -493,14 +494,14 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
 		slog.DebugContext(
 			r.ctx, "attempting to lock instance",
 			"runner_name", instance.Name)
-		lockAcquired := r.keyMux.TryLock(instance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instance.Name)
+		if !lockAcquired || err != nil {
 			slog.DebugContext(
 				r.ctx, "failed to acquire lock for instance",
-				"runner_name", instance.Name)
+				"runner_name", instance.Name, "error", err)
 			continue
 		}
-		defer r.keyMux.Unlock(instance.Name, false)
+		defer locking.Unlock(instance.Name, false)
 
 		pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
 		if err != nil {
@@ -624,11 +625,11 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 			poolInstanceCache[pool.ID] = poolInstances
 		}
 
-		lockAcquired := r.keyMux.TryLock(dbInstance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(dbInstance.Name)
+		if !lockAcquired || err != nil {
 			slog.DebugContext(
 				r.ctx, "failed to acquire lock for instance",
-				"runner_name", dbInstance.Name)
+				"runner_name", dbInstance.Name, "error", err)
 			continue
 		}
 
@@ -637,7 +638,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
 		g.Go(func() error {
 			deleteMux := false
 			defer func() {
-				r.keyMux.Unlock(dbInstance.Name, deleteMux)
+				locking.Unlock(dbInstance.Name, deleteMux)
 			}()
 			providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
 			if !ok {
@@ -877,7 +878,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
 	bootstrapArgs := commonParams.BootstrapInstance{
 		Name:          instance.Name,
 		Tools:         r.tools,
-		RepoURL:       r.GithubURL(),
+		RepoURL:       r.entity.GithubURL(),
 		MetadataURL:   instance.MetadataURL,
 		CallbackURL:   instance.CallbackURL,
 		InstanceToken: jwtToken,
@@ -1062,14 +1063,14 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
 	for _, instanceToDelete := range idleWorkers[:numScaleDown] {
 		instanceToDelete := instanceToDelete
 
-		lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instanceToDelete.Name)
+		if !lockAcquired || err != nil {
 			slog.With(slog.Any("error", err)).ErrorContext(
 				ctx, "failed to acquire lock for instance",
-				"provider_id", instanceToDelete.Name)
+				"provider_id", instanceToDelete.Name, "error", err)
 			continue
 		}
-		defer r.keyMux.Unlock(instanceToDelete.Name, false)
+		defer locking.Unlock(instanceToDelete.Name, false)
 
 		g.Go(func() error {
 			slog.InfoContext(
@@ -1215,16 +1216,16 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
 		slog.DebugContext(
 			ctx, "attempting to retry failed instance",
 			"runner_name", instance.Name)
-		lockAcquired := r.keyMux.TryLock(instance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instance.Name)
+		if !lockAcquired || err != nil {
 			slog.DebugContext(
 				ctx, "failed to acquire lock for instance",
-				"runner_name", instance.Name)
+				"runner_name", instance.Name, "error", err)
 			continue
 		}
 
 		g.Go(func() error {
-			defer r.keyMux.Unlock(instance.Name, false)
+			defer locking.Unlock(instance.Name, false)
 			slog.DebugContext(
 				ctx, "attempting to clean up any previous instance",
 				"runner_name", instance.Name)
@@ -1394,8 +1395,8 @@ func (r *basePoolManager) deletePendingInstances() error {
 			r.ctx, "removing instance from pool",
 			"runner_name", instance.Name,
 			"pool_id", instance.PoolID)
-		lockAcquired := r.keyMux.TryLock(instance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instance.Name)
+		if !lockAcquired || err != nil {
 			slog.InfoContext(
 				r.ctx, "failed to acquire lock for instance",
 				"runner_name", instance.Name)
@@ -1407,7 +1408,7 @@ func (r *basePoolManager) deletePendingInstances() error {
 			slog.DebugContext(
 				r.ctx, "backoff in effect for instance",
 				"runner_name", instance.Name, "deadline", deadline)
-			r.keyMux.Unlock(instance.Name, false)
+			locking.Unlock(instance.Name, false)
 			continue
 		}
 
@@ -1424,7 +1425,7 @@ func (r *basePoolManager) deletePendingInstances() error {
 			currentStatus := instance.Status
 			deleteMux := false
 			defer func() {
-				r.keyMux.Unlock(instance.Name, deleteMux)
+				locking.Unlock(instance.Name, deleteMux)
 				if deleteMux {
 					// deleteMux is set only when the instance was successfully removed.
 					// We can use it as a marker to signal that the backoff is no longer
@@ -1501,11 +1502,11 @@ func (r *basePoolManager) addPendingInstances() error {
 			r.ctx, "attempting to acquire lock for instance",
 			"runner_name", instance.Name,
 			"action", "create_pending")
-		lockAcquired := r.keyMux.TryLock(instance.Name)
-		if !lockAcquired {
+		lockAcquired, err := locking.TryLock(instance.Name)
+		if !lockAcquired || err != nil {
 			slog.DebugContext(
 				r.ctx, "failed to acquire lock for instance",
-				"runner_name", instance.Name)
+				"runner_name", instance.Name, "error", err)
 			continue
 		}
 
@@ -1515,14 +1516,14 @@ func (r *basePoolManager) addPendingInstances() error {
 			slog.With(slog.Any("error", err)).ErrorContext(
 				r.ctx, "failed to update runner status",
 				"runner_name", instance.Name)
-			r.keyMux.Unlock(instance.Name, false)
+			locking.Unlock(instance.Name, false)
 			// We failed to transition the instance to Creating. This means that garm will retry to create this instance
 			// when the loop runs again and we end up with multiple instances.
 			continue
 		}
 
 		go func(instance params.Instance) {
-			defer r.keyMux.Unlock(instance.Name, false)
+			defer locking.Unlock(instance.Name, false)
 			slog.InfoContext(
 				r.ctx, "creating instance in pool",
 				"runner_name", instance.Name,
@@ -2027,18 +2028,6 @@ func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) {
 	return allRunners, nil
 }
 
-func (r *basePoolManager) GithubURL() string {
-	switch r.entity.EntityType {
-	case params.GithubEntityTypeRepository:
-		return fmt.Sprintf("%s/%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner, r.entity.Name)
-	case params.GithubEntityTypeOrganization:
-		return fmt.Sprintf("%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
-	case params.GithubEntityTypeEnterprise:
-		return fmt.Sprintf("%s/enterprises/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
-	}
-	return ""
-}
-
 func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
 	allHooks, err := r.listHooks(ctx)
 	if err != nil {
diff --git a/util/github/scalesets/message_sessions.go b/util/github/scalesets/message_sessions.go
index 8af10173..5ecdd94d 100644
--- a/util/github/scalesets/message_sessions.go
+++ b/util/github/scalesets/message_sessions.go
@@ -35,18 +35,6 @@ import (
 
 const maxCapacityHeader = "X-ScaleSetMaxCapacity"
 
-func NewMessageSession(ctx context.Context, cli *ScaleSetClient, session *params.RunnerScaleSetSession) (*MessageSession, error) {
-	sess := &MessageSession{
-		ssCli:   cli,
-		session: session,
-		ctx:     ctx,
-		done:    make(chan struct{}),
-		closed:  false,
-	}
-	go sess.loop()
-	return sess, nil
-}
-
 type MessageSession struct {
 	ssCli   *ScaleSetClient
 	session *params.RunnerScaleSetSession
@@ -243,10 +231,16 @@ func (s *ScaleSetClient) CreateMessageSession(ctx context.Context, runnerScaleSe
 		return nil, fmt.Errorf("failed to decode response: %w", err)
 	}
 
-	return &MessageSession{
+	sess := &MessageSession{
 		ssCli:   s,
 		session: &createdSession,
-	}, nil
+		ctx:     ctx,
+		done:    make(chan struct{}),
+		closed:  false,
+	}
+	go sess.loop()
+
+	return sess, nil
 }
 
 func (s *ScaleSetClient) DeleteMessageSession(ctx context.Context, session *MessageSession) error {
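Reviewer's note, not part of the diff: the sketch below condenses how the new package-level locking API is wired and consumed, using only identifiers introduced in this change. The nil store and the runner name are stand-ins for illustration; the real call sites are in cmd/garm/main.go and runner/pool/pool.go.

package main

import (
	"context"
	"log"

	"github.com/cloudbase/garm/locking"
)

func main() {
	ctx := context.Background()

	// Register the process-wide locker once at startup, as main.go does.
	// The local locker ignores its store argument, so nil is passed here
	// purely for illustration.
	lock, err := locking.NewLocalLocker(ctx, nil)
	if err != nil {
		log.Fatalf("failed to create locker: %q", err)
	}
	if err := locking.RegisterLocker(lock); err != nil {
		log.Fatalf("failed to register locker: %q", err)
	}

	// Call sites follow the pool manager's pattern: an error from TryLock
	// (no registered locker) is treated the same as a lost race.
	name := "garm-runner-0" // hypothetical runner name
	if ok, err := locking.TryLock(name); !ok || err != nil {
		log.Printf("could not lock %s: %v", name, err)
		return
	}
	defer locking.Unlock(name, false)
	// ... work on the instance while holding the lock ...
}

Hiding the single registered Locker behind package-level functions means call sites stay unchanged when the in-memory implementation is later swapped for a distributed one, per the scale-out comment in main.go.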
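A second sketch, also not part of the diff, of the InstanceDeleteBackoff contract as deletePendingInstances appears to exercise it: ShouldProcess gates work while a backoff deadline is pending, RecordFailure extends the backoff after a failed delete (capped by maxBackoffSeconds above), and Delete clears the state on success. deleteFromProvider is a hypothetical stub.

package main

import (
	"context"
	"log"

	"github.com/cloudbase/garm/locking"
)

// deleteFromProvider is a stub standing in for the real provider call.
func deleteFromProvider(name string) error { return nil }

// reconcileDelete sketches one reconciliation pass over a single instance.
func reconcileDelete(backoff locking.InstanceDeleteBackoff, name string) {
	ok, deadline := backoff.ShouldProcess(name)
	if !ok {
		// Still inside the backoff window; retry on a later pass.
		log.Printf("backoff in effect for %s until %s", name, deadline)
		return
	}
	if err := deleteFromProvider(name); err != nil {
		// Failure extends the backoff for the next pass.
		backoff.RecordFailure(name)
		return
	}
	// Success: clear the backoff state, mirroring the deleteMux marker
	// in deletePendingInstances.
	backoff.Delete(name)
}

func main() {
	bo, err := locking.NewInstanceDeleteBackoff(context.Background())
	if err != nil {
		log.Fatalf("failed to create backoff: %q", err)
	}
	reconcileDelete(bo, "garm-runner-0") // hypothetical runner name
}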