diff --git a/cache/cache_test.go b/cache/cache_test.go
new file mode 100644
index 00000000..a2155e97
--- /dev/null
+++ b/cache/cache_test.go
@@ -0,0 +1,87 @@
+package cache
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+
+	commonParams "github.com/cloudbase/garm-provider-common/params"
+	garmTesting "github.com/cloudbase/garm/internal/testing"
+	"github.com/cloudbase/garm/params"
+)
+
+type CacheTestSuite struct {
+	suite.Suite
+	entity params.GithubEntity
+}
+
+func (c *CacheTestSuite) SetupTest() {
+	c.entity = params.GithubEntity{
+		ID:         "1234",
+		EntityType: params.GithubEntityTypeOrganization,
+		Name:       "test",
+		Owner:      "test",
+	}
+}
+
+func (c *CacheTestSuite) TearDownTest() {
+	// Clean up the cache after each test
+	githubToolsCache.mux.Lock()
+	defer githubToolsCache.mux.Unlock()
+	githubToolsCache.entities = make(map[string]GithubEntityTools)
+}
+
+func (c *CacheTestSuite) TestCacheIsInitialized() {
+	c.Require().NotNil(githubToolsCache)
+}
+
+func (c *CacheTestSuite) TestSetCacheWorks() {
+	tools := []commonParams.RunnerApplicationDownload{
+		{
+			DownloadURL: garmTesting.Ptr("https://example.com"),
+		},
+	}
+
+	c.Require().NotNil(githubToolsCache)
+	c.Require().Len(githubToolsCache.entities, 0)
+	SetGithubToolsCache(c.entity, tools)
+	c.Require().Len(githubToolsCache.entities, 1)
+	cachedTools, ok := GetGithubToolsCache(c.entity)
+	c.Require().True(ok)
+	c.Require().Len(cachedTools, 1)
+	c.Require().Equal(tools[0].GetDownloadURL(), cachedTools[0].GetDownloadURL())
+}
+
+func (c *CacheTestSuite) TestTimedOutToolsCache() {
+	tools := []commonParams.RunnerApplicationDownload{
+		{
+			DownloadURL: garmTesting.Ptr("https://example.com"),
+		},
+	}
+
+	c.Require().NotNil(githubToolsCache)
+	c.Require().Len(githubToolsCache.entities, 0)
+	SetGithubToolsCache(c.entity, tools)
+	c.Require().Len(githubToolsCache.entities, 1)
+	entity := githubToolsCache.entities[c.entity.String()]
+	entity.updatedAt = entity.updatedAt.Add(-2 * time.Hour)
+	githubToolsCache.entities[c.entity.String()] = entity
+
+	cachedTools, ok := GetGithubToolsCache(c.entity)
+	c.Require().False(ok)
+	c.Require().Nil(cachedTools)
+}
+
+func (c *CacheTestSuite) TestGetInexistentCache() {
+	c.Require().NotNil(githubToolsCache)
+	c.Require().Len(githubToolsCache.entities, 0)
+	cachedTools, ok := GetGithubToolsCache(c.entity)
+	c.Require().False(ok)
+	c.Require().Nil(cachedTools)
+}
+
+func TestCacheTestSuite(t *testing.T) {
+	t.Parallel()
+	suite.Run(t, new(CacheTestSuite))
+}
diff --git a/internal/testing/testing.go b/internal/testing/testing.go
index 1b937b6c..b3d049fd 100644
--- a/internal/testing/testing.go
+++ b/internal/testing/testing.go
@@ -153,6 +153,10 @@ type NameAndIDDBEntity interface {
 	GetName() string
 }
 
