Obey enabled/disabled status

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-17 22:03:29 +00:00
parent d949cecbe7
commit 8c62b6de8c
2 changed files with 39 additions and 9 deletions

View file

@ -119,9 +119,13 @@ func (w *Worker) Start() (err error) {
slog.DebugContext(w.ctx, "creating scale set listener")
listener := newListener(w.ctx, w)
slog.DebugContext(w.ctx, "starting scale set listener")
if err := listener.Start(); err != nil {
return fmt.Errorf("error starting listener: %w", err)
if w.scaleSet.Enabled {
slog.DebugContext(w.ctx, "starting scale set listener")
if err := listener.Start(); err != nil {
return fmt.Errorf("error starting listener: %w", err)
}
} else {
slog.InfoContext(w.ctx, "scale set is disabled; not starting listener")
}
w.listener = listener
@ -162,10 +166,19 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
w.mux.Lock()
if scaleSet.MaxRunners < w.scaleSet.MaxRunners {
slog.DebugContext(w.ctx, "max runners changed; stopping listener")
if err := w.listener.Stop(); err != nil {
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
if scaleSet.MaxRunners < w.scaleSet.MaxRunners || !scaleSet.Enabled {
// we stop the listener if the scale set is disabled or if the max runners
// is decreased. In the case where max runners changes but the scale set
// is still enabled, we rely on the keepListenerAlive to restart the listener
// which will listen for new messages with the changed max runners. This way
// we don't have to potentially wait for 50 second for the max runner value
// to be updated, in which time we might get more runners spawned than the
// new max runner value.
if w.listener.IsRunning() {
if err := w.listener.Stop(); err != nil {
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
}
}
}
// TODO: should we kick off auto-scaling if desired runner count changes?
@ -239,15 +252,26 @@ func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
case <-ticker.C:
return false
case <-w.quit:
return true
case <-w.ctx.Done():
return true
}
return true
}
func (w *Worker) keepListenerAlive() {
var backoff time.Duration
for {
w.mux.Lock()
if !w.scaleSet.Enabled {
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
slog.DebugContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
w.mux.Unlock()
return
}
w.mux.Unlock()
continue
}
w.mux.Unlock()
select {
case <-w.quit:
return

View file

@ -91,6 +91,12 @@ func (l *scaleSetListener) Stop() error {
return nil
}
func (l *scaleSetListener) IsRunning() bool {
l.mux.Lock()
defer l.mux.Unlock()
return l.running
}
func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage) {
l.mux.Lock()
defer l.mux.Unlock()