Fix scale set restart logic
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
commit 7376a5fe74
parent bc470c5f78

1 changed file with 24 additions and 7 deletions
@@ -176,10 +176,8 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
 		// we don't have to potentially wait for 50 second for the max runner value
 		// to be updated, in which time we might get more runners spawned than the
 		// new max runner value.
-		if w.listener.IsRunning() {
-			if err := w.listener.Stop(); err != nil {
-				slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
-			}
+		if err := w.listener.Stop(); err != nil {
+			slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
 		}
 	}
 	// TODO: should we kick off auto-scaling if desired runner count changes?
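
The removed IsRunning() check was a check-then-act race: another goroutine could stop or start the listener between the IsRunning() call and the Stop() call. Calling Stop() unconditionally is only safe if Stop() tolerates an already-stopped listener, and the "noop if already started" Start() added in the next hunk relies on the mirror-image property. A minimal sketch of such an idempotent Start/Stop pair, using a hypothetical Listener type (garm's actual listener implementation is not part of this diff):

package main

import (
	"fmt"
	"sync"
)

// Listener is a hypothetical stand-in illustrating the idempotency the
// diff appears to rely on; it is not garm's actual listener type.
type Listener struct {
	mux     sync.Mutex
	running bool
}

// Start is a no-op when already running, so a keep-alive loop can call
// it on every pass without checking state first.
func (l *Listener) Start() error {
	l.mux.Lock()
	defer l.mux.Unlock()
	if l.running {
		return nil // noop if already started
	}
	l.running = true
	return nil
}

// Stop is a no-op when already stopped, which is what makes the
// unconditional Stop() call in the hunk above safe.
func (l *Listener) Stop() error {
	l.mux.Lock()
	defer l.mux.Unlock()
	if !l.running {
		return nil
	}
	l.running = false
	return nil
}

func main() {
	l := &Listener{}
	fmt.Println(l.Start()) // <nil>
	fmt.Println(l.Stop())  // <nil>
	fmt.Println(l.Stop())  // <nil>: second Stop is a harmless no-op
}
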
@@ -271,6 +269,8 @@ func (w *Worker) keepListenerAlive() {
 			w.mux.Unlock()
 			continue
 		}
+		// noop if already started
+		w.listener.Start()
 		w.mux.Unlock()
 
 		select {
@@ -280,9 +280,19 @@ func (w *Worker) keepListenerAlive() {
 			return
 		case <-w.listener.Wait():
 			slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
 			w.mux.Lock()
 			if !w.scaleSet.Enabled {
 				w.mux.Unlock()
 				continue
 			}
 			w.mux.Unlock()
+			for {
+				w.mux.Lock()
+				w.listener.Stop() //cleanup
+				if !w.scaleSet.Enabled {
+					w.mux.Unlock()
+					break
+				}
+				slog.DebugContext(w.ctx, "attempting to restart")
+				if err := w.listener.Start(); err != nil {
+					w.mux.Unlock()
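
The hunk above ends mid-addition: the retry loop's tail (what happens after the Unlock when Start() fails) is not shown. The visible shape is: on each iteration, take the lock, call Stop() as cleanup, bail out if the scale set was disabled in the meantime, otherwise retry Start(). A self-contained sketch of that pattern, with hypothetical stand-in types and an assumed fixed retry delay (both the types and the delay are illustrative, not garm's actual code):

package main

import (
	"errors"
	"log/slog"
	"sync"
	"time"
)

// listener is a hypothetical collaborator that fails twice, then starts.
type listener struct{ attempts int }

func (l *listener) Stop() {} // safe no-op if already stopped

func (l *listener) Start() error {
	l.attempts++
	if l.attempts < 3 {
		return errors.New("transient failure")
	}
	return nil
}

type worker struct {
	mux      sync.Mutex
	listener *listener
	enabled  bool
}

// restartListener mirrors the shape of the added loop: stop for cleanup,
// re-check enabled under the lock, retry Start until it succeeds or the
// scale set is disabled.
func (w *worker) restartListener() {
	for {
		w.mux.Lock()
		w.listener.Stop() // cleanup
		if !w.enabled {
			w.mux.Unlock()
			return
		}
		slog.Debug("attempting to restart")
		if err := w.listener.Start(); err != nil {
			w.mux.Unlock()
			time.Sleep(10 * time.Millisecond) // assumed retry delay
			continue
		}
		w.mux.Unlock()
		return
	}
}

func main() {
	w := &worker{listener: &listener{}, enabled: true}
	w.restartListener() // succeeds on the third Start attempt
}
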
@@ -313,6 +323,13 @@ func (w *Worker) handleAutoScale() {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
+	lastMsg := ""
+	lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
+		if lastMsg != msg {
+			slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
+			lastMsg = msg
+		}
+	}
 	for {
 		select {
 		case <-w.quit:
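
The added lastMsgDebugLog closure remembers the last message it emitted and logs only when the message text changes. handleAutoScale runs on a 5-second ticker, so without this the same "desired runner count reached" line would repeat every tick. A standalone sketch of the same deduplication idea (newDedupLogger is an invented name for illustration):

package main

import "log/slog"

// newDedupLogger returns a closure that logs only when the message text
// differs from the previously logged one, mirroring lastMsgDebugLog above.
func newDedupLogger() func(msg string, targetRunners, currentRunners uint) {
	lastMsg := ""
	return func(msg string, targetRunners, currentRunners uint) {
		if lastMsg != msg {
			slog.Debug(msg, "current_runners", currentRunners, "target_runners", targetRunners)
			lastMsg = msg
		}
	}
}

func main() {
	slog.SetLogLoggerLevel(slog.LevelDebug) // default logger drops Debug otherwise
	logChange := newDedupLogger()
	logChange("scaling up", 5, 3)                   // logged
	logChange("scaling up", 5, 4)                   // suppressed: same message
	logChange("desired runner count reached", 5, 5) // logged: message changed
}
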
@@ -328,14 +345,14 @@ func (w *Worker) handleAutoScale() {
 
 		currentRunners := uint(len(w.runners))
 		if currentRunners == targetRunners {
-			slog.DebugContext(w.ctx, "desired runner count reached", "desired_runners", targetRunners)
+			lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners)
 			continue
 		}
 
 		if currentRunners < targetRunners {
-			slog.DebugContext(w.ctx, "scaling up", "current_runners", currentRunners, "target_runners", targetRunners)
+			lastMsgDebugLog("scaling up", targetRunners, currentRunners)
 		} else {
-			slog.DebugContext(w.ctx, "scaling down", "current_runners", currentRunners, "target_runners", targetRunners)
+			lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners)
 		}
 	}
 }
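
Two side effects of the swap in this last hunk are worth noting. First, the old slog.DebugContext calls emitted the runner counts on every tick, so the log showed scaling progress; the deduplicated form suppresses those intermediate counts until the message text changes. Second, the message strings are the dedup keys: the scale-up branch logs "scaling up" while the scale-down branch logs "attempting to scale down", so flapping between the two branches still logs on every flip.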