+func Ptr[T any](v T) *T {
+	return &v
+}
+
 func EqualDBEntityByName[T NameAndIDDBEntity](t *testing.T, expected, actual []T) {
 	require.Equal(t, len(expected), len(actual))
 
diff --git a/locking/interface.go b/locking/interface.go
index 7750167b..2b6ffb47 100644
--- a/locking/interface.go
+++ b/locking/interface.go
@@ -5,6 +5,7 @@ import "time"
 type Locker interface {
 	TryLock(key, identifier string) bool
 	Lock(key, identifier string)
+	LockedBy(key string) (string, bool)
 	Unlock(key string, remove bool)
 	Delete(key string)
 }
diff --git a/locking/local_backoff_locker.go b/locking/local_backoff_locker.go
new file mode 100644
index 00000000..9c2fecb1
--- /dev/null
+++ b/locking/local_backoff_locker.go
@@ -0,0 +1,63 @@
+package locking
+
+import (
+	"context"
+	"sync"
+	"time"
+
"github.com/cloudbase/garm/runner/common" +) + +func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) { + return &instanceDeleteBackoff{}, nil +} + +type instanceBackOff struct { + backoffSeconds float64 + lastRecordedFailureTime time.Time + mux sync.Mutex +} + +type instanceDeleteBackoff struct { + muxes sync.Map +} + +func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) { + backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{}) + if !loaded { + return true, time.Time{} + } + + ib := backoff.(*instanceBackOff) + ib.mux.Lock() + defer ib.mux.Unlock() + + if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 { + return true, time.Time{} + } + + now := time.Now().UTC() + deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second) + return now.After(deadline), deadline +} + +func (i *instanceDeleteBackoff) Delete(key string) { + i.muxes.Delete(key) +} + +func (i *instanceDeleteBackoff) RecordFailure(key string) { + backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{}) + ib := backoff.(*instanceBackOff) + ib.mux.Lock() + defer ib.mux.Unlock() + + ib.lastRecordedFailureTime = time.Now().UTC() + if ib.backoffSeconds == 0 { + ib.backoffSeconds = common.PoolConsilitationInterval.Seconds() + } else { + // Geometric progression of 1.5 + newBackoff := ib.backoffSeconds * 1.5 + // Cap the backoff to 20 minutes + ib.backoffSeconds = min(newBackoff, maxBackoffSeconds) + } +} diff --git a/locking/local_backoff_locker_test.go b/locking/local_backoff_locker_test.go new file mode 100644 index 00000000..a9a986e2 --- /dev/null +++ b/locking/local_backoff_locker_test.go @@ -0,0 +1,75 @@ +package locking + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type LockerBackoffTestSuite struct { + suite.Suite + + locker *instanceDeleteBackoff +} + +func (l *LockerBackoffTestSuite) SetupTest() { + l.locker = &instanceDeleteBackoff{} +} + +func (l *LockerBackoffTestSuite) TearDownTest() { + l.locker = nil +} + +func (l *LockerBackoffTestSuite) TestShouldProcess() { + shouldProcess, deadline := l.locker.ShouldProcess("test") + l.Require().True(shouldProcess) + l.Require().Equal(time.Time{}, deadline) + + l.locker.muxes.Store("test", &instanceBackOff{ + backoffSeconds: 0, + lastRecordedFailureTime: time.Time{}, + }) + + shouldProcess, deadline = l.locker.ShouldProcess("test") + l.Require().True(shouldProcess) + l.Require().Equal(time.Time{}, deadline) + + l.locker.muxes.Store("test", &instanceBackOff{ + backoffSeconds: 100, + lastRecordedFailureTime: time.Now().UTC(), + }) + + shouldProcess, deadline = l.locker.ShouldProcess("test") + l.Require().False(shouldProcess) + l.Require().NotEqual(time.Time{}, deadline) +} + +func (l *LockerBackoffTestSuite) TestRecordFailure() { + l.locker.RecordFailure("test") + + mux, ok := l.locker.muxes.Load("test") + l.Require().True(ok) + ib := mux.(*instanceBackOff) + l.Require().NotNil(ib) + l.Require().NotEqual(time.Time{}, ib.lastRecordedFailureTime) + l.Require().Equal(float64(5), ib.backoffSeconds) + + l.locker.RecordFailure("test") + mux, ok = l.locker.muxes.Load("test") + l.Require().True(ok) + ib = mux.(*instanceBackOff) + l.Require().NotNil(ib) + l.Require().NotEqual(time.Time{}, ib.lastRecordedFailureTime) + l.Require().Equal(7.5, ib.backoffSeconds) + + l.locker.Delete("test") + mux, ok = l.locker.muxes.Load("test") + l.Require().False(ok) + l.Require().Nil(mux) +} + +func TestBackoffTestSuite(t *testing.T) { + t.Parallel() + suite.Run(t, 
+	suite.Run(t, new(LockerBackoffTestSuite))
+}
diff --git a/locking/local_locker.go b/locking/local_locker.go
index aeae610f..fc5ea847 100644
--- a/locking/local_locker.go
+++ b/locking/local_locker.go
@@ -2,14 +2,9 @@ package locking
 
 import (
 	"context"
-	"fmt"
-	"log/slog"
-	"runtime"
 	"sync"
-	"time"
 
 	dbCommon "github.com/cloudbase/garm/database/common"
-	"github.com/cloudbase/garm/runner/common"
 )
 
 const (
@@ -36,8 +31,11 @@ func (k *keyMutex) TryLock(key, identifier string) bool {
 		mux: sync.Mutex{},
 	})
 	keyMux := mux.(*lockWithIdent)
