From 7376a5fe741d0aa7dcf7adea230342af3974d84a Mon Sep 17 00:00:00 2001
From: Gabriel Adrian Samfira
Date: Sat, 19 Apr 2025 14:56:55 +0000
Subject: [PATCH] Fix scale set restart logic

Signed-off-by: Gabriel Adrian Samfira
---
 workers/scaleset/scaleset.go | 31 ++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go
index 4a982ad4..7e134adb 100644
--- a/workers/scaleset/scaleset.go
+++ b/workers/scaleset/scaleset.go
@@ -176,10 +176,8 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
 			// we don't have to potentially wait for 50 second for the max runner value
 			// to be updated, in which time we might get more runners spawned than the
 			// new max runner value.
-			if w.listener.IsRunning() {
-				if err := w.listener.Stop(); err != nil {
-					slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
-				}
+			if err := w.listener.Stop(); err != nil {
+				slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
 			}
 		}
 		// TODO: should we kick off auto-scaling if desired runner count changes?
@@ -271,6 +269,8 @@ func (w *Worker) keepListenerAlive() {
 			w.mux.Unlock()
 			continue
 		}
+		// noop if already started
+		w.listener.Start()
 		w.mux.Unlock()
 
 		select {
@@ -280,9 +280,19 @@
 			return
 		case <-w.listener.Wait():
 			slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
+			w.mux.Lock()
+			if !w.scaleSet.Enabled {
+				w.mux.Unlock()
+				continue
+			}
+			w.mux.Unlock()
 			for {
 				w.mux.Lock()
 				w.listener.Stop() //cleanup
+				if !w.scaleSet.Enabled {
+					w.mux.Unlock()
+					break
+				}
 				slog.DebugContext(w.ctx, "attempting to restart")
 				if err := w.listener.Start(); err != nil {
 					w.mux.Unlock()
@@ -313,6 +323,13 @@ func (w *Worker) handleAutoScale() {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
+	lastMsg := ""
+	lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
+		if lastMsg != msg {
+			slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
+			lastMsg = msg
+		}
+	}
 	for {
 		select {
 		case <-w.quit:
@@ -328,14 +345,14 @@
 
 			currentRunners := uint(len(w.runners))
 			if currentRunners == targetRunners {
-				slog.DebugContext(w.ctx, "desired runner count reached", "desired_runners", targetRunners)
+				lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners)
 				continue
 			}
 
 			if currentRunners < targetRunners {
-				slog.DebugContext(w.ctx, "scaling up", "current_runners", currentRunners, "target_runners", targetRunners)
+				lastMsgDebugLog("scaling up", targetRunners, currentRunners)
 			} else {
-				slog.DebugContext(w.ctx, "scaling down", "current_runners", currentRunners, "target_runners", targetRunners)
+				lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners)
 			}
 		}
 	}
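
Not part of the patch: below is a minimal, self-contained sketch of the deduplicated debug-logging closure that the handleAutoScale hunk above introduces, assuming Go's log/slog package. The logger setup, the main wrapper and the sample (current, target) runner counts are invented for illustration; in the worker the closure is driven by the 5 second ticker and the real runner counts.

package main

import (
	"log/slog"
	"os"
)

func main() {
	// Debug-level text logger so the Debug calls below are visible.
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))

	// Same shape as the closure added in handleAutoScale: only log when the
	// message changes, so a steady state is reported once instead of on
	// every tick.
	lastMsg := ""
	lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
		if lastMsg != msg {
			logger.Debug(msg, "current_runners", currentRunners, "target_runners", targetRunners)
			lastMsg = msg
		}
	}

	// Invented sequence of (current, target) runner counts standing in for
	// successive ticker iterations; only transitions produce output.
	samples := []struct{ current, target uint }{
		{1, 3}, {2, 3}, {3, 3}, {3, 3}, {5, 3}, {3, 3},
	}
	for _, s := range samples {
		switch {
		case s.current == s.target:
			lastMsgDebugLog("desired runner count reached", s.target, s.current)
		case s.current < s.target:
			lastMsgDebugLog("scaling up", s.target, s.current)
		default:
			lastMsgDebugLog("attempting to scale down", s.target, s.current)
		}
	}
}

With the sample values above only four lines are emitted (scaling up, desired runner count reached, attempting to scale down, desired runner count reached) rather than one per tick, which is the point of keying the log on the last emitted message.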