From bc470c5f7896eb2506dec3a9a6590065920a3833 Mon Sep 17 00:00:00 2001
From: Gabriel Adrian Samfira
Date: Thu, 17 Apr 2025 22:59:24 +0000
Subject: [PATCH] WiP

Signed-off-by: Gabriel Adrian Samfira
---
Review notes (this section is ignored by git am):
 * Line structure of the patch was restored; context-line whitespace follows
   gofmt conventions and should be checked against the target tree.
 * Added lines were edited by hand (doc comments, a stray blank line removed,
   lock release consolidated). The post-image blob ids on the "index" lines
   below were not regenerated.

 locking/interface.go                |  1 +
 locking/local_locker.go             |  8 ++++++
 locking/locking.go                  | 12 +++++++++
 workers/scaleset/scaleset.go        | 44 +++++++++++++++++++++++++++++++
 workers/scaleset/scaleset_helper.go |  9 +++++++
 5 files changed, 74 insertions(+)

diff --git a/locking/interface.go b/locking/interface.go
index fd547830..07380a7b 100644
--- a/locking/interface.go
+++ b/locking/interface.go
@@ -5,6 +5,7 @@ import "time"
 // TODO(gabriel-samfira): needs owner attribute.
 type Locker interface {
 	TryLock(key string) bool
+	Lock(key string)
 	Unlock(key string, remove bool)
 	Delete(key string)
 }
diff --git a/locking/local_locker.go b/locking/local_locker.go
index 5298c9e7..ad41345c 100644
--- a/locking/local_locker.go
+++ b/locking/local_locker.go
@@ -29,6 +29,14 @@ func (k *keyMutex) TryLock(key string) bool {
 	return keyMux.TryLock()
 }
 
+// Lock blocks until the mutex associated with key is acquired, creating
+// the mutex on first use.
+func (k *keyMutex) Lock(key string) {
+	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
+	keyMux := mux.(*sync.Mutex)
+	keyMux.Lock()
+}
+
 func (k *keyMutex) Unlock(key string, remove bool) {
 	mux, ok := k.muxes.Load(key)
 	if !ok {
diff --git a/locking/locking.go b/locking/locking.go
index 793edb4e..6628d8b1 100644
--- a/locking/locking.go
+++ b/locking/locking.go
@@ -15,6 +15,18 @@ func TryLock(key string) (bool, error) {
 	return locker.TryLock(key), nil
 }
 
+// Lock acquires the lock for key using the registered locker.
+//
+// It panics if no locker is registered; Unlock reports the same condition
+// by returning an error instead.
+func Lock(key string) {
+	if locker == nil {
+		panic("no locker is registered")
+	}
+
+	locker.Lock(key)
+}
+
 func Unlock(key string, remove bool) error {
 	if locker == nil {
 		return fmt.Errorf("no locker is registered")
diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go
index dee1c70c..4a982ad4 100644
--- a/workers/scaleset/scaleset.go
+++ b/workers/scaleset/scaleset.go
@@ -136,6 +136,7 @@ func (w *Worker) Start() (err error) {
 	slog.DebugContext(w.ctx, "starting scale set worker loops", "scale_set", w.consumerID)
 	go w.loop()
 	go w.keepListenerAlive()
+	go w.handleAutoScale()
 
 	return nil
 }
@@ -307,3 +308,46 @@ func (w *Worker) keepListenerAlive() {
 		}
 	}
 }
+
+// handleAutoScale periodically compares the number of runners tracked by
+// the worker with the target runner count and logs whether a scale up or
+// scale down would be needed. It does not create or remove runners yet.
+//
+// The target is MinIdleRunners plus DesiredRunnerCount (ignored unless
+// positive), capped at MaxRunners. The loop runs every 5 seconds and exits
+// when w.quit or w.ctx.Done() fires.
+//
+// NOTE(review): w.runners and w.scaleSet are read here without any
+// synchronization visible in this patch; confirm that nothing mutates them
+// concurrently.
+func (w *Worker) handleAutoScale() {
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-w.quit:
+			return
+		case <-w.ctx.Done():
+			return
+		case <-ticker.C:
+			var desiredRunners uint
+			if w.scaleSet.DesiredRunnerCount > 0 {
+				desiredRunners = uint(w.scaleSet.DesiredRunnerCount)
+			}
+			targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners)
+
+			currentRunners := uint(len(w.runners))
+			if currentRunners == targetRunners {
+				slog.DebugContext(w.ctx, "desired runner count reached", "desired_runners", targetRunners)
+				continue
+			}
+
+			if currentRunners < targetRunners {
+				slog.DebugContext(w.ctx, "scaling up", "current_runners", currentRunners, "target_runners", targetRunners)
+			} else {
+				slog.DebugContext(w.ctx, "scaling down", "current_runners", currentRunners, "target_runners", targetRunners)
+			}
+		}
+	}
+}
diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go
index ca673c4d..e6ae9197 100644
--- a/workers/scaleset/scaleset_helper.go
+++ b/workers/scaleset/scaleset_helper.go
@@ -8,6 +8,7 @@ import (
 
 	runnerErrors "github.com/cloudbase/garm-provider-common/errors"
 	commonParams "github.com/cloudbase/garm-provider-common/params"
+	"github.com/cloudbase/garm/locking"
 	"github.com/cloudbase/garm/params"
 	"github.com/cloudbase/garm/util/github/scalesets"
 )
@@ -45,12 +46,16 @@ func (w *Worker) HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error {
 			Status:       commonParams.InstancePendingDelete,
 			RunnerStatus: params.RunnerTerminated,
 		}
+
+		// Hold the per-runner lock only while the store is being updated.
+		locking.Lock(job.RunnerName)
 		_, err := w.store.UpdateInstance(w.ctx, job.RunnerName, runnerUpdateParams)
+		locking.Unlock(job.RunnerName, false)
 		if err != nil {
 			if !errors.Is(err, runnerErrors.ErrNotFound) {
 				return fmt.Errorf("updating runner %s: %w", job.RunnerName, err)
 			}
 		}
 	}
 	return nil
 }
@@ -68,14 +73,18 @@ func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) error {
 			RunnerStatus: params.RunnerActive,
 		}
 
+		// Hold the per-runner lock only while the store is being updated;
+		// remove is true only when the store reports the runner as not found.
+		locking.Lock(job.RunnerName)
 		_, err := w.store.UpdateInstance(w.ctx, job.RunnerName, updateParams)
+		locking.Unlock(job.RunnerName, errors.Is(err, runnerErrors.ErrNotFound))
 		if err != nil {
 			if errors.Is(err, runnerErrors.ErrNotFound) {
 				slog.InfoContext(w.ctx, "runner not found; handled by some other controller?", "runner_name", job.RunnerName)
 				continue
 			}
 			return fmt.Errorf("updating runner %s: %w", job.RunnerName, err)
 		}
 	}
 	return nil
 }