Switch to locking package

The locking logic was moved into its own package, as it may need to be
used by other parts of the code.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Gabriel Adrian Samfira 2025-04-07 16:45:05 +00:00
parent e51f19acc8
commit 5ba53adf84
6 changed files with 131 additions and 62 deletions

@@ -41,6 +41,7 @@ import (
"github.com/cloudbase/garm/database"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/locking"
"github.com/cloudbase/garm/metrics"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner" //nolint:typecheck
@@ -214,6 +215,17 @@ func main() {
log.Fatal(err)
}
// Local locker for now. Will be configurable in the future,
// as we add scale-out capability to GARM.
lock, err := locking.NewLocalLocker(ctx, db)
if err != nil {
log.Fatalf("failed to create locker: %q", err)
}
if err := locking.RegisterLocker(lock); err != nil {
log.Fatalf("failed to register locker: %q", err)
}
if err := maybeUpdateURLsFromConfig(*cfg, db); err != nil {
log.Fatal(err)
}

locking/interface.go (new file, 16 additions)

@@ -0,0 +1,16 @@
package locking

import "time"

// TODO(gabriel-samfira): needs owner attribute.
type Locker interface {
    TryLock(key string) bool
    Unlock(key string, remove bool)
    Delete(key string)
}

type InstanceDeleteBackoff interface {
    ShouldProcess(key string) (bool, time.Time)
    Delete(key string)
    RecordFailure(key string)
}
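
Together with the package-level helpers in locking/locking.go (added later in this commit), these interfaces set the calling convention used throughout the pool manager diff below: try the lock, skip the resource when the lock is held or no locker is registered, and release it with a deferred Unlock. A minimal sketch of that pattern; the package, function, and runner name are illustrative, not part of the commit:

package example

import (
    "fmt"

    "github.com/cloudbase/garm/locking"
)

// processRunner is a hypothetical worker showing the lock/skip/unlock
// pattern the pool manager applies per runner name.
func processRunner(name string) {
    lockAcquired, err := locking.TryLock(name)
    if !lockAcquired || err != nil {
        // The lock is held elsewhere, or no locker is registered yet;
        // skip this runner and let the next loop iteration retry.
        fmt.Printf("skipping %s: %v\n", name, err)
        return
    }
    // remove=false keeps the per-key mutex cached for future attempts.
    defer locking.Unlock(name, false)

    // ... operate on the runner while holding the lock ...
}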

@@ -1,9 +1,11 @@
package pool
package locking
import (
"context"
"sync"
"time"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/runner/common"
)
@@ -11,10 +13,16 @@ const (
maxBackoffSeconds float64 = 1200 // 20 minutes
)
func NewLocalLocker(_ context.Context, _ dbCommon.Store) (Locker, error) {
return &keyMutex{}, nil
}
type keyMutex struct {
muxes sync.Map
}
var _ Locker = &keyMutex{}
func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
@@ -37,6 +45,10 @@ func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}
func NewInstanceDeleteBackoff(_ context.Context) (InstanceDeleteBackoff, error) {
return &instanceDeleteBackoff{}, nil
}
type instanceBackOff struct {
backoffSeconds float64
lastRecordedFailureTime time.Time
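
The hunk ends mid-struct; the backoff implementation itself is pre-existing code moved from the pool package and is not shown in this diff. Given the two fields above and the maxBackoffSeconds cap of 20 minutes, a plausible standalone sketch of the doubling backoff these fields support might look like the following (an assumption for illustration, not the code GARM ships; concurrency control is elided):

package locking

import (
    "math"
    "time"
)

const maxBackoffSeconds float64 = 1200 // 20 minutes

// Sketch only: field names mirror the diff above.
type instanceBackOff struct {
    backoffSeconds          float64
    lastRecordedFailureTime time.Time
}

// recordFailure doubles the wait after every failure, capped at 20 minutes.
func (i *instanceBackOff) recordFailure() {
    i.lastRecordedFailureTime = time.Now()
    if i.backoffSeconds == 0 {
        i.backoffSeconds = 1
    } else {
        i.backoffSeconds = math.Min(i.backoffSeconds*2, maxBackoffSeconds)
    }
}

// shouldProcess reports whether the backoff deadline has passed, returning
// the deadline so callers can log it, as deletePendingInstances does below.
func (i *instanceBackOff) shouldProcess() (bool, time.Time) {
    if i.lastRecordedFailureTime.IsZero() {
        return true, time.Time{}
    }
    deadline := i.lastRecordedFailureTime.Add(time.Duration(i.backoffSeconds) * time.Second)
    return time.Now().After(deadline), deadline
}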

locking/locking.go (new file, 46 additions)

@@ -0,0 +1,46 @@
package locking

import (
    "fmt"
    "sync"
)

var locker Locker
var lockerMux = sync.Mutex{}

func TryLock(key string) (bool, error) {
    if locker == nil {
        return false, fmt.Errorf("no locker is registered")
    }
    return locker.TryLock(key), nil
}

func Unlock(key string, remove bool) error {
    if locker == nil {
        return fmt.Errorf("no locker is registered")
    }
    locker.Unlock(key, remove)
    return nil
}

func Delete(key string) error {
    if locker == nil {
        return fmt.Errorf("no locker is registered")
    }
    locker.Delete(key)
    return nil
}

func RegisterLocker(lock Locker) error {
    lockerMux.Lock()
    defer lockerMux.Unlock()

    if locker != nil {
        return fmt.Errorf("locker already registered")
    }
    locker = lock
    return nil
}
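
RegisterLocker enforces a single process-wide locker, which is what lets main() above swap in a different backend once scale-out support lands. A short sketch of the registration flow; noopLocker is invented here purely for illustration and is not part of the commit:

package main

import (
    "log"

    "github.com/cloudbase/garm/locking"
)

// noopLocker is a made-up Locker implementation that always grants the
// lock; it exists only to demonstrate the registration flow.
type noopLocker struct{}

func (n *noopLocker) TryLock(key string) bool        { return true }
func (n *noopLocker) Unlock(key string, remove bool) {}
func (n *noopLocker) Delete(key string)              {}

func main() {
    if err := locking.RegisterLocker(&noopLocker{}); err != nil {
        log.Fatalf("failed to register locker: %q", err)
    }
    // A second registration is rejected, which prevents two subsystems
    // from silently swapping the process-wide locker.
    if err := locking.RegisterLocker(&noopLocker{}); err != nil {
        log.Printf("second registration failed as expected: %v", err)
    }
}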

@@ -38,6 +38,7 @@ import (
"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/locking"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
@@ -92,8 +93,10 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
backoff := &instanceDeleteBackoff{}
backoff, err := locking.NewInstanceDeleteBackoff(ctx)
if err != nil {
return nil, errors.Wrap(err, "creating backoff")
}
repo := &basePoolManager{
ctx: ctx,
@@ -106,7 +109,6 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
providers: providers,
quit: make(chan struct{}),
wg: wg,
keyMux: keyMuxes,
backoff: backoff,
consumer: consumer,
}
@@ -132,8 +134,7 @@ type basePoolManager struct {
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
backoff *instanceDeleteBackoff
backoff locking.InstanceDeleteBackoff
}
func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
@@ -414,14 +415,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
}
for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instance.Name)
if !lockAcquired || err != nil {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
"runner_name", instance.Name, "error", err)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
defer locking.Unlock(instance.Name, false)
switch instance.Status {
case commonParams.InstancePendingCreate,
@@ -493,14 +494,14 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
slog.DebugContext(
r.ctx, "attempting to lock instance",
"runner_name", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instance.Name)
if !lockAcquired || err != nil {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
"runner_name", instance.Name, "error", err)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
defer locking.Unlock(instance.Name, false)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
@@ -624,11 +625,11 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
poolInstanceCache[pool.ID] = poolInstances
}
lockAcquired := r.keyMux.TryLock(dbInstance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(dbInstance.Name)
if !lockAcquired || err != nil {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", dbInstance.Name)
"runner_name", dbInstance.Name, "error", err)
continue
}
@@ -637,7 +638,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
g.Go(func() error {
deleteMux := false
defer func() {
r.keyMux.Unlock(dbInstance.Name, deleteMux)
locking.Unlock(dbInstance.Name, deleteMux)
}()
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
if !ok {
@@ -877,7 +878,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
bootstrapArgs := commonParams.BootstrapInstance{
Name: instance.Name,
Tools: r.tools,
RepoURL: r.GithubURL(),
RepoURL: r.entity.GithubURL(),
MetadataURL: instance.MetadataURL,
CallbackURL: instance.CallbackURL,
InstanceToken: jwtToken,
@@ -1062,14 +1063,14 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
for _, instanceToDelete := range idleWorkers[:numScaleDown] {
instanceToDelete := instanceToDelete
lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instanceToDelete.Name)
if !lockAcquired || err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to acquire lock for instance",
"provider_id", instanceToDelete.Name)
"provider_id", instanceToDelete.Name, "error", err)
continue
}
defer r.keyMux.Unlock(instanceToDelete.Name, false)
defer locking.Unlock(instanceToDelete.Name, false)
g.Go(func() error {
slog.InfoContext(
@@ -1215,16 +1216,16 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
slog.DebugContext(
ctx, "attempting to retry failed instance",
"runner_name", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instance.Name)
if !lockAcquired || err != nil {
slog.DebugContext(
ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
"runner_name", instance.Name, "error", err)
continue
}
g.Go(func() error {
defer r.keyMux.Unlock(instance.Name, false)
defer locking.Unlock(instance.Name, false)
slog.DebugContext(
ctx, "attempting to clean up any previous instance",
"runner_name", instance.Name)
@@ -1394,8 +1395,8 @@ func (r *basePoolManager) deletePendingInstances() error {
r.ctx, "removing instance from pool",
"runner_name", instance.Name,
"pool_id", instance.PoolID)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instance.Name)
if !lockAcquired || err != nil {
slog.InfoContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
@@ -1407,7 +1408,7 @@ func (r *basePoolManager) deletePendingInstances() error {
slog.DebugContext(
r.ctx, "backoff in effect for instance",
"runner_name", instance.Name, "deadline", deadline)
r.keyMux.Unlock(instance.Name, false)
locking.Unlock(instance.Name, false)
continue
}
@@ -1424,7 +1425,7 @@ func (r *basePoolManager) deletePendingInstances() error {
currentStatus := instance.Status
deleteMux := false
defer func() {
r.keyMux.Unlock(instance.Name, deleteMux)
locking.Unlock(instance.Name, deleteMux)
if deleteMux {
// deleteMux is set only when the instance was successfully removed.
// We can use it as a marker to signal that the backoff is no longer
@@ -1501,11 +1502,11 @@ func (r *basePoolManager) addPendingInstances() error {
r.ctx, "attempting to acquire lock for instance",
"runner_name", instance.Name,
"action", "create_pending")
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
lockAcquired, err := locking.TryLock(instance.Name)
if !lockAcquired || err != nil {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
"runner_name", instance.Name, "error", err)
continue
}
@@ -1515,14 +1516,14 @@ func (r *basePoolManager) addPendingInstances() error {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", instance.Name)
r.keyMux.Unlock(instance.Name, false)
locking.Unlock(instance.Name, false)
// We failed to transition the instance to Creating. This means that garm will retry to create this instance
// when the loop runs again and we end up with multiple instances.
continue
}
go func(instance params.Instance) {
defer r.keyMux.Unlock(instance.Name, false)
defer locking.Unlock(instance.Name, false)
slog.InfoContext(
r.ctx, "creating instance in pool",
"runner_name", instance.Name,
@@ -2027,18 +2028,6 @@ func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) {
return allRunners, nil
}
func (r *basePoolManager) GithubURL() string {
switch r.entity.EntityType {
case params.GithubEntityTypeRepository:
return fmt.Sprintf("%s/%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner, r.entity.Name)
case params.GithubEntityTypeOrganization:
return fmt.Sprintf("%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
case params.GithubEntityTypeEnterprise:
return fmt.Sprintf("%s/enterprises/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
}
return ""
}
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
allHooks, err := r.listHooks(ctx)
if err != nil {

@@ -35,18 +35,6 @@ import (
const maxCapacityHeader = "X-ScaleSetMaxCapacity"
func NewMessageSession(ctx context.Context, cli *ScaleSetClient, session *params.RunnerScaleSetSession) (*MessageSession, error) {
sess := &MessageSession{
ssCli: cli,
session: session,
ctx: ctx,
done: make(chan struct{}),
closed: false,
}
go sess.loop()
return sess, nil
}
type MessageSession struct {
ssCli *ScaleSetClient
session *params.RunnerScaleSetSession
@@ -243,10 +231,16 @@ func (s *ScaleSetClient) CreateMessageSession(ctx context.Context, runnerScaleSe
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &MessageSession{
sess := &MessageSession{
ssCli: s,
session: &createdSession,
}, nil
ctx: ctx,
done: make(chan struct{}),
closed: false,
}
go sess.loop()
return sess, nil
}
func (s *ScaleSetClient) DeleteMessageSession(ctx context.Context, session *MessageSession) error {