-	keyMux.ident = identifier
-	return keyMux.mux.TryLock()
+	locked := keyMux.mux.TryLock()
+	if locked {
+		keyMux.ident = identifier
+	}
+	return locked
 }
 
 func (k *keyMutex) Lock(key, identifier string) {
@@ -58,8 +56,6 @@ func (k *keyMutex) Unlock(key string, remove bool) {
 	if remove {
 		k.Delete(key)
 	}
-	_, filename, line, _ := runtime.Caller(1)
-	slog.Debug("unlocking", "key", key, "identifier", keyMux.ident, "caller", fmt.Sprintf("%s:%d", filename, line))
 	keyMux.ident = ""
 	keyMux.mux.Unlock()
 }
@@ -68,56 +64,15 @@ func (k *keyMutex) Delete(key string) {
 	k.muxes.Delete(key)
 }
 
-func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) {
-	return &instanceDeleteBackoff{}, nil
-}
-
-type instanceBackOff struct {
-	backoffSeconds          float64
-	lastRecordedFailureTime time.Time
-	mux                     sync.Mutex
-}
-
-type instanceDeleteBackoff struct {
-	muxes sync.Map
-}
-
-func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
-	backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
-	if !loaded {
-		return true, time.Time{}
+func (k *keyMutex) LockedBy(key string) (string, bool) {
+	mux, ok := k.muxes.Load(key)
+	if !ok {
+		return "", false
+	}
+	keyMux := mux.(*lockWithIdent)
+	if keyMux.ident == "" {
+		return "", false
 	}
-	ib := backoff.(*instanceBackOff)
-	ib.mux.Lock()
-	defer ib.mux.Unlock()
-
-	if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
-		return true, time.Time{}
-	}
-
-	now := time.Now().UTC()
-	deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
-	return deadline.After(now), deadline
-}
-
-func (i *instanceDeleteBackoff) Delete(key string) {
-	i.muxes.Delete(key)
-}
-
-func (i *instanceDeleteBackoff) RecordFailure(key string) {
-	backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
-	ib := backoff.(*instanceBackOff)
-	ib.mux.Lock()
-	defer ib.mux.Unlock()
-
-	ib.lastRecordedFailureTime = time.Now().UTC()
-	if ib.backoffSeconds == 0 {
-		ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
-	} else {
-		// Geometric progression of 1.5
-		newBackoff := ib.backoffSeconds * 1.5
-		// Cap the backoff to 20 minutes
-		ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
-	}
+	return keyMux.ident, true
 }
