garm/workers/scaleset/scaleset.go

// Copyright 2025 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

package scaleset

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"time"

	runnerErrors "github.com/cloudbase/garm-provider-common/errors"
	commonParams "github.com/cloudbase/garm-provider-common/params"
	"github.com/cloudbase/garm-provider-common/util"
	"github.com/cloudbase/garm/cache"
	dbCommon "github.com/cloudbase/garm/database/common"
	"github.com/cloudbase/garm/database/watcher"
	"github.com/cloudbase/garm/locking"
	"github.com/cloudbase/garm/params"
	"github.com/cloudbase/garm/runner/common"
)
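
// NewWorker returns a new scale set worker for the given scale set. The worker
// watches for database events related to the scale set and its instances, and
// reconciles the desired runner count with github and the IaaS provider.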
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
	consumerID := fmt.Sprintf("scaleset-worker-%s-%d", scaleSet.Name, scaleSet.ID)
	controllerInfo, err := store.ControllerInfo()
	if err != nil {
		return nil, fmt.Errorf("getting controller info: %w", err)
	}
	return &Worker{
		ctx:            ctx,
		controllerInfo: controllerInfo,
		consumerID:     consumerID,
		store:          store,
		provider:       provider,
		scaleSet:       scaleSet,
		runners:        make(map[string]params.Instance),
	}, nil
}
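
// Worker reconciles a single scale set. It keeps a local cache of the scale
// set's runners, reacts to database change events and periodically scales the
// runner count up or down to match the desired state.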
type Worker struct {
	ctx            context.Context
	consumerID     string
	controllerInfo params.ControllerInfo
	provider       common.Provider
	store          dbCommon.Store
	scaleSet       params.ScaleSet
	runners        map[string]params.Instance
	consumer       dbCommon.Consumer
	listener       *scaleSetListener
	mux            sync.Mutex
	running        bool
	quit           chan struct{}
}
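
// Stop stops the worker, its event consumer and the scale set listener.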
func (w *Worker) Stop() error {
	slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID)
	w.mux.Lock()
	defer w.mux.Unlock()

	if !w.running {
		return nil
	}

	w.consumer.Close()
	w.running = false
	if w.quit != nil {
		close(w.quit)
	}
	w.listener.Stop()
	return nil
}
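
// Start loads the scale set's instances from the database, reconciles any
// instances left in transient states by a previous run, registers a watcher
// consumer and spins up the worker loops.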
func (w *Worker) Start() (err error) {
	slog.DebugContext(w.ctx, "starting scale set worker", "scale_set", w.consumerID)
	w.mux.Lock()
	defer w.mux.Unlock()

	if w.running {
		return nil
	}

	instances, err := w.store.ListScaleSetInstances(w.ctx, w.scaleSet.ID)
	if err != nil {
		return fmt.Errorf("listing scale set instances: %w", err)
	}

	for _, instance := range instances {
		if instance.Status == commonParams.InstanceCreating {
			// We're just starting up and we found an instance stuck in "creating".
			// When a provider creates an instance, it sets the db instance to
			// "creating" and then issues an API call to the IaaS to create the
			// instance using some userdata it needs to come up. But the instance
			// will still need to call back home to fetch additional metadata and
			// complete its setup. We should remove the instance, as it is not
			// possible to reliably determine its state (whether it is mid boot,
			// before it reached the phase where it runs the metadata, or whether
			// it already failed).
			instanceState := commonParams.InstancePendingDelete
			locking.Lock(instance.Name, w.consumerID)
			if instance.AgentID != 0 {
				scaleSetCli, err := w.GetScaleSetClient()
				if err != nil {
					slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
					return fmt.Errorf("getting scale set client: %w", err)
				}
				if err := scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
					// Scale sets use JIT runners. This means that we create the runner in github
					// before we create the actual instance that will use the credentials. We need
					// to remove the runner from github if it exists.
					if !errors.Is(err, runnerErrors.ErrNotFound) {
						if errors.Is(err, runnerErrors.ErrUnauthorized) {
							// We don't have access to remove the runner. This implies that our
							// credentials may have expired or are incorrect.
							//
							// nolint:golangci-lint,godox
							// TODO(gabriel-samfira): we need to set the scale set as inactive and stop the listener (if any).
							slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err)
							w.runners[instance.ID] = instance
							locking.Unlock(instance.Name, false)
							continue
						}
						// The runner may have come up, registered and is currently running a
						// job, in which case github will not allow us to remove it.
						runnerInstance, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
						if err != nil {
							if !errors.Is(err, runnerErrors.ErrNotFound) {
								// We could not get info about the runner, and the error was
								// something other than "not found".
								slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
								w.runners[instance.ID] = instance
								locking.Unlock(instance.Name, false)
								continue
							}
						}
						if runnerInstance.Status == string(params.RunnerIdle) ||
							runnerInstance.Status == string(params.RunnerActive) {
							// This is a highly unlikely scenario, but let's account for it anyway.
							//
							// The runner is running a job or is idle. Mark it as running, as
							// it appears that it finished booting and is now running.
							//
							// NOTE: if the instance was in "creating" and it managed to boot, there
							// is a high chance that we do not have a provider ID for the runner
							// inside our database. When removing the runner, the provider will attempt
							// to use the instance name instead of the provider ID, the same as when
							// creation of the instance fails and we try to clean up any lingering resources
							// in the provider.
							slog.DebugContext(w.ctx, "runner is running a job or is idle; not removing", "runner_name", instance.Name)
							instanceState = commonParams.InstanceRunning
						}
					}
				}
			}
			runnerUpdateParams := params.UpdateInstanceParams{
				Status: instanceState,
			}
			instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
			if err != nil {
				if !errors.Is(err, runnerErrors.ErrNotFound) {
					locking.Unlock(instance.Name, false)
					return fmt.Errorf("updating runner %s: %w", instance.Name, err)
				}
			}
		} else if instance.Status == commonParams.InstanceDeleting {
			// Set the instance to pending_delete. It is assumed that the runner was
			// already removed from github, either by github or by garm. The "deleting"
			// status indicates that it was already being handled by the provider, in
			// which case there should be no entry on github for the runner.
			// Setting it to pending_delete will cause the provider to try again, an
			// operation which is idempotent (if it's already deleted, the provider
			// reports success).
			runnerUpdateParams := params.UpdateInstanceParams{
				Status: commonParams.InstancePendingDelete,
			}
			instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
			if err != nil {
				if !errors.Is(err, runnerErrors.ErrNotFound) {
					locking.Unlock(instance.Name, false)
					return fmt.Errorf("updating runner %s: %w", instance.Name, err)
				}
			}
		} else if instance.Status == commonParams.InstanceDeleted {
			if err := w.handleInstanceCleanup(instance); err != nil {
				locking.Unlock(instance.Name, false)
				return fmt.Errorf("failed to remove database entry for %s: %w", instance.Name, err)
			}
			continue
		}
		w.runners[instance.ID] = instance
		locking.Unlock(instance.Name, false)
	}

	consumer, err := watcher.RegisterConsumer(
		w.ctx, w.consumerID,
		watcher.WithAny(
			watcher.WithAll(
				watcher.WithScaleSetFilter(w.scaleSet),
				watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
			),
			watcher.WithScaleSetInstanceFilter(w.scaleSet),
		),
	)
	if err != nil {
		return fmt.Errorf("error registering consumer: %w", err)
	}
	defer func() {
		if err != nil {
			consumer.Close()
		}
	}()

	slog.DebugContext(w.ctx, "creating scale set listener")
	listener := newListener(w.ctx, w)
	if w.scaleSet.Enabled {
		slog.DebugContext(w.ctx, "starting scale set listener")
		if err := listener.Start(); err != nil {
			return fmt.Errorf("error starting listener: %w", err)
		}
	} else {
		slog.InfoContext(w.ctx, "scale set is disabled; not starting listener")
	}

	w.listener = listener
	w.consumer = consumer
	w.running = true
	w.quit = make(chan struct{})

	slog.DebugContext(w.ctx, "starting scale set worker loops", "scale_set", w.consumerID)
	go w.loop()
	go w.keepListenerAlive()
	go w.handleAutoScale()
	return nil
}
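
// runnerByName returns the worker's runner cache keyed by runner name instead
// of by instance ID.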
func (w *Worker) runnerByName() map[string]params.Instance {
	runners := make(map[string]params.Instance)
	for _, runner := range w.runners {
		runners[runner.Name] = runner
	}
	return runners
}
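
// setRunnerDBStatus updates the status of a runner in the database. A
// not-found error is ignored, in which case a zero value instance is
// returned.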
func (w *Worker) setRunnerDBStatus(runner string, status commonParams.InstanceStatus) (params.Instance, error) {
	updateParams := params.UpdateInstanceParams{
		Status: status,
	}
	newDbInstance, err := w.store.UpdateInstance(w.ctx, runner, updateParams)
	if err != nil {
		if !errors.Is(err, runnerErrors.ErrNotFound) {
			return params.Instance{}, fmt.Errorf("updating runner %s: %w", runner, err)
		}
	}
	return newDbInstance, nil
}
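
// removeRunnerFromGithubAndSetPendingDelete removes the runner from github
// (ignoring not-found errors) and marks the instance as pending_delete in the
// database so the provider worker can remove it.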
func (w *Worker) removeRunnerFromGithubAndSetPendingDelete(runnerName string, agentID int64) error {
	scaleSetCli, err := w.GetScaleSetClient()
	if err != nil {
		return fmt.Errorf("getting scale set client: %w", err)
	}
	if err := scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
		if !errors.Is(err, runnerErrors.ErrNotFound) {
			return fmt.Errorf("removing runner %s: %w", runnerName, err)
		}
	}
	instance, err := w.setRunnerDBStatus(runnerName, commonParams.InstancePendingDelete)
	if err != nil {
		return fmt.Errorf("updating runner %s: %w", instance.Name, err)
	}
	w.runners[instance.ID] = instance
	return nil
}
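
// reapTimedOutRunners removes runners that failed to come online within the
// scale set's bootstrap timeout. It returns a function that unlocks all
// runners locked during the reaping process; callers must invoke it once they
// no longer need the locks held.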
func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference) (func(), error) {
	lockNames := []string{}
	unlockFn := func() {
		for _, name := range lockNames {
			slog.DebugContext(w.ctx, "unlockFn unlocking runner", "runner_name", name)
			locking.Unlock(name, false)
		}
	}
	for _, runner := range w.runners {
		if time.Since(runner.UpdatedAt).Minutes() < float64(w.scaleSet.RunnerBootstrapTimeout) {
			continue
		}
		switch runner.Status {
		case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
			commonParams.InstanceDeleting, commonParams.InstanceDeleted:
			continue
		}
		if runner.RunnerStatus != params.RunnerPending && runner.RunnerStatus != params.RunnerInstalling {
			slog.DebugContext(w.ctx, "runner is not pending or installing; skipping", "runner_name", runner.Name)
			continue
		}
		if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline {
			if ok := locking.TryLock(runner.Name, w.consumerID); !ok {
				slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
				continue
			}
			lockNames = append(lockNames, runner.Name)
			slog.InfoContext(
				w.ctx, "reaping timed-out/failed runner",
				"runner_name", runner.Name)
			if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
				slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
				unlockFn()
				return nil, fmt.Errorf("removing runner %s: %w", runner.Name, err)
			}
		}
	}
	return unlockFn, nil
}
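
// consolidateRunnerState cross checks the runners reported by github against
// the database and the provider, removing any runner that exists in only one
// of the three places. It also reaps runners that timed out during bootstrap.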
func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error {
	w.mux.Lock()
	defer w.mux.Unlock()

	ghRunnersByName := make(map[string]params.RunnerReference)
	for _, runner := range runners {
		ghRunnersByName[runner.Name] = runner
	}
	scaleSetCli, err := w.GetScaleSetClient()
	if err != nil {
		return fmt.Errorf("getting scale set client: %w", err)
	}
	dbRunnersByName := w.runnerByName()
	// Cross check what exists in github with what we have in the database.
	for name, runner := range ghRunnersByName {
		status := runner.GetStatus()
		if _, ok := dbRunnersByName[name]; !ok {
			// The runner is not managed by GARM. If it's not busy with a job, remove it.
			if status != params.RunnerIdle && status != params.RunnerActive {
				slog.InfoContext(w.ctx, "runner does not exist in GARM; removing from github", "runner_name", name)
				if err := scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
					if errors.Is(err, runnerErrors.ErrNotFound) {
						continue
					}
					slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
				}
			}
			continue
		}
	}

	unlockFn, err := w.reapTimedOutRunners(ghRunnersByName)
	if err != nil {
		return fmt.Errorf("reaping timed out runners: %w", err)
	}
	defer unlockFn()

	// Refresh the map. It may have been mutated above.
	dbRunnersByName = w.runnerByName()
	// Cross check what exists in the database with what we have in github.
	for name, runner := range dbRunnersByName {
		// In the case of scale sets, JIT configs are used. There is no situation
		// in which we create a runner in the DB and one does not exist in github.
		// We can safely assume that if the runner is not in github anymore, it can
		// be removed from the provider and the DB.
		switch runner.Status {
		case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
			commonParams.InstanceDeleting, commonParams.InstanceDeleted:
			continue
		}
		if _, ok := ghRunnersByName[name]; !ok {
			if ok := locking.TryLock(name, w.consumerID); !ok {
				slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name)
				continue
			}
			// Unlock the runner only after this function returns. This function also
			// cross checks between the provider and the database, and removes left
			// over runners. If we unlock early, the provider worker will attempt to
			// remove runners that we set in pending_delete. This function holds the
			// mutex, so we won't see those changes until we return. So we hold the
			// instance lock here until we are done. That way, even if the provider
			// sees the pending_delete status, it won't act on it until it manages to
			// lock the instance.
			defer locking.Unlock(name, false)
			slog.InfoContext(w.ctx, "runner does not exist in github; removing from provider", "runner_name", name)
			instance, err := w.setRunnerDBStatus(runner.Name, commonParams.InstancePendingDelete)
			if err != nil {
				if !errors.Is(err, runnerErrors.ErrNotFound) {
					return fmt.Errorf("updating runner %s: %w", instance.Name, err)
				}
			}
			// We will get an update event anyway from the watcher, but updating the
			// runner here prevents race conditions if some other event involving this
			// runner is already in the queue. For the lifetime of this function we
			// hold the lock, so no race condition can occur.
			w.runners[runner.ID] = instance
		}
	}

	// Cross check what exists in the provider with the DB.
	pseudoPoolID, err := w.pseudoPoolID()
	if err != nil {
		return fmt.Errorf("getting pseudo pool ID: %w", err)
	}
	listParams := common.ListInstancesParams{
		ListInstancesV011: common.ListInstancesV011Params{
			ProviderBaseParams: common.ProviderBaseParams{
				ControllerInfo: w.controllerInfo,
			},
		},
	}
	providerRunners, err := w.provider.ListInstances(w.ctx, pseudoPoolID, listParams)
	if err != nil {
		return fmt.Errorf("listing instances: %w", err)
	}
	providerRunnersByName := make(map[string]commonParams.ProviderInstance)
	for _, runner := range providerRunners {
		providerRunnersByName[runner.Name] = runner
	}
	deleteInstanceParams := common.DeleteInstanceParams{
		DeleteInstanceV011: common.DeleteInstanceV011Params{
			ProviderBaseParams: common.ProviderBaseParams{
				ControllerInfo: w.controllerInfo,
			},
		},
	}
	// Refresh the map. It may have been mutated above.
	dbRunnersByName = w.runnerByName()
	for _, runner := range providerRunners {
		if _, ok := dbRunnersByName[runner.Name]; !ok {
			slog.InfoContext(w.ctx, "runner does not exist in database; removing from provider", "runner_name", runner.Name)
			// There is no situation in which the runner will disappear from the provider
			// after it was removed from the database. The provider worker will remove the
			// instance from the provider and mark the instance as deleted in the database.
			// It is then the responsibility of the scale set worker to clean up the
			// runners in the deleted state.
			// That means that if we have a runner in the provider but not in the DB, it
			// is most likely an inconsistency.
			if err := w.provider.DeleteInstance(w.ctx, runner.Name, deleteInstanceParams); err != nil {
				slog.ErrorContext(w.ctx, "error removing instance", "instance_name", runner.Name, "error", err)
			}
			continue
		}
	}
	for _, runner := range dbRunnersByName {
		switch runner.Status {
		case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
			commonParams.InstanceDeleting, commonParams.InstanceDeleted:
			// This instance is already being deleted.
			continue
		}
		locked := locking.TryLock(runner.Name, w.consumerID)
		if !locked {
			slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
			continue
		}
		defer locking.Unlock(runner.Name, false)
		if _, ok := providerRunnersByName[runner.Name]; !ok {
			// The runner is not in the provider anymore. Remove it from the DB.
			slog.InfoContext(w.ctx, "runner does not exist in provider; removing from database", "runner_name", runner.Name)
			if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
				return fmt.Errorf("removing runner %s: %w", runner.Name, err)
			}
		}
	}
	return nil
}
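
// pseudoPoolID composes a stand-in pool ID from the scale set name and the
// entity ID, for use with providers that only understand pools.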
func (w *Worker) pseudoPoolID() (string, error) {
	// This is temporary. We need to extend providers to know about scale sets.
	entity, err := w.scaleSet.GetEntity()
	if err != nil {
		return "", fmt.Errorf("getting entity: %w", err)
	}
	return fmt.Sprintf("%s-%s", w.scaleSet.Name, entity.ID), nil
}
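
// handleScaleSetEvent reacts to scale set update events coming from the
// database watcher, stopping the listener when the scale set is disabled or
// its max runners value is lowered.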
func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
	scaleSet, ok := event.Payload.(params.ScaleSet)
	if !ok {
		slog.ErrorContext(w.ctx, "invalid payload for scale set type", "scale_set_type", event.EntityType, "payload", event.Payload)
		return
	}

	switch event.Operation {
	case dbCommon.UpdateOperation:
		slog.DebugContext(w.ctx, "got update operation")
		w.mux.Lock()
		if scaleSet.MaxRunners < w.scaleSet.MaxRunners || !scaleSet.Enabled {
			// We stop the listener if the scale set is disabled or if the max runners
			// value is decreased. In the case where max runners changes but the scale
			// set is still enabled, we rely on keepListenerAlive to restart the
			// listener, which will listen for new messages with the changed max
			// runners value. This way we don't have to potentially wait up to 50
			// seconds for the max runners value to be updated, during which time we
			// might get more runners spawned than the new max runners value allows.
			if err := w.listener.Stop(); err != nil {
				slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
			}
		}
		w.scaleSet = scaleSet
		w.mux.Unlock()
	default:
		slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation)
	}
}
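
// handleInstanceCleanup removes the database entry of an instance that the
// provider has already marked as deleted.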
func (w *Worker) handleInstanceCleanup(instance params.Instance) error {
	if instance.Status == commonParams.InstanceDeleted {
		if err := w.store.DeleteInstanceByName(w.ctx, instance.Name); err != nil {
			if !errors.Is(err, runnerErrors.ErrNotFound) {
				return fmt.Errorf("deleting instance %s: %w", instance.ID, err)
			}
		}
	}
	return nil
}
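
// handleInstanceEntityEvent keeps the local runner cache in sync with
// instance events coming from the database watcher.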
func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
	instance, ok := event.Payload.(params.Instance)
	if !ok {
		slog.ErrorContext(w.ctx, "invalid payload for instance type", "instance_type", event.EntityType, "payload", event.Payload)
		return
	}

	switch event.Operation {
	case dbCommon.CreateOperation:
		slog.DebugContext(w.ctx, "got create operation")
		w.mux.Lock()
		w.runners[instance.ID] = instance
		w.mux.Unlock()
	case dbCommon.UpdateOperation:
		slog.DebugContext(w.ctx, "got update operation")
		w.mux.Lock()
		if instance.Status == commonParams.InstanceDeleted {
			if err := w.handleInstanceCleanup(instance); err != nil {
				slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err)
			}
			w.mux.Unlock()
			return
		}
		oldInstance, ok := w.runners[instance.ID]
		w.runners[instance.ID] = instance
		if !ok {
			slog.DebugContext(w.ctx, "instance not found in local cache; ignoring", "instance_id", instance.ID)
			w.mux.Unlock()
			return
		}
		scaleSetCli, err := w.GetScaleSetClient()
		if err != nil {
			slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
			w.mux.Unlock()
			return
		}
		if oldInstance.RunnerStatus != instance.RunnerStatus && instance.RunnerStatus == params.RunnerIdle {
			serviceRunner, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
			if err != nil {
				slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
				w.mux.Unlock()
				return
			}
			status, ok := serviceRunner.Status.(string)
			if !ok {
				slog.ErrorContext(w.ctx, "error getting runner status", "runner_id", instance.AgentID)
				w.mux.Unlock()
				return
			}
			if status != string(params.RunnerIdle) && status != string(params.RunnerActive) {
				// nolint:golangci-lint,godox
				// TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as
				// pending_delete if the runner never comes online.
				w.mux.Unlock()
				return
			}
		}
		w.mux.Unlock()
	case dbCommon.DeleteOperation:
		slog.DebugContext(w.ctx, "got delete operation")
		w.mux.Lock()
		delete(w.runners, instance.ID)
		w.mux.Unlock()
	default:
		slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation)
	}
}
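
// handleEvent dispatches watcher events to the appropriate handler based on
// the entity type.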
func (w *Worker) handleEvent(event dbCommon.ChangePayload) {
	switch event.EntityType {
	case dbCommon.ScaleSetEntityType:
		slog.DebugContext(w.ctx, "got scaleset event")
		w.handleScaleSetEvent(event)
	case dbCommon.InstanceEntityType:
		slog.DebugContext(w.ctx, "got instance event")
		w.handleInstanceEntityEvent(event)
	default:
		slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
	}
}
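
// loop consumes watcher events until the worker is stopped or the context is
// canceled.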
func (w *Worker) loop() {
	defer w.Stop()
	for {
		select {
		case <-w.quit:
			return
		case event, ok := <-w.consumer.Watch():
			if !ok {
				slog.InfoContext(w.ctx, "consumer channel closed")
				return
			}
			go w.handleEvent(event)
		case <-w.ctx.Done():
			slog.DebugContext(w.ctx, "context done")
			return
		}
	}
}
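
// sleepWithCancel sleeps for the given duration, returning early with
// canceled set to true if the worker is stopped or the context is canceled.
// A zero duration returns immediately without blocking.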
func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
	if sleepTime == 0 {
		return false
	}
	timer := time.NewTimer(sleepTime)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-w.quit:
	case <-w.ctx.Done():
	}
	return true
}
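
// sessionLoopMayRun reports whether the scale set is enabled and the listener
// session loop should be running.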
func (w *Worker) sessionLoopMayRun() bool {
	w.mux.Lock()
	defer w.mux.Unlock()
	return w.scaleSet.Enabled
}
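
// keepListenerAlive restarts the scale set listener whenever it stops,
// backing off exponentially (capped at 60 seconds) on repeated failures. It
// exits when the worker is stopped or the context is canceled.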
func (w *Worker) keepListenerAlive() {
	var backoff time.Duration
Loop:
	for {
		if !w.sessionLoopMayRun() {
			if canceled := w.sleepWithCancel(2 * time.Second); canceled {
				slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
				return
			}
			continue
		}

		// Noop if already started.
		if err := w.listener.Start(); err != nil {
			slog.ErrorContext(w.ctx, "error starting listener", "error", err, "consumer_id", w.consumerID)
			if canceled := w.sleepWithCancel(2 * time.Second); canceled {
				slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
				return
			}
			// We failed to start the listener. Try again.
			continue
		}

		select {
		case <-w.quit:
			return
		case <-w.ctx.Done():
			return
		case <-w.listener.Wait():
			slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
			w.mux.Lock()
			if !w.scaleSet.Enabled {
				w.listener.Stop() // cleanup
				w.mux.Unlock()
				continue Loop
			}
			w.mux.Unlock()
			for {
				w.mux.Lock()
				w.listener.Stop() // cleanup
				if !w.scaleSet.Enabled {
					w.mux.Unlock()
					continue Loop
				}
				slog.DebugContext(w.ctx, "attempting to restart")
				if err := w.listener.Start(); err != nil {
					w.mux.Unlock()
					switch {
					case backoff > 60*time.Second:
						backoff = 60 * time.Second
					case backoff == 0:
						backoff = 5 * time.Second
						slog.InfoContext(w.ctx, "backing off restart attempt", "backoff", backoff)
					default:
						backoff *= 2
					}
					slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff)
					if canceled := w.sleepWithCancel(backoff); canceled {
						slog.DebugContext(w.ctx, "listener restart canceled")
						return
					}
					continue
				}
				w.mux.Unlock()
				continue Loop
			}
		}
	}
}
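
// handleScaleUp creates new JIT runners in github and corresponding database
// entries until the current runner count reaches the target. Callers must
// hold w.mux.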
func (w *Worker) handleScaleUp(target, current uint) {
	if !w.scaleSet.Enabled {
		slog.DebugContext(w.ctx, "scale set is disabled; not scaling up")
		return
	}
	if target <= current {
		slog.DebugContext(w.ctx, "target is less than or equal to current; not scaling up")
		return
	}
	controllerConfig, err := w.store.ControllerInfo()
	if err != nil {
		slog.ErrorContext(w.ctx, "error getting controller config", "error", err)
		return
	}
	scaleSetCli, err := w.GetScaleSetClient()
	if err != nil {
		slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
		return
	}

	for i := current; i < target; i++ {
		newRunnerName := fmt.Sprintf("%s-%s", w.scaleSet.GetRunnerPrefix(), util.NewID())
		jitConfig, err := scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID)
		if err != nil {
			slog.ErrorContext(w.ctx, "error generating jit config", "error", err)
			continue
		}
		slog.DebugContext(w.ctx, "creating new runner", "runner_name", newRunnerName)
		decodedJit, err := jitConfig.DecodedJITConfig()
		if err != nil {
			slog.ErrorContext(w.ctx, "error decoding jit config", "error", err)
			continue
		}
		runnerParams := params.CreateInstanceParams{
			Name:              newRunnerName,
			Status:            commonParams.InstancePendingCreate,
			RunnerStatus:      params.RunnerPending,
			OSArch:            w.scaleSet.OSArch,
			OSType:            w.scaleSet.OSType,
			CallbackURL:       controllerConfig.CallbackURL,
			MetadataURL:       controllerConfig.MetadataURL,
			CreateAttempt:     1,
			GitHubRunnerGroup: w.scaleSet.GitHubRunnerGroup,
			JitConfiguration:  decodedJit,
			AgentID:           jitConfig.Runner.ID,
		}
		dbInstance, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams)
		if err != nil {
			slog.ErrorContext(w.ctx, "error creating instance", "error", err)
			if err := scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
				slog.ErrorContext(w.ctx, "error deleting runner", "error", err)
			}
			continue
		}
		w.runners[dbInstance.ID] = dbInstance
		_, err = scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
		if err != nil {
			slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
			continue
		}
	}
}
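
// waitForToolsOrCancel checks (after a short delay) whether the github tools
// for the scale set's entity are present in the cache. It returns stopped set
// to true if the worker is shutting down.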
func (w *Worker) waitForToolsOrCancel() (hasTools, stopped bool) {
	timer := time.NewTimer(1 * time.Second)
	defer timer.Stop()
	select {
	case <-timer.C:
		entity, err := w.scaleSet.GetEntity()
		if err != nil {
			slog.ErrorContext(w.ctx, "error getting entity", "error", err)
			return false, false
		}
		if _, err := cache.GetGithubToolsCache(entity.ID); err != nil {
			slog.DebugContext(w.ctx, "tools not found in cache; waiting for tools")
			return false, false
		}
		return true, false
	case <-w.quit:
		return false, true
	case <-w.ctx.Done():
		return false, true
	}
}
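
// handleScaleDown removes idle runners until the current runner count drops
// to the target. Runners already in a deleting state count towards the number
// removed. Callers must hold w.mux.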
func (w *Worker) handleScaleDown(target, current uint) {
	if target >= current {
		// Nothing to remove. Note that target and current are unsigned, so we
		// guard here instead of checking for a negative delta.
		return
	}
	delta := current - target

	removed := 0
	candidates := []params.Instance{}
	for _, runner := range w.runners {
		locked := locking.TryLock(runner.Name, w.consumerID)
		if !locked {
			slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
			continue
		}
		switch runner.Status {
		case commonParams.InstanceRunning:
			if runner.RunnerStatus != params.RunnerActive {
				candidates = append(candidates, runner)
			}
		case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
			commonParams.InstanceDeleting, commonParams.InstanceDeleted:
			removed++
			locking.Unlock(runner.Name, true)
			continue
		default:
			slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
			locking.Unlock(runner.Name, false)
			continue
		}
		locking.Unlock(runner.Name, false)
	}

	if removed >= int(delta) {
		return
	}

	for _, runner := range candidates {
		if removed >= int(delta) {
			break
		}
		locked := locking.TryLock(runner.Name, w.consumerID)
		if !locked {
			slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
			continue
		}
		switch runner.Status {
		case commonParams.InstancePendingCreate, commonParams.InstanceRunning:
		case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
			commonParams.InstanceDeleting, commonParams.InstanceDeleted:
			removed++
			locking.Unlock(runner.Name, true)
			continue
		default:
			slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
			locking.Unlock(runner.Name, false)
			continue
		}
		switch runner.RunnerStatus {
		case params.RunnerTerminated, params.RunnerActive:
			slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus)
			locking.Unlock(runner.Name, false)
			continue
		}
		scaleSetCli, err := w.GetScaleSetClient()
		if err != nil {
			slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
			locking.Unlock(runner.Name, false)
			return
		}
		slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name)
		if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
			if !errors.Is(err, runnerErrors.ErrNotFound) {
				slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
				locking.Unlock(runner.Name, false)
				continue
			}
		}
		runnerUpdateParams := params.UpdateInstanceParams{
			Status: commonParams.InstancePendingDelete,
		}
		if _, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams); err != nil {
			if errors.Is(err, runnerErrors.ErrNotFound) {
				// The instance was removed from the database while we still had it in
				// our local state; either the update never came from the watcher or
				// something else happened. Remove it from the local cache.
				delete(w.runners, runner.ID)
				removed++
				locking.Unlock(runner.Name, true)
				continue
			}
			// nolint:golangci-lint,godox
			// TODO: This should not happen unless there is some issue with the database.
			// The UpdateInstance() function should add tenacity, but even in that case,
			// if it still errors out, we need to handle it somehow.
			slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err)
			locking.Unlock(runner.Name, false)
			continue
		}
		removed++
		locking.Unlock(runner.Name, false)
	}
}
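
// handleAutoScale periodically computes the target runner count from the
// scale set's min idle runners, desired runner count and max runners, then
// scales up or down accordingly. It also cleans up instances already marked
// as deleted.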
func (w *Worker) handleAutoScale() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	lastMsg := ""
	lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
		if lastMsg != msg {
			slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
			lastMsg = msg
		}
	}
	for {
		hasTools, stopped := w.waitForToolsOrCancel()
		if stopped {
			slog.DebugContext(w.ctx, "worker is stopped; exiting handleAutoScale")
			return
		}
		if !hasTools {
			w.sleepWithCancel(1 * time.Second)
			continue
		}
		select {
		case <-w.quit:
			return
		case <-w.ctx.Done():
			return
		case <-ticker.C:
			w.mux.Lock()
			for _, instance := range w.runners {
				if err := w.handleInstanceCleanup(instance); err != nil {
					slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err)
				}
			}
			var desiredRunners uint
			if w.scaleSet.DesiredRunnerCount > 0 {
				desiredRunners = uint(w.scaleSet.DesiredRunnerCount)
			}
			targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners)
			currentRunners := uint(len(w.runners))

			if currentRunners == targetRunners {
				lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners)
				w.mux.Unlock()
				continue
			}
			if currentRunners < targetRunners {
				lastMsgDebugLog("scaling up", targetRunners, currentRunners)
				w.handleScaleUp(targetRunners, currentRunners)
			} else {
				lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners)
				w.handleScaleDown(targetRunners, currentRunners)
			}
			w.mux.Unlock()
		}
	}
}