Simplify logic

This change simplifies the scale-down logic a bit. It also makes sure we
don't accidentally remove runners that are in the process of being created
when we try to consolidate.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
commit 616499e715 (parent 7ae962b438)
Author: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Date: 2025-09-12 14:12:20 +00:00
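
For quick orientation: the consolidation loop now skips runners that are still being provisioned, so consolidateRunnerState no longer races with instance creation. A minimal sketch of that guard, using the same status constants that appear in the diff below:

    for _, runner := range dbRunnersByName {
        switch runner.Status {
        case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
            commonParams.InstanceDeleting, commonParams.InstanceDeleted:
            // Already on its way out; nothing to consolidate.
            continue
        case commonParams.InstanceCreating, commonParams.InstancePendingCreate:
            // Still being created in the provider (or about to be); let it finish
            // instead of treating it as a removal candidate.
            continue
        }
        // ... the rest of the consolidation logic runs only for settled runners ...
    }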

@@ -459,7 +459,7 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
dbRunnersByName = w.runnerByName()
// Cross check what exists in the database with what we have in github.
for name, runner := range dbRunnersByName {
// in the case of scale sets, JIT configs re used. There is no situation
// in the case of scale sets, JIT configs are used. There is no situation
// in which we create a runner in the DB and one does not exist in github.
// We can safely assume that if the runner is not in github anymore, it can
// be removed from the provider and the DB.
@@ -548,12 +548,17 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
}
}
dbRunnersByName = w.runnerByName()
for _, runner := range dbRunnersByName {
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
// This instance is already being deleted.
continue
case commonParams.InstanceCreating, commonParams.InstancePendingCreate:
// Instance is still being created in the provider, or is about to be created.
// Allow it to finish.
continue
}
locked := locking.TryLock(runner.Name, w.consumerID)
@@ -785,13 +790,13 @@ Loop:
}
}
func (w *Worker) handleScaleUp(target, current uint) {
func (w *Worker) handleScaleUp() {
if !w.scaleSet.Enabled {
slog.DebugContext(w.ctx, "scale set is disabled; not scaling up")
return
}
if target <= current {
if w.targetRunners() <= w.runnerCount() {
slog.DebugContext(w.ctx, "target is less than or equal to current; not scaling up")
return
}
@@ -807,7 +812,7 @@ func (w *Worker) handleScaleUp(target, current uint) {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
return
}
for i := current; i < target; i++ {
for i := w.runnerCount(); i < w.targetRunners(); i++ {
newRunnerName := fmt.Sprintf("%s-%s", w.scaleSet.GetRunnerPrefix(), util.NewID())
jitConfig, err := scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID)
if err != nil {
@@ -867,118 +872,99 @@ func (w *Worker) waitForToolsOrCancel() (hasTools, stopped bool) {
}
}
func (w *Worker) handleScaleDown(target, current uint) {
delta := current - target
func (w *Worker) handleScaleDown() {
delta := w.runnerCount() - w.targetRunners()
if delta <= 0 {
return
}
removed := 0
candidates := []params.Instance{}
for _, runner := range w.runners {
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
switch runner.Status {
case commonParams.InstanceRunning:
if runner.RunnerStatus != params.RunnerActive {
candidates = append(candidates, runner)
}
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
removed++
locking.Unlock(runner.Name, true)
continue
default:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
locking.Unlock(runner.Name, false)
continue
}
locking.Unlock(runner.Name, false)
}
if removed >= int(delta) {
return
}
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
return
}
for _, runner := range candidates {
if removed >= int(delta) {
removed := 0
for _, runner := range w.runners {
slog.InfoContext(w.ctx, "considering runners for removal", "delta", delta, "removed", removed)
if removed >= delta {
break
}
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
switch runner.Status {
case commonParams.InstancePendingCreate, commonParams.InstanceRunning:
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
removed++
locking.Unlock(runner.Name, true)
continue
default:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
locking.Unlock(runner.Name, false)
continue
}
switch runner.RunnerStatus {
case params.RunnerTerminated, params.RunnerActive:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus)
locking.Unlock(runner.Name, false)
continue
}
slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name)
if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
case commonParams.InstanceRunning:
switch runner.RunnerStatus {
case params.RunnerTerminated, params.RunnerActive:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus)
locking.Unlock(runner.Name, false)
continue
}
}
runnerUpdateParams := params.UpdateInstanceParams{
Status: commonParams.InstancePendingDelete,
}
if _, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// The error seems to be that the instance was removed from the database. We still had it in our
// state, so either the update never came from the watcher or something else happened.
// Remove it from the local cache.
delete(w.runners, runner.ID)
removed++
locking.Unlock(runner.Name, true)
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
// nolint:golangci-lint,godox
// TODO: This should not happen, unless there is some issue with the database.
// The UpdateInstance() function should add tenacity, but even in that case, if it
// still errors out, we need to handle it somehow.
slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err)
slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name)
if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
locking.Unlock(runner.Name, false)
continue
}
}
runnerUpdateParams := params.UpdateInstanceParams{
Status: commonParams.InstancePendingDelete,
}
updatedRunner, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// The error seems to be that the instance was removed from the database. We still had it in our
// state, so either the update never came from the watcher or something else happened.
// Remove it from the local cache.
delete(w.runners, runner.ID)
removed++
locking.Unlock(runner.Name, true)
continue
}
// nolint:golangci-lint,godox
// TODO: This should not happen, unless there is some issue with the database.
// The UpdateInstance() function should add tenacity, but even in that case, if it
// still errors out, we need to handle it somehow.
slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err)
locking.Unlock(runner.Name, false)
continue
}
w.runners[runner.ID] = updatedRunner
locking.Unlock(runner.Name, false)
removed++
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
removed++
continue
default:
slog.WarnContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
continue
}
removed++
locking.Unlock(runner.Name, false)
}
}
func (w *Worker) targetRunners() int {
var desiredRunners uint
if w.scaleSet.DesiredRunnerCount > 0 {
desiredRunners = uint(w.scaleSet.DesiredRunnerCount)
}
targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners)
return int(targetRunners)
}
func (w *Worker) runnerCount() int {
return len(w.runners)
}
func (w *Worker) handleAutoScale() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
lastMsg := ""
lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
lastMsgDebugLog := func(msg string, targetRunners, currentRunners int) {
if lastMsg != msg {
slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
lastMsg = msg
@@ -1009,25 +995,19 @@ func (w *Worker) handleAutoScale() {
slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err)
}
}
var desiredRunners uint
if w.scaleSet.DesiredRunnerCount > 0 {
desiredRunners = uint(w.scaleSet.DesiredRunnerCount)
}
targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners)
currentRunners := uint(len(w.runners))
if currentRunners == targetRunners {
lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners)
if w.runnerCount() == w.targetRunners() {
lastMsgDebugLog("desired runner count reached", w.targetRunners(), w.runnerCount())
w.mux.Unlock()
continue
}
if currentRunners < targetRunners {
lastMsgDebugLog("scaling up", targetRunners, currentRunners)
w.handleScaleUp(targetRunners, currentRunners)
if w.runnerCount() < w.targetRunners() {
lastMsgDebugLog("scaling up", w.targetRunners(), w.runnerCount())
w.handleScaleUp()
} else {
lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners)
w.handleScaleDown(targetRunners, currentRunners)
lastMsgDebugLog("attempting to scale down", w.targetRunners(), w.runnerCount())
w.handleScaleDown()
}
w.mux.Unlock()
}