garm/workers/scaleset/scaleset.go

// Copyright 2025 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

package scaleset

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/locking"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
)
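
// NewWorker returns a scale set worker for the given scale set. It fetches
// the controller info from the store and initializes an empty local runner
// cache. The worker does nothing until Start() is called.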
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
consumerID := fmt.Sprintf("scaleset-worker-%s-%d", scaleSet.Name, scaleSet.ID)
controllerInfo, err := store.ControllerInfo()
if err != nil {
return nil, fmt.Errorf("getting controller info: %w", err)
}
return &Worker{
ctx: ctx,
controllerInfo: controllerInfo,
consumerID: consumerID,
store: store,
provider: provider,
scaleSet: scaleSet,
runners: make(map[string]params.Instance),
}, nil
}
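
// Worker manages the lifecycle of a single scale set. It keeps a local cache
// of runners, consumes database change events, reconciles state between
// github, the provider and the database, and scales runners up or down based
// on the scale set settings.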
type Worker struct {
ctx context.Context
consumerID string
controllerInfo params.ControllerInfo
provider common.Provider
store dbCommon.Store
scaleSet params.ScaleSet
runners map[string]params.Instance
consumer dbCommon.Consumer
listener *scaleSetListener
mux sync.Mutex
running bool
quit chan struct{}
}
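
// Stop closes the watcher consumer, signals the worker loops to exit and
// stops the listener. It is a no-op if the worker is not running.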
func (w *Worker) Stop() error {
slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
if !w.running {
return nil
}
w.consumer.Close()
w.running = false
if w.quit != nil {
close(w.quit)
}
w.listener.Stop()
return nil
}
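
// Start recovers the state of any existing scale set instances, registers a
// database watcher consumer and, if the scale set is enabled, starts the
// listener. It then spawns the event loop, the listener keep-alive loop and
// the auto scaling loop.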
func (w *Worker) Start() (err error) {
slog.DebugContext(w.ctx, "starting scale set worker", "scale_set", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
if w.running {
return nil
}
instances, err := w.store.ListScaleSetInstances(w.ctx, w.scaleSet.ID)
if err != nil {
return fmt.Errorf("listing scale set instances: %w", err)
}
for _, instance := range instances {
if instance.Status == commonParams.InstanceCreating {
// We're just starting up. We found an instance stuck in creating.
// When a provider creates an instance, it sets the db instance to
// creating and then issues an API call to the IaaS to create the
// instance using some userdata it needs to come up. But the instance
// will still need to call back home to fetch additional metadata and
// complete its setup. We should remove the instance, as it is not
// possible to reliably determine its state (whether it's mid-boot,
// before it reached the phase where it runs the metadata, or whether
// it already failed).
instanceState := commonParams.InstancePendingDelete
locking.Lock(instance.Name, w.consumerID)
if instance.AgentID != 0 {
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
return fmt.Errorf("getting scale set client: %w", err)
}
if err := scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
// Scale sets use JIT runners. This means that we create the runner in github
// before we create the actual instance that will use the credentials. We need
// to remove the runner from github if it exists.
if !errors.Is(err, runnerErrors.ErrNotFound) {
if errors.Is(err, runnerErrors.ErrUnauthorized) {
// we don't have access to remove the runner. This implies that our
// credentials may have expired or are incorrect.
//
// nolint:golangci-lint,godox
// TODO(gabriel-samfira): we need to set the scale set as inactive and stop the listener (if any).
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err)
w.runners[instance.ID] = instance
locking.Unlock(instance.Name, false)
continue
}
// The runner may have come up, registered and is currently running a
// job, in which case, github will not allow us to remove it.
runnerInstance, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
// We could not get info about the runner, and the error was not a not-found error.
slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
w.runners[instance.ID] = instance
locking.Unlock(instance.Name, false)
continue
}
}
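// If the runner was not found in github, runnerInstance is the zero value
// and its status will not match idle or active below, leaving the instance
// marked as pending_delete.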
if runnerInstance.Status == string(params.RunnerIdle) ||
runnerInstance.Status == string(params.RunnerActive) {
// This is a highly unlikely scenario, but let's account for it anyway.
//
// The runner is running a job or is idle. Mark it as running, as
// it appears that it finished booting and is now running.
//
// NOTE: if the instance was in creating and it managed to boot, there
// is a high chance that we do not have a provider ID for the runner
// inside our database. When removing the runner, the provider will attempt
// to use the instance name instead of the provider ID, the same as when
// creation of the instance fails and we try to clean up any lingering resources
// in the provider.
slog.DebugContext(w.ctx, "runner is running a job or is idle; not removing", "runner_name", instance.Name)
instanceState = commonParams.InstanceRunning
}
}
}
}
runnerUpdateParams := params.UpdateInstanceParams{
Status: instanceState,
}
// Capture the name before the update; on error, UpdateInstance returns a
// zero value instance, which would wipe instance.Name.
instanceName := instance.Name
instance, err = w.store.UpdateInstance(w.ctx, instanceName, runnerUpdateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
locking.Unlock(instanceName, false)
return fmt.Errorf("updating runner %s: %w", instanceName, err)
}
}
} else if instance.Status == commonParams.InstanceDeleting {
// The instance was found in deleting. It is assumed that the runner was already
// removed from github, either by github or by garm; the deleting status indicates
// that it was already being handled by the provider, so there should be no entry
// on github for the runner.
// Setting it to pending_delete will cause the provider to try again, an operation
// which is idempotent (if it's already deleted, the provider reports success).
runnerUpdateParams := params.UpdateInstanceParams{
Status: commonParams.InstancePendingDelete,
}
// Capture the name before the update; on error, UpdateInstance returns a
// zero value instance, which would wipe instance.Name.
instanceName := instance.Name
instance, err = w.store.UpdateInstance(w.ctx, instanceName, runnerUpdateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
locking.Unlock(instanceName, false)
return fmt.Errorf("updating runner %s: %w", instanceName, err)
}
}
}
w.runners[instance.ID] = instance
locking.Unlock(instance.Name, false)
}
consumer, err := watcher.RegisterConsumer(
w.ctx, w.consumerID,
watcher.WithAny(
watcher.WithAll(
watcher.WithScaleSetFilter(w.scaleSet),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
watcher.WithScaleSetInstanceFilter(w.scaleSet),
),
)
if err != nil {
return fmt.Errorf("error registering consumer: %w", err)
}
defer func() {
if err != nil {
consumer.Close()
}
}()
slog.DebugContext(w.ctx, "creating scale set listener")
listener := newListener(w.ctx, w)
if w.scaleSet.Enabled {
slog.DebugContext(w.ctx, "starting scale set listener")
if err := listener.Start(); err != nil {
return fmt.Errorf("error starting listener: %w", err)
}
} else {
slog.InfoContext(w.ctx, "scale set is disabled; not starting listener")
}
w.listener = listener
w.consumer = consumer
w.running = true
w.quit = make(chan struct{})
slog.DebugContext(w.ctx, "starting scale set worker loops", "scale_set", w.consumerID)
go w.loop()
go w.keepListenerAlive()
go w.handleAutoScale()
return nil
}
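
// runnerByName returns the contents of the local runner cache keyed by
// runner name instead of instance ID. Callers are expected to hold w.mux.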
func (w *Worker) runnerByName() map[string]params.Instance {
runners := make(map[string]params.Instance)
for _, runner := range w.runners {
runners[runner.Name] = runner
}
return runners
}
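
// setRunnerDBStatus updates the status of a runner in the database.
// Not-found errors are swallowed, in which case the returned instance is
// the zero value.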
func (w *Worker) setRunnerDBStatus(runner string, status commonParams.InstanceStatus) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
Status: status,
}
newDbInstance, err := w.store.UpdateInstance(w.ctx, runner, updateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return params.Instance{}, fmt.Errorf("updating runner %s: %w", runner, err)
}
}
return newDbInstance, nil
}
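
// removeRunnerFromGithubAndSetPendingDelete removes a runner from github
// (ignoring not-found errors), marks the database instance as
// pending_delete and updates the local runner cache.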
func (w *Worker) removeRunnerFromGithubAndSetPendingDelete(runnerName string, agentID int64) error {
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
return fmt.Errorf("getting scale set client: %w", err)
}
if err := scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("removing runner %s: %w", runnerName, err)
}
}
instance, err := w.setRunnerDBStatus(runnerName, commonParams.InstancePendingDelete)
if err != nil {
return fmt.Errorf("updating runner %s: %w", runnerName, err)
}
w.runners[instance.ID] = instance
return nil
}
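
// reapTimedOutRunners marks as pending_delete any runner that is still
// pending or installing past the configured bootstrap timeout and is
// offline or missing in github. It returns a function that releases all
// locks taken during the sweep; callers must invoke it once the locks are
// no longer needed.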
func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference) (func(), error) {
lockNames := []string{}
unlockFn := func() {
for _, name := range lockNames {
slog.DebugContext(w.ctx, "unlockFn unlocking runner", "runner_name", name)
locking.Unlock(name, false)
}
}
for _, runner := range w.runners {
if time.Since(runner.UpdatedAt).Minutes() < float64(w.scaleSet.RunnerBootstrapTimeout) {
continue
}
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
continue
}
if runner.RunnerStatus != params.RunnerPending && runner.RunnerStatus != params.RunnerInstalling {
slog.DebugContext(w.ctx, "runner is not pending or installing; skipping", "runner_name", runner.Name)
continue
}
if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline {
if ok := locking.TryLock(runner.Name, w.consumerID); !ok {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
lockNames = append(lockNames, runner.Name)
slog.InfoContext(
w.ctx, "reaping timed-out/failed runner",
"runner_name", runner.Name)
if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
unlockFn()
return nil, fmt.Errorf("removing runner %s: %w", runner.Name, err)
}
}
}
return unlockFn, nil
}
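
// consolidateRunnerState reconciles the runners known to github, the
// database and the provider. Runners that exist in github but not in GARM
// are removed from github, runners missing from github or from the provider
// are marked as pending_delete, and provider instances with no database
// record are deleted from the provider.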
func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error {
w.mux.Lock()
defer w.mux.Unlock()
ghRunnersByName := make(map[string]params.RunnerReference)
for _, runner := range runners {
ghRunnersByName[runner.Name] = runner
}
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
return fmt.Errorf("getting scale set client: %w", err)
}
dbRunnersByName := w.runnerByName()
// Cross check what exists in github with what we have in the database.
for name, runner := range ghRunnersByName {
status := runner.GetStatus()
if _, ok := dbRunnersByName[name]; !ok {
// The runner exists in github but not in our database. If it is idle or
// active, it may be managed outside of GARM, so we leave it alone;
// otherwise we remove it from github.
if status != params.RunnerIdle && status != params.RunnerActive {
slog.InfoContext(w.ctx, "runner does not exist in GARM; removing from github", "runner_name", name)
if err := scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
continue
}
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
}
}
continue
}
}
unlockFn, err := w.reapTimedOutRunners(ghRunnersByName)
if err != nil {
return fmt.Errorf("reaping timed out runners: %w", err)
}
defer unlockFn()
// refresh the map. It may have been mutated above.
dbRunnersByName = w.runnerByName()
// Cross check what exists in the database with what we have in github.
for name, runner := range dbRunnersByName {
// In the case of scale sets, JIT configs are used. There is no situation
// in which we create a runner in the DB and one does not exist in github.
// We can safely assume that if the runner is not in github anymore, it can
// be removed from the provider and the DB.
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
continue
}
if _, ok := ghRunnersByName[name]; !ok {
if ok := locking.TryLock(name, w.consumerID); !ok {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name)
continue
}
// unlock the runner only after this function returns. This function also cross
// checks between the provider and the database, and removes leftover runners.
// If we unlock early, the provider worker will attempt to remove runners that
// we set in pending_delete. This function holds the mutex, so we won't see those
// changes until we return. So we hold the instance lock here until we are done.
// That way, even if the provider sees the pending_delete status, it won't act on
// it until it manages to lock the instance.
defer locking.Unlock(name, false)
slog.InfoContext(w.ctx, "runner does not exist in github; removing from provider", "runner_name", name)
instance, err := w.setRunnerDBStatus(runner.Name, commonParams.InstancePendingDelete)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("updating runner %s: %w", runner.Name, err)
}
}
// We will get an update event anyway from the watcher, but updating the runner
// here will prevent race conditions if some other event involving this runner
// is already in the queue. For the lifetime of this function, we hold the
// lock, so no race condition can occur.
w.runners[runner.ID] = instance
}
}
// Cross check what exists in the provider with the DB.
pseudoPoolID, err := w.pseudoPoolID()
if err != nil {
return fmt.Errorf("getting pseudo pool ID: %w", err)
}
listParams := common.ListInstancesParams{
ListInstancesV011: common.ListInstancesV011Params{
ProviderBaseParams: common.ProviderBaseParams{
ControllerInfo: w.controllerInfo,
},
},
}
providerRunners, err := w.provider.ListInstances(w.ctx, pseudoPoolID, listParams)
if err != nil {
return fmt.Errorf("listing instances: %w", err)
}
providerRunnersByName := make(map[string]commonParams.ProviderInstance)
for _, runner := range providerRunners {
providerRunnersByName[runner.Name] = runner
}
deleteInstanceParams := common.DeleteInstanceParams{
DeleteInstanceV011: common.DeleteInstanceV011Params{
ProviderBaseParams: common.ProviderBaseParams{
ControllerInfo: w.controllerInfo,
},
},
}
// refresh the map. It may have been mutated above.
dbRunnersByName = w.runnerByName()
for _, runner := range providerRunners {
if _, ok := dbRunnersByName[runner.Name]; !ok {
slog.InfoContext(w.ctx, "runner does not exist in database; removing from provider", "runner_name", runner.Name)
// There is no situation in which the runner will disappear from the provider
// after it was removed from the database. The provider worker will remove the
// instance from the provider and mark the instance as deleted in the database.
// It is the responsibility of the scaleset worker to then clean up the runners
// in the deleted state.
// That means that if we have a runner in the provider but not the DB, it is most
// likely an inconsistency.
if err := w.provider.DeleteInstance(w.ctx, runner.Name, deleteInstanceParams); err != nil {
slog.ErrorContext(w.ctx, "error removing instance", "instance_name", runner.Name, "error", err)
}
continue
}
}
for _, runner := range dbRunnersByName {
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
// This instance is already being deleted.
continue
}
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
defer locking.Unlock(runner.Name, false)
if _, ok := providerRunnersByName[runner.Name]; !ok {
// The runner is not in the provider anymore. Remove it from the DB.
slog.InfoContext(w.ctx, "runner does not exist in provider; removing from database", "runner_name", runner.Name)
if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
return fmt.Errorf("removing runner %s: %w", runner.Name, err)
}
}
}
return nil
}
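
// pseudoPoolID derives a synthetic pool ID from the scale set name and the
// entity ID. It is used when calling provider operations that expect a pool
// ID.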
func (w *Worker) pseudoPoolID() (string, error) {
// This is temporary. We need to extend providers to know about scale sets.
entity, err := w.scaleSet.GetEntity()
if err != nil {
return "", fmt.Errorf("getting entity: %w", err)
}
return fmt.Sprintf("%s-%s", w.scaleSet.Name, entity.ID), nil
}
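
// handleScaleSetEvent processes scale set change events from the watcher.
// It updates the local copy of the scale set and stops the listener when
// the scale set is disabled or its max runner count is lowered.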
func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
scaleSet, ok := event.Payload.(params.ScaleSet)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for scale set type", "scale_set_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
w.mux.Lock()
if scaleSet.MaxRunners < w.scaleSet.MaxRunners || !scaleSet.Enabled {
// We stop the listener if the scale set is disabled or if the max runners
// value is decreased. In the case where max runners changes but the scale set
// is still enabled, we rely on keepListenerAlive to restart the listener,
// which will listen for new messages with the changed max runners. This way
// we don't have to potentially wait 50 seconds for the max runner value
// to be updated, during which time we might get more runners spawned than the
// new max runner value allows.
if err := w.listener.Stop(); err != nil {
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
}
}
w.scaleSet = scaleSet
w.mux.Unlock()
default:
slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation)
}
}
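
// handleInstanceCleanup removes an instance marked as deleted from the
// database, ignoring not-found errors.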
func (w *Worker) handleInstanceCleanup(instance params.Instance) error {
if instance.Status == commonParams.InstanceDeleted {
if err := w.store.DeleteInstanceByName(w.ctx, instance.Name); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("deleting instance %s: %w", instance.ID, err)
}
}
}
return nil
}
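
// handleInstanceEntityEvent keeps the local runner cache in sync with
// instance create, update and delete events coming from the watcher.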
func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
instance, ok := event.Payload.(params.Instance)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for instance type", "instance_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(w.ctx, "got create operation")
w.mux.Lock()
w.runners[instance.ID] = instance
w.mux.Unlock()
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
w.mux.Lock()
if instance.Status == commonParams.InstanceDeleted {
if err := w.handleInstanceCleanup(instance); err != nil {
slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err)
}
w.mux.Unlock()
return
}
oldInstance, ok := w.runners[instance.ID]
w.runners[instance.ID] = instance
if !ok {
slog.DebugContext(w.ctx, "instance not found in local cache; ignoring", "instance_id", instance.ID)
w.mux.Unlock()
return
}
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
// Unlock before returning; the mutex was taken at the top of this case.
w.mux.Unlock()
return
}
if oldInstance.RunnerStatus != instance.RunnerStatus && instance.RunnerStatus == params.RunnerIdle {
serviceRunner, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
if err != nil {
slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
w.mux.Unlock()
return
}
status, ok := serviceRunner.Status.(string)
if !ok {
slog.ErrorContext(w.ctx, "error getting runner status", "runner_id", instance.AgentID)
w.mux.Unlock()
return
}
if status != string(params.RunnerIdle) && status != string(params.RunnerActive) {
// nolint:golangci-lint,godox
// TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as
// pending_delete if the runner never comes online.
w.mux.Unlock()
return
}
}
w.mux.Unlock()
case dbCommon.DeleteOperation:
slog.DebugContext(w.ctx, "got delete operation")
w.mux.Lock()
delete(w.runners, instance.ID)
w.mux.Unlock()
default:
slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation)
}
}
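
// handleEvent dispatches a change payload to the scale set or instance
// handler, based on the entity type of the event.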
func (w *Worker) handleEvent(event dbCommon.ChangePayload) {
switch event.EntityType {
case dbCommon.ScaleSetEntityType:
slog.DebugContext(w.ctx, "got scaleset event")
w.handleScaleSetEvent(event)
case dbCommon.InstanceEntityType:
slog.DebugContext(w.ctx, "got instance event")
w.handleInstanceEntityEvent(event)
default:
slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
}
}
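
// loop is the main event loop of the worker. It dispatches watcher events
// until the worker is stopped or the context is canceled.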
func (w *Worker) loop() {
defer w.Stop()
for {
select {
case <-w.quit:
return
case event, ok := <-w.consumer.Watch():
if !ok {
slog.InfoContext(w.ctx, "consumer channel closed")
return
}
go w.handleEvent(event)
case <-w.ctx.Done():
slog.DebugContext(w.ctx, "context done")
return
}
}
}
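
// sleepWithCancel sleeps for the given duration. It returns true if the
// sleep was interrupted by the worker quitting or by the context being
// canceled.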
func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
// A one-shot timer is enough here; we only ever wait for a single firing.
timer := time.NewTimer(sleepTime)
defer timer.Stop()
select {
case <-timer.C:
return false
case <-w.quit:
case <-w.ctx.Done():
}
return true
}
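
// sessionLoopMayRun reports whether the scale set is enabled and the
// listener session should therefore be kept running.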
func (w *Worker) sessionLoopMayRun() bool {
w.mux.Lock()
defer w.mux.Unlock()
return w.scaleSet.Enabled
}
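
// keepListenerAlive restarts the listener whenever it stops while the scale
// set is enabled, backing off exponentially (capped at 60 seconds) on
// repeated failures.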
func (w *Worker) keepListenerAlive() {
var backoff time.Duration
Loop:
for {
if !w.sessionLoopMayRun() {
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
return
}
continue
}
// noop if already started. If the scale set was just enabled, we need to
// start the listener here, or the <-w.listener.Wait() channel receive below
// would block forever, as receiving from a nil channel blocks forever.
if err := w.listener.Start(); err != nil {
slog.ErrorContext(w.ctx, "error starting listener", "error", err, "consumer_id", w.consumerID)
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
return
}
// we failed to start the listener. Try again.
continue
}
select {
case <-w.quit:
return
case <-w.ctx.Done():
return
case <-w.listener.Wait():
slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
w.mux.Lock()
if !w.scaleSet.Enabled {
w.listener.Stop() // cleanup
w.mux.Unlock()
continue Loop
}
w.mux.Unlock()
for {
w.mux.Lock()
w.listener.Stop() // cleanup
if !w.scaleSet.Enabled {
w.mux.Unlock()
continue Loop
}
slog.DebugContext(w.ctx, "attempting to restart")
if err := w.listener.Start(); err != nil {
w.mux.Unlock()
// Exponential backoff, capped at 60 seconds.
switch {
case backoff == 0:
backoff = 5 * time.Second
default:
backoff *= 2
if backoff > 60*time.Second {
backoff = 60 * time.Second
}
}
slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff)
if canceled := w.sleepWithCancel(backoff); canceled {
slog.DebugContext(w.ctx, "listener restart canceled")
return
}
continue
}
w.mux.Unlock()
continue Loop
}
}
}
}
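
// handleScaleUp creates new runners until the current count reaches the
// target. For each runner, a JIT config is generated in github first, then
// the instance is recorded in the database; if the database write fails,
// the github runner is removed again.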
func (w *Worker) handleScaleUp(target, current uint) {
if !w.scaleSet.Enabled {
slog.DebugContext(w.ctx, "scale set is disabled; not scaling up")
return
}
if target <= current {
slog.DebugContext(w.ctx, "target is less than or equal to current; not scaling up")
return
}
controllerConfig, err := w.store.ControllerInfo()
if err != nil {
slog.ErrorContext(w.ctx, "error getting controller config", "error", err)
return
}
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
return
}
for i := current; i < target; i++ {
newRunnerName := fmt.Sprintf("%s-%s", w.scaleSet.GetRunnerPrefix(), util.NewID())
jitConfig, err := scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID)
if err != nil {
slog.ErrorContext(w.ctx, "error generating jit config", "error", err)
continue
}
slog.DebugContext(w.ctx, "creating new runner", "runner_name", newRunnerName)
decodedJit, err := jitConfig.DecodedJITConfig()
if err != nil {
slog.ErrorContext(w.ctx, "error decoding jit config", "error", err)
continue
}
runnerParams := params.CreateInstanceParams{
Name: newRunnerName,
Status: commonParams.InstancePendingCreate,
RunnerStatus: params.RunnerPending,
OSArch: w.scaleSet.OSArch,
OSType: w.scaleSet.OSType,
CallbackURL: controllerConfig.CallbackURL,
MetadataURL: controllerConfig.MetadataURL,
CreateAttempt: 1,
GitHubRunnerGroup: w.scaleSet.GitHubRunnerGroup,
JitConfiguration: decodedJit,
AgentID: jitConfig.Runner.ID,
}
dbInstance, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams)
if err != nil {
slog.ErrorContext(w.ctx, "error creating instance", "error", err)
if err := scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
slog.ErrorContext(w.ctx, "error deleting runner", "error", err)
}
continue
}
w.runners[dbInstance.ID] = dbInstance
_, err = scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
if err != nil {
slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
continue
}
}
}
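
// waitForToolsOrCancel waits one second, then checks whether github tools
// for the scale set entity are present in the cache. It returns
// stopped=true if the worker is shutting down while waiting.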
func (w *Worker) waitForToolsOrCancel() (hasTools, stopped bool) {
// A one-shot timer is enough here; the caller loops around this function.
timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
select {
case <-timer.C:
entity, err := w.scaleSet.GetEntity()
if err != nil {
slog.ErrorContext(w.ctx, "error getting entity", "error", err)
return false, false
}
if _, err := cache.GetGithubToolsCache(entity.ID); err != nil {
slog.DebugContext(w.ctx, "tools not found in cache; waiting for tools")
return false, false
}
return true, false
case <-w.quit:
return false, true
case <-w.ctx.Done():
return false, true
}
}
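
// handleScaleDown removes runners until the current count drops to the
// target. Runners already being deleted count towards the delta; runners
// that are actively running a job are never removed.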
func (w *Worker) handleScaleDown(target, current uint) {
// current and target are uint; guard against underflow before subtracting.
if current <= target {
return
}
delta := current - target
removed := 0
candidates := []params.Instance{}
for _, runner := range w.runners {
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
switch runner.Status {
case commonParams.InstanceRunning:
if runner.RunnerStatus != params.RunnerActive {
candidates = append(candidates, runner)
}
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
removed++
locking.Unlock(runner.Name, true)
continue
default:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
locking.Unlock(runner.Name, false)
continue
}
locking.Unlock(runner.Name, false)
}
if removed >= int(delta) {
return
}
for _, runner := range candidates {
if removed >= int(delta) {
break
}
locked := locking.TryLock(runner.Name, w.consumerID)
if !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
switch runner.Status {
case commonParams.InstancePendingCreate, commonParams.InstanceRunning:
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
removed++
locking.Unlock(runner.Name, true)
continue
default:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.Status)
locking.Unlock(runner.Name, false)
continue
}
switch runner.RunnerStatus {
case params.RunnerTerminated, params.RunnerActive:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus)
locking.Unlock(runner.Name, false)
continue
}
scaleSetCli, err := w.GetScaleSetClient()
if err != nil {
slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
return
}
slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name)
if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
locking.Unlock(runner.Name, false)
continue
}
}
runnerUpdateParams := params.UpdateInstanceParams{
Status: commonParams.InstancePendingDelete,
}
if _, err := w.store.UpdateInstance(w.ctx, runner.Name, runnerUpdateParams); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// The instance was removed from the database, but we still had it in our
// local state; either the update never came through the watcher or something
// else happened. Remove it from the local cache.
delete(w.runners, runner.ID)
removed++
locking.Unlock(runner.Name, true)
continue
}
// nolint:golangci-lint,godox
// TODO: This should not happen unless there is some issue with the database.
// The UpdateInstance() function should retry transient failures, but even
// then, if it still errors out, we need to handle it somehow.
slog.ErrorContext(w.ctx, "error updating runner", "runner_name", runner.Name, "error", err)
locking.Unlock(runner.Name, false)
continue
}
removed++
locking.Unlock(runner.Name, false)
}
}
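
// handleAutoScale periodically cleans up deleted instances and compares the
// current runner count against min(MinIdleRunners+DesiredRunnerCount,
// MaxRunners), scaling up or down as needed.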
func (w *Worker) handleAutoScale() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
lastMsg := ""
lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
if lastMsg != msg {
slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
lastMsg = msg
}
}
for {
hasTools, stopped := w.waitForToolsOrCancel()
if stopped {
slog.DebugContext(w.ctx, "worker is stopped; exiting handleAutoScale")
return
}
if !hasTools {
if canceled := w.sleepWithCancel(1 * time.Second); canceled {
return
}
continue
}
select {
case <-w.quit:
return
case <-w.ctx.Done():
return
case <-ticker.C:
w.mux.Lock()
for _, instance := range w.runners {
if err := w.handleInstanceCleanup(instance); err != nil {
slog.ErrorContext(w.ctx, "error cleaning up instance", "instance_id", instance.ID, "error", err)
}
}
var desiredRunners uint
if w.scaleSet.DesiredRunnerCount > 0 {
desiredRunners = uint(w.scaleSet.DesiredRunnerCount)
}
targetRunners := min(w.scaleSet.MinIdleRunners+desiredRunners, w.scaleSet.MaxRunners)
currentRunners := uint(len(w.runners))
if currentRunners == targetRunners {
lastMsgDebugLog("desired runner count reached", targetRunners, currentRunners)
w.mux.Unlock()
continue
}
if currentRunners < targetRunners {
lastMsgDebugLog("scaling up", targetRunners, currentRunners)
w.handleScaleUp(targetRunners, currentRunners)
} else {
lastMsgDebugLog("attempting to scale down", targetRunners, currentRunners)
w.handleScaleDown(targetRunners, currentRunners)
}
w.mux.Unlock()
}
}
}