diff --git a/locking/local_locker_test.go b/locking/local_locker_test.go
new file mode 100644
index 00000000..6decf512
--- /dev/null
+++ b/locking/local_locker_test.go
@@ -0,0 +1,228 @@
+package locking
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type LockerTestSuite struct {
+	suite.Suite
+
+	mux *keyMutex
+}
+
+func (l *LockerTestSuite) SetupTest() {
+	l.mux = &keyMutex{}
+	err := RegisterLocker(l.mux)
+	l.Require().NoError(err, "should register the locker")
+}
+
+func (l *LockerTestSuite) TearDownTest() {
+	l.mux = nil
+	locker = nil
+}
+
+func (l *LockerTestSuite) TestLocalLockerLockUnlock() {
+	l.mux.Lock("test", "test-identifier")
+	mux, ok := l.mux.muxes.Load("test")
+	l.Require().True(ok)
+	keyMux := mux.(*lockWithIdent)
l.Require().Equal("test-identifier", keyMux.ident) + l.mux.Unlock("test", true) + mux, ok = l.mux.muxes.Load("test") + l.Require().False(ok) + l.Require().Nil(mux) + l.mux.Unlock("test", false) +} + +func (l *LockerTestSuite) TestLocalLockerTryLock() { + locked := l.mux.TryLock("test", "test-identifier") + l.Require().True(locked) + mux, ok := l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux := mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + locked = l.mux.TryLock("test", "another-identifier2") + l.Require().False(locked) + mux, ok = l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux = mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + l.mux.Unlock("test", true) + locked = l.mux.TryLock("test", "another-identifier2") + l.Require().True(locked) + mux, ok = l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux = mux.(*lockWithIdent) + l.Require().Equal("another-identifier2", keyMux.ident) + l.mux.Unlock("test", true) +} + +func (l *LockerTestSuite) TestLocalLockertLockedBy() { + l.mux.Lock("test", "test-identifier") + identifier, ok := l.mux.LockedBy("test") + l.Require().True(ok) + l.Require().Equal("test-identifier", identifier) + l.mux.Unlock("test", true) + identifier, ok = l.mux.LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) + + l.mux.Lock("test", "test-identifier") + identifier, ok = l.mux.LockedBy("test") + l.Require().True(ok) + l.Require().Equal("test-identifier", identifier) + l.mux.Unlock("test", false) + identifier, ok = l.mux.LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) +} + +func (l *LockerTestSuite) TestLockerPanicsIfNotInitialized() { + locker = nil + l.Require().Panics( + func() { + Lock("test", "test-identifier") + }, + "Lock should panic if locker is not initialized", + ) + + l.Require().Panics( + func() { + TryLock("test", "test-identifier") + }, + "TryLock should panic if locker is not initialized", + ) + + l.Require().Panics( + func() { + Unlock("test", false) + }, + "Unlock should panic if locker is not initialized", + ) + + l.Require().Panics( + func() { + Delete("test") + }, + "Delete should panic if locker is not initialized", + ) + + l.Require().Panics( + func() { + LockedBy("test") + }, + "LockedBy should panic if locker is not initialized", + ) +} + +func (l *LockerTestSuite) TestLockerAlreadyRegistered() { + err := RegisterLocker(l.mux) + l.Require().Error(err, "should not be able to register the same locker again") + l.Require().Equal("locker already registered", err.Error()) +} + +func (l *LockerTestSuite) TestLockerDelete() { + Lock("test", "test-identifier") + mux, ok := l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux := mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + Delete("test") + mux, ok = l.mux.muxes.Load("test") + l.Require().False(ok) + l.Require().Nil(mux) + + identifier, ok := l.mux.LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) +} + +func (l *LockerTestSuite) TestLockUnlock() { + Lock("test", "test-identifier") + mux, ok := l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux := mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + Unlock("test", true) + mux, ok = l.mux.muxes.Load("test") + l.Require().False(ok) + l.Require().Nil(mux) + + identifier, ok := l.mux.LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) +} + +func (l *LockerTestSuite) TestLockUnlockWithoutRemove() { + Lock("test", 
"test-identifier") + mux, ok := l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux := mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + Unlock("test", false) + mux, ok = l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux = mux.(*lockWithIdent) + l.Require().Equal("", keyMux.ident) + + identifier, ok := l.mux.LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) +} + +func (l *LockerTestSuite) TestTryLock() { + locked := TryLock("test", "test-identifier") + l.Require().True(locked) + mux, ok := l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux := mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + locked = TryLock("test", "another-identifier2") + l.Require().False(locked) + mux, ok = l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux = mux.(*lockWithIdent) + l.Require().Equal("test-identifier", keyMux.ident) + + Unlock("test", true) + locked = TryLock("test", "another-identifier2") + l.Require().True(locked) + mux, ok = l.mux.muxes.Load("test") + l.Require().True(ok) + keyMux = mux.(*lockWithIdent) + l.Require().Equal("another-identifier2", keyMux.ident) + Unlock("test", true) +} + +func (l *LockerTestSuite) TestLockedBy() { + Lock("test", "test-identifier") + identifier, ok := LockedBy("test") + l.Require().True(ok) + l.Require().Equal("test-identifier", identifier) + Unlock("test", true) + identifier, ok = LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) + + Lock("test", "test-identifier2") + identifier, ok = LockedBy("test") + l.Require().True(ok) + l.Require().Equal("test-identifier2", identifier) + Unlock("test", false) + identifier, ok = LockedBy("test") + l.Require().False(ok) + l.Require().Equal("", identifier) +} + +func TestLockerTestSuite(t *testing.T) { + t.Parallel() + suite.Run(t, new(LockerTestSuite)) +} diff --git a/locking/locking.go b/locking/locking.go index c7ad89a3..d485f5ff 100644 --- a/locking/locking.go +++ b/locking/locking.go @@ -11,48 +11,56 @@ var locker Locker var lockerMux = sync.Mutex{} -func TryLock(key, identifier string) (ok bool, err error) { - _, filename, line, _ := runtime.Caller(1) - slog.Debug("attempting to try lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) - defer slog.Debug("try lock returned", "key", key, "identifier", identifier, "locked", ok, "caller", fmt.Sprintf("%s:%d", filename, line)) - if locker == nil { - return false, fmt.Errorf("no locker is registered") - } - - ok = locker.TryLock(key, identifier) - return ok, nil -} - -func Lock(key, identifier string) { - _, filename, line, _ := runtime.Caller(1) - slog.Debug("attempting to lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) - defer slog.Debug("lock acquired", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) - +func TryLock(key, identifier string) (ok bool) { if locker == nil { panic("no locker is registered") } + _, filename, line, _ := runtime.Caller(1) + slog.Debug("attempting to try lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) + defer slog.Debug("try lock returned", "key", key, "identifier", identifier, "locked", ok, "caller", fmt.Sprintf("%s:%d", filename, line)) + + ok = locker.TryLock(key, identifier) + return ok +} + +func Lock(key, identifier string) { + if locker == nil { + panic("no locker is registered") + } + + _, filename, line, _ := runtime.Caller(1) + 
slog.Debug("attempting to lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) + defer slog.Debug("lock acquired", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line)) + locker.Lock(key, identifier) } -func Unlock(key string, remove bool) error { - _, filename, line, _ := runtime.Caller(1) - slog.Debug("attempting to unlock", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line)) +func Unlock(key string, remove bool) { if locker == nil { - return fmt.Errorf("no locker is registered") + panic("no locker is registered") } + _, filename, line, _ := runtime.Caller(1) + slog.Debug("attempting to unlock", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line)) + defer slog.Debug("unlock completed", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line)) locker.Unlock(key, remove) - return nil } -func Delete(key string) error { +func LockedBy(key string) (string, bool) { if locker == nil { - return fmt.Errorf("no locker is registered") + panic("no locker is registered") + } + + return locker.LockedBy(key) +} + +func Delete(key string) { + if locker == nil { + panic("no locker is registered") } locker.Delete(key) - return nil } func RegisterLocker(lock Locker) error { diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 1f2e96ec..f5f9a13b 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -422,11 +422,11 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne continue } - lockAcquired, err := locking.TryLock(instance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instance.Name, r.consumerID) + if !lockAcquired { slog.DebugContext( r.ctx, "failed to acquire lock for instance", - "runner_name", instance.Name, "error", err) + "runner_name", instance.Name) continue } defer locking.Unlock(instance.Name, false) @@ -505,11 +505,11 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { slog.DebugContext( r.ctx, "attempting to lock instance", "runner_name", instance.Name) - lockAcquired, err := locking.TryLock(instance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instance.Name, r.consumerID) + if !lockAcquired { slog.DebugContext( r.ctx, "failed to acquire lock for instance", - "runner_name", instance.Name, "error", err) + "runner_name", instance.Name) continue } defer locking.Unlock(instance.Name, false) @@ -639,11 +639,11 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) poolInstanceCache[dbInstance.PoolID] = poolInstances } - lockAcquired, err := locking.TryLock(dbInstance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(dbInstance.Name, r.consumerID) + if !lockAcquired { slog.DebugContext( r.ctx, "failed to acquire lock for instance", - "runner_name", dbInstance.Name, "error", err) + "runner_name", dbInstance.Name) continue } @@ -1076,11 +1076,11 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool for _, instanceToDelete := range idleWorkers[:numScaleDown] { instanceToDelete := instanceToDelete - lockAcquired, err := locking.TryLock(instanceToDelete.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instanceToDelete.Name, r.consumerID) + if !lockAcquired { slog.With(slog.Any("error", err)).ErrorContext( ctx, "failed to acquire lock for instance", - "provider_id", 
instanceToDelete.Name, "error", err) + "provider_id", instanceToDelete.Name) continue } defer locking.Unlock(instanceToDelete.Name, false) @@ -1229,11 +1229,11 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po slog.DebugContext( ctx, "attempting to retry failed instance", "runner_name", instance.Name) - lockAcquired, err := locking.TryLock(instance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instance.Name, r.consumerID) + if !lockAcquired { slog.DebugContext( ctx, "failed to acquire lock for instance", - "runner_name", instance.Name, "error", err) + "runner_name", instance.Name) continue } @@ -1413,8 +1413,8 @@ func (r *basePoolManager) deletePendingInstances() error { r.ctx, "removing instance from pool", "runner_name", instance.Name, "pool_id", instance.PoolID) - lockAcquired, err := locking.TryLock(instance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instance.Name, r.consumerID) + if !lockAcquired { slog.InfoContext( r.ctx, "failed to acquire lock for instance", "runner_name", instance.Name) @@ -1525,11 +1525,11 @@ func (r *basePoolManager) addPendingInstances() error { r.ctx, "attempting to acquire lock for instance", "runner_name", instance.Name, "action", "create_pending") - lockAcquired, err := locking.TryLock(instance.Name, r.consumerID) - if !lockAcquired || err != nil { + lockAcquired := locking.TryLock(instance.Name, r.consumerID) + if !lockAcquired { slog.DebugContext( r.ctx, "failed to acquire lock for instance", - "runner_name", instance.Name, "error", err) + "runner_name", instance.Name) continue } diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index f740f051..660bbe97 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -291,7 +291,7 @@ func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference) continue } if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline { - if ok, err := locking.TryLock(runner.Name, w.consumerID); err != nil || !ok { + if ok := locking.TryLock(runner.Name, w.consumerID); !ok { slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) continue } @@ -360,7 +360,7 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error } if _, ok := ghRunnersByName[name]; !ok { - if ok, err := locking.TryLock(name, w.consumerID); err != nil || !ok { + if ok := locking.TryLock(name, w.consumerID); !ok { slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name) continue } @@ -446,8 +446,8 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error continue } - locked, err := locking.TryLock(runner.Name, w.consumerID) - if err != nil || !locked { + locked := locking.TryLock(runner.Name, w.consumerID) + if !locked { slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) continue } @@ -777,8 +777,8 @@ func (w *Worker) handleScaleDown(target, current uint) { removed := 0 candidates := []params.Instance{} for _, runner := range w.runners { - locked, err := locking.TryLock(runner.Name, w.consumerID) - if err != nil || !locked { + locked := locking.TryLock(runner.Name, w.consumerID) + if !locked { slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) continue } @@ -809,8 +809,8 @@ func (w *Worker) handleScaleDown(target, current uint) { break } - locked, err := locking.TryLock(runner.Name, 
 			break
 		}
 
-		locked, err := locking.TryLock(runner.Name, w.consumerID)
-		if err != nil || !locked {
+		locked := locking.TryLock(runner.Name, w.consumerID)
+		if !locked {
 			slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
 			continue
 		}