From 616499e715124fde8a44c43c0a50b8633562e048 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 12 Sep 2025 14:12:20 +0000 Subject: [PATCH] Simplify logic This change simplifies the scale down logic a bit. It also makes sure we don't accidentally remove runners that are in the process of being created when we try to consolidate. Signed-off-by: Gabriel Adrian Samfira --- workers/scaleset/scaleset.go | 184 ++++++++++++++++------------------- 1 file changed, 82 insertions(+), 102 deletions(-) diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index 0e49ea3d..526aee0e 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -459,7 +459,7 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error dbRunnersByName = w.runnerByName() // Cross check what exists in the database with what we have in github. for name, runner := range dbRunnersByName { - // in the case of scale sets, JIT configs re used. There is no situation + // in the case of scale sets, JIT configs are used. There is no situation // in which we create a runner in the DB and one does not exist in github. // We can safely assume that if the runner is not in github anymore, it can // be removed from the provider and the DB. @@ -548,12 +548,17 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error } } + dbRunnersByName = w.runnerByName() for _, runner := range dbRunnersByName { switch runner.Status { case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete, commonParams.InstanceDeleting, commonParams.InstanceDeleted: // This instance is already being deleted. continue + case commonParams.InstanceCreating, commonParams.InstancePendingCreate: + // Instance is still being created in the provider, or is about to be created. + // Allow it to finish. 
+ continue } locked := locking.TryLock(runner.Name, w.consumerID) @@ -785,13 +790,13 @@ Loop: } } -func (w *Worker) handleScaleUp(target, current uint) { +func (w *Worker) handleScaleUp() { if !w.scaleSet.Enabled { slog.DebugContext(w.ctx, "scale set is disabled; not scaling up") return } - if target <= current { + if w.targetRunners() <= w.runnerCount() { slog.DebugContext(w.ctx, "target is less than or equal to current; not scaling up") return } @@ -807,7 +812,7 @@ func (w *Worker) handleScaleUp(target, current uint) { slog.ErrorContext(w.ctx, "error getting scale set client", "error", err) return } - for i := current; i < target; i++ { + for i := w.runnerCount(); i < w.targetRunners(); i++ { newRunnerName := fmt.Sprintf("%s-%s", w.scaleSet.GetRunnerPrefix(), util.NewID()) jitConfig, err := scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID) if err != nil { @@ -867,118 +872,99 @@ func (w *Worker) waitForToolsOrCancel() (hasTools, stopped bool) { } } -func (w *Worker) handleScaleDown(target, current uint) { - delta := current - target +func (w *Worker) handleScaleDown() { + delta := w.runnerCount() - w.targetRunners() if delta <= 0 { return } - removed := 0 - candidates := []params.Instance{} - for _, runner := range w.runners { - locked := locking.TryLock(runner.Name, w.consumerID) - if !locked { - slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) - continue - } - switch runner.Status { - case commonParams.InstanceRunning: - if runner.RunnerStatus != params.RunnerActive { - candidates = append(candidates, runner) - } - case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete, - commonParams.InstanceDeleting, commonParams.InstanceDeleted: - removed++ - locking.Unlock(runner.Name, true) - continue - default: - slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status) - locking.Unlock(runner.Name, false) - 
continue - } - locking.Unlock(runner.Name, false) - } - - if removed >= int(delta) { - return - } - scaleSetCli, err := w.GetScaleSetClient() if err != nil { slog.ErrorContext(w.ctx, "error getting scale set client", "error", err) return } - - for _, runner := range candidates { - if removed >= int(delta) { + removed := 0 + for _, runner := range w.runners { + slog.InfoContext(w.ctx, "considering runners for removal", "delta", delta, "removed", removed) + if removed >= delta { break } - - locked := locking.TryLock(runner.Name, w.consumerID) - if !locked { - slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) - continue - } - switch runner.Status { - case commonParams.InstancePendingCreate, commonParams.InstanceRunning: - case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete, - commonParams.InstanceDeleting, commonParams.InstanceDeleted: - removed++ - locking.Unlock(runner.Name, true) - continue - default: - slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status) - locking.Unlock(runner.Name, false) - continue - } - - switch runner.RunnerStatus { - case params.RunnerTerminated, params.RunnerActive: - slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus) - locking.Unlock(runner.Name, false) - continue - } - - slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name) - if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err) + case commonParams.InstanceRunning: + switch runner.RunnerStatus { + case params.RunnerTerminated, params.RunnerActive: + slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus) 
locking.Unlock(runner.Name, false) continue } - } - runnerUpdateParams := params.UpdateInstanceParams{ - Status: commonParams.InstancePendingDelete, - } - if _, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - // The error seems to be that the instance was removed from the database. We still had it in our - // state, so either the update never came from the watcher or something else happened. - // Remove it from the local cache. - delete(w.runners, runner.ID) - removed++ - locking.Unlock(runner.Name, true) + locked := locking.TryLock(runner.Name, w.consumerID) + if !locked { + slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name) continue } - // nolint:golangci-lint,godox - // TODO: This should not happen, unless there is some issue with the database. - // The UpdateInstance() function should add tenacity, but even in that case, if it - // still errors out, we need to handle it somehow. - slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err) + slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name) + if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err) + locking.Unlock(runner.Name, false) + continue + } + } + runnerUpdateParams := params.UpdateInstanceParams{ + Status: commonParams.InstancePendingDelete, + } + updatedRunner, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + // The error seems to be that the instance was removed from the database. We still had it in our + // state, so either the update never came from the watcher or something else happened. + // Remove it from the local cache. 
+ delete(w.runners, runner.ID) + removed++ + locking.Unlock(runner.Name, true) + continue + } + // nolint:golangci-lint,godox + // TODO: This should not happen, unless there is some issue with the database. + // The UpdateInstance() function should add tenacity, but even in that case, if it + // still errors out, we need to handle it somehow. + slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err) + locking.Unlock(runner.Name, false) + continue + } + w.runners[runner.ID] = updatedRunner locking.Unlock(runner.Name, false) + removed++ + case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete, + commonParams.InstanceDeleting, commonParams.InstanceDeleted: + removed++ + continue + default: + slog.WarnContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status) continue } - removed++ - locking.Unlock(runner.Name, false) } } +func (w *Worker) targetRunners() int { + var desiredRunners uint + if w.scaleSet.DesiredRunnerCount > 0 { + desiredRunners = uint(w.scaleSet.DesiredRunnerCount) + } + targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners) + + return int(targetRunners) +} + +func (w *Worker) runnerCount() int { + return len(w.runners) +} + func (w *Worker) handleAutoScale() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() lastMsg := "" - lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) { + lastMsgDebugLog := func(msg string, targetRunners, currentRunners int) { if lastMsg != msg { slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners) lastMsg = msg @@ -1009,25 +995,19 @@ func (w *Worker) handleAutoScale() { slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err) } } - var desiredRunners uint - if w.scaleSet.DesiredRunnerCount > 0 { - desiredRunners = uint(w.scaleSet.DesiredRunnerCount) - } - 
targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners) - currentRunners := uint(len(w.runners)) - if currentRunners == targetRunners { - lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners) + if w.runnerCount() == w.targetRunners() { + lastMsgDebugLog("desired runner count reached", w.targetRunners(), w.runnerCount()) w.mux.Unlock() continue } - if currentRunners < targetRunners { - lastMsgDebugLog("scaling up", targetRunners, currentRunners) - w.handleScaleUp(targetRunners, currentRunners) + if w.runnerCount() < w.targetRunners() { + lastMsgDebugLog("scaling up", w.targetRunners(), w.runnerCount()) + w.handleScaleUp() } else { - lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners) - w.handleScaleDown(targetRunners, currentRunners) + lastMsgDebugLog("attempting to scale down", w.targetRunners(), w.runnerCount()) + w.handleScaleDown() } w.mux.Unlock() }