Add tests for cache and locking
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
059734f064
commit
92d04c8e8d
10 changed files with 533 additions and 112 deletions
87
cache/cache_test.go
vendored
Normal file
87
cache/cache_test.go
vendored
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
commonParams "github.com/cloudbase/garm-provider-common/params"
|
||||
garmTesting "github.com/cloudbase/garm/internal/testing"
|
||||
"github.com/cloudbase/garm/params"
|
||||
)
|
||||
|
||||
type CacheTestSuite struct {
|
||||
suite.Suite
|
||||
entity params.GithubEntity
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) SetupTest() {
|
||||
c.entity = params.GithubEntity{
|
||||
ID: "1234",
|
||||
EntityType: params.GithubEntityTypeOrganization,
|
||||
Name: "test",
|
||||
Owner: "test",
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) TearDownTest() {
|
||||
// Clean up the cache after each test
|
||||
githubToolsCache.mux.Lock()
|
||||
defer githubToolsCache.mux.Unlock()
|
||||
githubToolsCache.entities = make(map[string]GithubEntityTools)
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) TestCacheIsInitialized() {
|
||||
c.Require().NotNil(githubToolsCache)
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) TestSetCacheWorks() {
|
||||
tools := []commonParams.RunnerApplicationDownload{
|
||||
{
|
||||
DownloadURL: garmTesting.Ptr("https://example.com"),
|
||||
},
|
||||
}
|
||||
|
||||
c.Require().NotNil(githubToolsCache)
|
||||
c.Require().Len(githubToolsCache.entities, 0)
|
||||
SetGithubToolsCache(c.entity, tools)
|
||||
c.Require().Len(githubToolsCache.entities, 1)
|
||||
cachedTools, ok := GetGithubToolsCache(c.entity)
|
||||
c.Require().True(ok)
|
||||
c.Require().Len(cachedTools, 1)
|
||||
c.Require().Equal(tools[0].GetDownloadURL(), cachedTools[0].GetDownloadURL())
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) TestTimedOutToolsCache() {
|
||||
tools := []commonParams.RunnerApplicationDownload{
|
||||
{
|
||||
DownloadURL: garmTesting.Ptr("https://example.com"),
|
||||
},
|
||||
}
|
||||
|
||||
c.Require().NotNil(githubToolsCache)
|
||||
c.Require().Len(githubToolsCache.entities, 0)
|
||||
SetGithubToolsCache(c.entity, tools)
|
||||
c.Require().Len(githubToolsCache.entities, 1)
|
||||
entity := githubToolsCache.entities[c.entity.String()]
|
||||
entity.updatedAt = entity.updatedAt.Add(-2 * time.Hour)
|
||||
githubToolsCache.entities[c.entity.String()] = entity
|
||||
|
||||
cachedTools, ok := GetGithubToolsCache(c.entity)
|
||||
c.Require().False(ok)
|
||||
c.Require().Nil(cachedTools)
|
||||
}
|
||||
|
||||
func (c *CacheTestSuite) TestGetInexistentCache() {
|
||||
c.Require().NotNil(githubToolsCache)
|
||||
c.Require().Len(githubToolsCache.entities, 0)
|
||||
cachedTools, ok := GetGithubToolsCache(c.entity)
|
||||
c.Require().False(ok)
|
||||
c.Require().Nil(cachedTools)
|
||||
}
|
||||
|
||||
func TestCacheTestSuite(t *testing.T) {
|
||||
t.Parallel()
|
||||
suite.Run(t, new(CacheTestSuite))
|
||||
}
|
||||
|
|
@ -153,6 +153,10 @@ type NameAndIDDBEntity interface {
|
|||
GetName() string
|
||||
}
|
||||
|
||||
func Ptr[T any](v T) *T {
|
||||
return &v
|
||||
}
|
||||
|
||||
func EqualDBEntityByName[T NameAndIDDBEntity](t *testing.T, expected, actual []T) {
|
||||
require.Equal(t, len(expected), len(actual))
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import "time"
|
|||
type Locker interface {
|
||||
TryLock(key, identifier string) bool
|
||||
Lock(key, identifier string)
|
||||
LockedBy(key string) (string, bool)
|
||||
Unlock(key string, remove bool)
|
||||
Delete(key string)
|
||||
}
|
||||
|
|
|
|||
63
locking/local_backoff_locker.go
Normal file
63
locking/local_backoff_locker.go
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
package locking
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudbase/garm/runner/common"
|
||||
)
|
||||
|
||||
func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) {
|
||||
return &instanceDeleteBackoff{}, nil
|
||||
}
|
||||
|
||||
type instanceBackOff struct {
|
||||
backoffSeconds float64
|
||||
lastRecordedFailureTime time.Time
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
type instanceDeleteBackoff struct {
|
||||
muxes sync.Map
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
|
||||
backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
if !loaded {
|
||||
return true, time.Time{}
|
||||
}
|
||||
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
|
||||
return true, time.Time{}
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
|
||||
return now.After(deadline), deadline
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) Delete(key string) {
|
||||
i.muxes.Delete(key)
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) RecordFailure(key string) {
|
||||
backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
ib.lastRecordedFailureTime = time.Now().UTC()
|
||||
if ib.backoffSeconds == 0 {
|
||||
ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
|
||||
} else {
|
||||
// Geometric progression of 1.5
|
||||
newBackoff := ib.backoffSeconds * 1.5
|
||||
// Cap the backoff to 20 minutes
|
||||
ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
|
||||
}
|
||||
}
|
||||
75
locking/local_backoff_locker_test.go
Normal file
75
locking/local_backoff_locker_test.go
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
package locking
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type LockerBackoffTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
locker *instanceDeleteBackoff
|
||||
}
|
||||
|
||||
func (l *LockerBackoffTestSuite) SetupTest() {
|
||||
l.locker = &instanceDeleteBackoff{}
|
||||
}
|
||||
|
||||
func (l *LockerBackoffTestSuite) TearDownTest() {
|
||||
l.locker = nil
|
||||
}
|
||||
|
||||
func (l *LockerBackoffTestSuite) TestShouldProcess() {
|
||||
shouldProcess, deadline := l.locker.ShouldProcess("test")
|
||||
l.Require().True(shouldProcess)
|
||||
l.Require().Equal(time.Time{}, deadline)
|
||||
|
||||
l.locker.muxes.Store("test", &instanceBackOff{
|
||||
backoffSeconds: 0,
|
||||
lastRecordedFailureTime: time.Time{},
|
||||
})
|
||||
|
||||
shouldProcess, deadline = l.locker.ShouldProcess("test")
|
||||
l.Require().True(shouldProcess)
|
||||
l.Require().Equal(time.Time{}, deadline)
|
||||
|
||||
l.locker.muxes.Store("test", &instanceBackOff{
|
||||
backoffSeconds: 100,
|
||||
lastRecordedFailureTime: time.Now().UTC(),
|
||||
})
|
||||
|
||||
shouldProcess, deadline = l.locker.ShouldProcess("test")
|
||||
l.Require().False(shouldProcess)
|
||||
l.Require().NotEqual(time.Time{}, deadline)
|
||||
}
|
||||
|
||||
func (l *LockerBackoffTestSuite) TestRecordFailure() {
|
||||
l.locker.RecordFailure("test")
|
||||
|
||||
mux, ok := l.locker.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
ib := mux.(*instanceBackOff)
|
||||
l.Require().NotNil(ib)
|
||||
l.Require().NotEqual(time.Time{}, ib.lastRecordedFailureTime)
|
||||
l.Require().Equal(float64(5), ib.backoffSeconds)
|
||||
|
||||
l.locker.RecordFailure("test")
|
||||
mux, ok = l.locker.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
ib = mux.(*instanceBackOff)
|
||||
l.Require().NotNil(ib)
|
||||
l.Require().NotEqual(time.Time{}, ib.lastRecordedFailureTime)
|
||||
l.Require().Equal(7.5, ib.backoffSeconds)
|
||||
|
||||
l.locker.Delete("test")
|
||||
mux, ok = l.locker.muxes.Load("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Nil(mux)
|
||||
}
|
||||
|
||||
func TestBackoffTestSuite(t *testing.T) {
|
||||
t.Parallel()
|
||||
suite.Run(t, new(LockerBackoffTestSuite))
|
||||
}
|
||||
|
|
@ -2,14 +2,9 @@ package locking
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
dbCommon "github.com/cloudbase/garm/database/common"
|
||||
"github.com/cloudbase/garm/runner/common"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -36,8 +31,11 @@ func (k *keyMutex) TryLock(key, identifier string) bool {
|
|||
mux: sync.Mutex{},
|
||||
})
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
keyMux.ident = identifier
|
||||
return keyMux.mux.TryLock()
|
||||
locked := keyMux.mux.TryLock()
|
||||
if locked {
|
||||
keyMux.ident = identifier
|
||||
}
|
||||
return locked
|
||||
}
|
||||
|
||||
func (k *keyMutex) Lock(key, identifier string) {
|
||||
|
|
@ -58,8 +56,6 @@ func (k *keyMutex) Unlock(key string, remove bool) {
|
|||
if remove {
|
||||
k.Delete(key)
|
||||
}
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("unlocking", "key", key, "identifier", keyMux.ident, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
keyMux.ident = ""
|
||||
keyMux.mux.Unlock()
|
||||
}
|
||||
|
|
@ -68,56 +64,15 @@ func (k *keyMutex) Delete(key string) {
|
|||
k.muxes.Delete(key)
|
||||
}
|
||||
|
||||
func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) {
|
||||
return &instanceDeleteBackoff{}, nil
|
||||
}
|
||||
|
||||
type instanceBackOff struct {
|
||||
backoffSeconds float64
|
||||
lastRecordedFailureTime time.Time
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
type instanceDeleteBackoff struct {
|
||||
muxes sync.Map
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
|
||||
backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
if !loaded {
|
||||
return true, time.Time{}
|
||||
func (k *keyMutex) LockedBy(key string) (string, bool) {
|
||||
mux, ok := k.muxes.Load(key)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
if keyMux.ident == "" {
|
||||
return "", false
|
||||
}
|
||||
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
|
||||
return true, time.Time{}
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
|
||||
return deadline.After(now), deadline
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) Delete(key string) {
|
||||
i.muxes.Delete(key)
|
||||
}
|
||||
|
||||
func (i *instanceDeleteBackoff) RecordFailure(key string) {
|
||||
backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
ib.lastRecordedFailureTime = time.Now().UTC()
|
||||
if ib.backoffSeconds == 0 {
|
||||
ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
|
||||
} else {
|
||||
// Geometric progression of 1.5
|
||||
newBackoff := ib.backoffSeconds * 1.5
|
||||
// Cap the backoff to 20 minutes
|
||||
ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
|
||||
}
|
||||
return keyMux.ident, true
|
||||
}
|
||||
|
|
|
|||
228
locking/local_locker_test.go
Normal file
228
locking/local_locker_test.go
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
package locking
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type LockerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
mux *keyMutex
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) SetupTest() {
|
||||
l.mux = &keyMutex{}
|
||||
err := RegisterLocker(l.mux)
|
||||
l.Require().NoError(err, "should register the locker")
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TearDownTest() {
|
||||
l.mux = nil
|
||||
locker = nil
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLocalLockerLockUnlock() {
|
||||
l.mux.Lock("test", "test-identifier")
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
l.mux.Unlock("test", true)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Nil(mux)
|
||||
l.mux.Unlock("test", false)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLocalLockerTryLock() {
|
||||
locked := l.mux.TryLock("test", "test-identifier")
|
||||
l.Require().True(locked)
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
locked = l.mux.TryLock("test", "another-identifier2")
|
||||
l.Require().False(locked)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux = mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
l.mux.Unlock("test", true)
|
||||
locked = l.mux.TryLock("test", "another-identifier2")
|
||||
l.Require().True(locked)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux = mux.(*lockWithIdent)
|
||||
l.Require().Equal("another-identifier2", keyMux.ident)
|
||||
l.mux.Unlock("test", true)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLocalLockertLockedBy() {
|
||||
l.mux.Lock("test", "test-identifier")
|
||||
identifier, ok := l.mux.LockedBy("test")
|
||||
l.Require().True(ok)
|
||||
l.Require().Equal("test-identifier", identifier)
|
||||
l.mux.Unlock("test", true)
|
||||
identifier, ok = l.mux.LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
|
||||
l.mux.Lock("test", "test-identifier")
|
||||
identifier, ok = l.mux.LockedBy("test")
|
||||
l.Require().True(ok)
|
||||
l.Require().Equal("test-identifier", identifier)
|
||||
l.mux.Unlock("test", false)
|
||||
identifier, ok = l.mux.LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockerPanicsIfNotInitialized() {
|
||||
locker = nil
|
||||
l.Require().Panics(
|
||||
func() {
|
||||
Lock("test", "test-identifier")
|
||||
},
|
||||
"Lock should panic if locker is not initialized",
|
||||
)
|
||||
|
||||
l.Require().Panics(
|
||||
func() {
|
||||
TryLock("test", "test-identifier")
|
||||
},
|
||||
"TryLock should panic if locker is not initialized",
|
||||
)
|
||||
|
||||
l.Require().Panics(
|
||||
func() {
|
||||
Unlock("test", false)
|
||||
},
|
||||
"Unlock should panic if locker is not initialized",
|
||||
)
|
||||
|
||||
l.Require().Panics(
|
||||
func() {
|
||||
Delete("test")
|
||||
},
|
||||
"Delete should panic if locker is not initialized",
|
||||
)
|
||||
|
||||
l.Require().Panics(
|
||||
func() {
|
||||
LockedBy("test")
|
||||
},
|
||||
"LockedBy should panic if locker is not initialized",
|
||||
)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockerAlreadyRegistered() {
|
||||
err := RegisterLocker(l.mux)
|
||||
l.Require().Error(err, "should not be able to register the same locker again")
|
||||
l.Require().Equal("locker already registered", err.Error())
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockerDelete() {
|
||||
Lock("test", "test-identifier")
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
Delete("test")
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Nil(mux)
|
||||
|
||||
identifier, ok := l.mux.LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockUnlock() {
|
||||
Lock("test", "test-identifier")
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
Unlock("test", true)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Nil(mux)
|
||||
|
||||
identifier, ok := l.mux.LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockUnlockWithoutRemove() {
|
||||
Lock("test", "test-identifier")
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
Unlock("test", false)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux = mux.(*lockWithIdent)
|
||||
l.Require().Equal("", keyMux.ident)
|
||||
|
||||
identifier, ok := l.mux.LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestTryLock() {
|
||||
locked := TryLock("test", "test-identifier")
|
||||
l.Require().True(locked)
|
||||
mux, ok := l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux := mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
locked = TryLock("test", "another-identifier2")
|
||||
l.Require().False(locked)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux = mux.(*lockWithIdent)
|
||||
l.Require().Equal("test-identifier", keyMux.ident)
|
||||
|
||||
Unlock("test", true)
|
||||
locked = TryLock("test", "another-identifier2")
|
||||
l.Require().True(locked)
|
||||
mux, ok = l.mux.muxes.Load("test")
|
||||
l.Require().True(ok)
|
||||
keyMux = mux.(*lockWithIdent)
|
||||
l.Require().Equal("another-identifier2", keyMux.ident)
|
||||
Unlock("test", true)
|
||||
}
|
||||
|
||||
func (l *LockerTestSuite) TestLockedBy() {
|
||||
Lock("test", "test-identifier")
|
||||
identifier, ok := LockedBy("test")
|
||||
l.Require().True(ok)
|
||||
l.Require().Equal("test-identifier", identifier)
|
||||
Unlock("test", true)
|
||||
identifier, ok = LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
|
||||
Lock("test", "test-identifier2")
|
||||
identifier, ok = LockedBy("test")
|
||||
l.Require().True(ok)
|
||||
l.Require().Equal("test-identifier2", identifier)
|
||||
Unlock("test", false)
|
||||
identifier, ok = LockedBy("test")
|
||||
l.Require().False(ok)
|
||||
l.Require().Equal("", identifier)
|
||||
}
|
||||
|
||||
func TestLockerTestSuite(t *testing.T) {
|
||||
t.Parallel()
|
||||
suite.Run(t, new(LockerTestSuite))
|
||||
}
|
||||
|
|
@ -11,48 +11,56 @@ var locker Locker
|
|||
|
||||
var lockerMux = sync.Mutex{}
|
||||
|
||||
func TryLock(key, identifier string) (ok bool, err error) {
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to try lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
defer slog.Debug("try lock returned", "key", key, "identifier", identifier, "locked", ok, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
if locker == nil {
|
||||
return false, fmt.Errorf("no locker is registered")
|
||||
}
|
||||
|
||||
ok = locker.TryLock(key, identifier)
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func Lock(key, identifier string) {
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
defer slog.Debug("lock acquired", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
|
||||
func TryLock(key, identifier string) (ok bool) {
|
||||
if locker == nil {
|
||||
panic("no locker is registered")
|
||||
}
|
||||
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to try lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
defer slog.Debug("try lock returned", "key", key, "identifier", identifier, "locked", ok, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
|
||||
ok = locker.TryLock(key, identifier)
|
||||
return ok
|
||||
}
|
||||
|
||||
func Lock(key, identifier string) {
|
||||
if locker == nil {
|
||||
panic("no locker is registered")
|
||||
}
|
||||
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to lock", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
defer slog.Debug("lock acquired", "key", key, "identifier", identifier, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
|
||||
locker.Lock(key, identifier)
|
||||
}
|
||||
|
||||
func Unlock(key string, remove bool) error {
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to unlock", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
func Unlock(key string, remove bool) {
|
||||
if locker == nil {
|
||||
return fmt.Errorf("no locker is registered")
|
||||
panic("no locker is registered")
|
||||
}
|
||||
|
||||
_, filename, line, _ := runtime.Caller(1)
|
||||
slog.Debug("attempting to unlock", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
defer slog.Debug("unlock completed", "key", key, "remove", remove, "caller", fmt.Sprintf("%s:%d", filename, line))
|
||||
locker.Unlock(key, remove)
|
||||
return nil
|
||||
}
|
||||
|
||||
func Delete(key string) error {
|
||||
func LockedBy(key string) (string, bool) {
|
||||
if locker == nil {
|
||||
return fmt.Errorf("no locker is registered")
|
||||
panic("no locker is registered")
|
||||
}
|
||||
|
||||
return locker.LockedBy(key)
|
||||
}
|
||||
|
||||
func Delete(key string) {
|
||||
if locker == nil {
|
||||
panic("no locker is registered")
|
||||
}
|
||||
|
||||
locker.Delete(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
func RegisterLocker(lock Locker) error {
|
||||
|
|
|
|||
|
|
@ -422,11 +422,11 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
|
|||
continue
|
||||
}
|
||||
|
||||
lockAcquired, err := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.DebugContext(
|
||||
r.ctx, "failed to acquire lock for instance",
|
||||
"runner_name", instance.Name, "error", err)
|
||||
"runner_name", instance.Name)
|
||||
continue
|
||||
}
|
||||
defer locking.Unlock(instance.Name, false)
|
||||
|
|
@ -505,11 +505,11 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
|||
slog.DebugContext(
|
||||
r.ctx, "attempting to lock instance",
|
||||
"runner_name", instance.Name)
|
||||
lockAcquired, err := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.DebugContext(
|
||||
r.ctx, "failed to acquire lock for instance",
|
||||
"runner_name", instance.Name, "error", err)
|
||||
"runner_name", instance.Name)
|
||||
continue
|
||||
}
|
||||
defer locking.Unlock(instance.Name, false)
|
||||
|
|
@ -639,11 +639,11 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
|
|||
poolInstanceCache[dbInstance.PoolID] = poolInstances
|
||||
}
|
||||
|
||||
lockAcquired, err := locking.TryLock(dbInstance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(dbInstance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.DebugContext(
|
||||
r.ctx, "failed to acquire lock for instance",
|
||||
"runner_name", dbInstance.Name, "error", err)
|
||||
"runner_name", dbInstance.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -1076,11 +1076,11 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
|
|||
for _, instanceToDelete := range idleWorkers[:numScaleDown] {
|
||||
instanceToDelete := instanceToDelete
|
||||
|
||||
lockAcquired, err := locking.TryLock(instanceToDelete.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instanceToDelete.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
ctx, "failed to acquire lock for instance",
|
||||
"provider_id", instanceToDelete.Name, "error", err)
|
||||
"provider_id", instanceToDelete.Name)
|
||||
continue
|
||||
}
|
||||
defer locking.Unlock(instanceToDelete.Name, false)
|
||||
|
|
@ -1229,11 +1229,11 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
|
|||
slog.DebugContext(
|
||||
ctx, "attempting to retry failed instance",
|
||||
"runner_name", instance.Name)
|
||||
lockAcquired, err := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.DebugContext(
|
||||
ctx, "failed to acquire lock for instance",
|
||||
"runner_name", instance.Name, "error", err)
|
||||
"runner_name", instance.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -1413,8 +1413,8 @@ func (r *basePoolManager) deletePendingInstances() error {
|
|||
r.ctx, "removing instance from pool",
|
||||
"runner_name", instance.Name,
|
||||
"pool_id", instance.PoolID)
|
||||
lockAcquired, err := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.InfoContext(
|
||||
r.ctx, "failed to acquire lock for instance",
|
||||
"runner_name", instance.Name)
|
||||
|
|
@ -1525,11 +1525,11 @@ func (r *basePoolManager) addPendingInstances() error {
|
|||
r.ctx, "attempting to acquire lock for instance",
|
||||
"runner_name", instance.Name,
|
||||
"action", "create_pending")
|
||||
lockAcquired, err := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired || err != nil {
|
||||
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
||||
if !lockAcquired {
|
||||
slog.DebugContext(
|
||||
r.ctx, "failed to acquire lock for instance",
|
||||
"runner_name", instance.Name, "error", err)
|
||||
"runner_name", instance.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -291,7 +291,7 @@ func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference)
|
|||
continue
|
||||
}
|
||||
if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline {
|
||||
if ok, err := locking.TryLock(runner.Name, w.consumerID); err != nil || !ok {
|
||||
if ok := locking.TryLock(runner.Name, w.consumerID); !ok {
|
||||
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
|
||||
continue
|
||||
}
|
||||
|
|
@ -360,7 +360,7 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
|
|||
}
|
||||
|
||||
if _, ok := ghRunnersByName[name]; !ok {
|
||||
if ok, err := locking.TryLock(name, w.consumerID); err != nil || !ok {
|
||||
if ok := locking.TryLock(name, w.consumerID); !ok {
|
||||
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name)
|
||||
continue
|
||||
}
|
||||
|
|
@ -446,8 +446,8 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
|
|||
continue
|
||||
}
|
||||
|
||||
locked, err := locking.TryLock(runner.Name, w.consumerID)
|
||||
if err != nil || !locked {
|
||||
locked := locking.TryLock(runner.Name, w.consumerID)
|
||||
if !locked {
|
||||
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
|
||||
continue
|
||||
}
|
||||
|
|
@ -777,8 +777,8 @@ func (w *Worker) handleScaleDown(target, current uint) {
|
|||
removed := 0
|
||||
candidates := []params.Instance{}
|
||||
for _, runner := range w.runners {
|
||||
locked, err := locking.TryLock(runner.Name, w.consumerID)
|
||||
if err != nil || !locked {
|
||||
locked := locking.TryLock(runner.Name, w.consumerID)
|
||||
if !locked {
|
||||
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
|
||||
continue
|
||||
}
|
||||
|
|
@ -809,8 +809,8 @@ func (w *Worker) handleScaleDown(target, current uint) {
|
|||
break
|
||||
}
|
||||
|
||||
locked, err := locking.TryLock(runner.Name, w.consumerID)
|
||||
if err != nil || !locked {
|
||||
locked := locking.TryLock(runner.Name, w.consumerID)
|
||||
if !locked {
|
||||
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue