Add runner periodic cleanup check

Adds a periodic cleanup function that cross-checks runners between GitHub,
the provider, and the GARM database. If an inconsistency is found, GARM will
attempt to fix it.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Gabriel Adrian Samfira 2025-05-01 13:35:56 +00:00
parent 73340da322
commit 059734f064
12 changed files with 479 additions and 43 deletions
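
For orientation, here is a minimal, hypothetical sketch of the cross-check order this change implements (most directly in the scale set worker's consolidateRunnerState further down). The types, helper names and status strings below are illustrative only, not the commit's actual code:

```go
// Hypothetical sketch of the consolidation flow; simplified types, not GARM's API.
package main

import "fmt"

type runner struct {
	name   string
	status string // e.g. "offline", "idle", "active"
}

// consolidate cross-checks the three views GARM has of its runners:
// GitHub, the IaaS provider and the GARM database.
func consolidate(github, provider, db map[string]runner) {
	// On GitHub but not in the DB and not idle/active: not managed by GARM,
	// so remove the runner from GitHub.
	for name, r := range github {
		if _, ok := db[name]; !ok && r.status != "idle" && r.status != "active" {
			fmt.Println("remove from github:", name)
		}
	}
	// In the DB but not on GitHub: mark the instance pending_delete so the
	// provider worker tears it down.
	for name := range db {
		if _, ok := github[name]; !ok {
			fmt.Println("mark pending_delete in the DB:", name)
		}
	}
	// In the provider but not in the DB: an inconsistency, delete the
	// instance from the provider.
	for name := range provider {
		if _, ok := db[name]; !ok {
			fmt.Println("delete from provider:", name)
		}
	}
	// In the DB but gone from the provider: remove the runner from GitHub
	// and mark it pending_delete.
	for name := range db {
		if _, ok := provider[name]; !ok {
			fmt.Println("remove from github, mark pending_delete:", name)
		}
	}
}

func main() {
	github := map[string]runner{"orphan": {name: "orphan", status: "offline"}}
	db := map[string]runner{"stale": {name: "stale", status: "idle"}}
	provider := map[string]runner{"leftover": {name: "leftover", status: "active"}}
	consolidate(github, provider, db)
}
```

The actual implementation below additionally locks each runner by name while it is being reconciled and reaps timed-out pending runners before the DB-vs-GitHub pass.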


@@ -255,9 +255,10 @@ func WithScaleSetInstanceFilter(scaleset params.ScaleSet) dbCommon.PayloadFilter
}
instance, ok := payload.Payload.(params.Instance)
if !ok {
if !ok || instance.ScaleSetID == 0 {
return false
}
return instance.ScaleSetID == scaleset.ID
}
}


@@ -33,19 +33,19 @@ var _ Locker = &keyMutex{}
func (k *keyMutex) TryLock(key, identifier string) bool {
mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
mux: sync.Mutex{},
ident: identifier,
mux: sync.Mutex{},
})
keyMux := mux.(*lockWithIdent)
keyMux.ident = identifier
return keyMux.mux.TryLock()
}
func (k *keyMutex) Lock(key, identifier string) {
mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
mux: sync.Mutex{},
ident: identifier,
mux: sync.Mutex{},
})
keyMux := mux.(*lockWithIdent)
keyMux.ident = identifier
keyMux.mux.Lock()
}
@@ -60,6 +60,7 @@ func (k *keyMutex) Unlock(key string, remove bool) {
}
_, filename, line, _ := runtime.Caller(1)
slog.Debug("unlocking", "key", key, "identifier", keyMux.ident, "caller", fmt.Sprintf("%s:%d", filename, line))
keyMux.ident = ""
keyMux.mux.Unlock()
}


@@ -421,6 +421,7 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
type RunnerReference struct {
ID int64 `json:"id"`
Name string `json:"name"`
OS string `json:"os"`
RunnerScaleSetID int `json:"runnerScaleSetId"`
CreatedOn interface{} `json:"createdOn"`
RunnerGroupID uint64 `json:"runnerGroupId"`
@@ -431,9 +432,29 @@ type RunnerReference struct {
Status interface{} `json:"status"`
DisableUpdate bool `json:"disableUpdate"`
ProvisioningState string `json:"provisioningState"`
Busy bool `json:"busy"`
Labels []Label `json:"labels,omitempty"`
}
func (r RunnerReference) GetStatus() RunnerStatus {
status, ok := r.Status.(string)
if !ok {
return RunnerUnknown
}
runnerStatus := RunnerStatus(status)
if !runnerStatus.IsValid() {
return RunnerUnknown
}
if runnerStatus == RunnerOnline {
if r.Busy {
return RunnerActive
}
return RunnerIdle
}
return runnerStatus
}
type RunnerScaleSetJitRunnerConfig struct {
Runner *RunnerReference `json:"runner"`
EncodedJITConfig string `json:"encodedJITConfig"`


@@ -49,6 +49,18 @@ type (
ScaleSetMessageType string
)
func (s RunnerStatus) IsValid() bool {
switch s {
case RunnerIdle, RunnerPending, RunnerTerminated,
RunnerInstalling, RunnerFailed,
RunnerActive, RunnerOffline,
RunnerUnknown, RunnerOnline:
return true
}
return false
}
const (
// PoolBalancerTypeRoundRobin will try to cycle through the pools of an entity
// in a round robin fashion. For example, if a repository has multiple pools that
@@ -117,6 +129,9 @@ const (
RunnerInstalling RunnerStatus = "installing"
RunnerFailed RunnerStatus = "failed"
RunnerActive RunnerStatus = "active"
RunnerOffline RunnerStatus = "offline"
RunnerOnline RunnerStatus = "online"
RunnerUnknown RunnerStatus = "unknown"
)
const (


@@ -28,6 +28,10 @@ type GithubEntityOperations interface {
GithubBaseURL() *url.URL
}
type RateLimitClient interface {
RateLimit(ctx context.Context) (*github.RateLimits, error)
}
// GithubClient that describes the minimum list of functions we need to interact with github.
// Allows for easier testing.
//


@@ -439,10 +439,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
// github so we let them be for now.
continue
}
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
switch instance.RunnerStatus {
case params.RunnerPending, params.RunnerInstalling:
if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
// runner is still installing. We give it a chance to finish.
slog.DebugContext(
r.ctx, "runner is still installing, give it a chance to finish",
@@ -510,7 +514,11 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
}
defer locking.Unlock(instance.Name, false)
if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
continue
}


@@ -490,6 +490,30 @@ func (g *githubClient) GithubBaseURL() *url.URL {
return g.cli.BaseURL
}
func NewRateLimitClient(ctx context.Context, credentials params.GithubCredentials) (common.RateLimitClient, error) {
httpClient, err := credentials.GetHTTPClient(ctx)
if err != nil {
return nil, errors.Wrap(err, "fetching http client")
}
slog.DebugContext(
ctx, "creating rate limit client",
"base_url", credentials.APIBaseURL,
"upload_url", credentials.UploadBaseURL)
ghClient, err := github.NewClient(httpClient).WithEnterpriseURLs(
credentials.APIBaseURL, credentials.UploadBaseURL)
if err != nil {
return nil, errors.Wrap(err, "fetching github client")
}
cli := &githubClient{
rateLimit: ghClient.RateLimit,
cli: ghClient,
}
return cli, nil
}
func Client(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
// func GithubClient(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
httpClient, err := entity.Credentials.GetHTTPClient(ctx)


@@ -141,16 +141,17 @@ func (m *MessageSession) maybeRefreshToken(ctx context.Context) error {
if m.session == nil {
return fmt.Errorf("session is nil")
}
// add some jitter
randInt, err := rand.Int(rand.Reader, big.NewInt(5000))
if err != nil {
return fmt.Errorf("failed to get a random number")
}
expiresAt, err := m.session.ExiresAt()
if err != nil {
return fmt.Errorf("failed to get expires at: %w", err)
}
expiresIn := time.Duration(randInt.Int64())*time.Millisecond + 10*time.Minute
// add some jitter (30 second interval)
randInt, err := rand.Int(rand.Reader, big.NewInt(30))
if err != nil {
return fmt.Errorf("failed to get a random number")
}
expiresIn := time.Duration(randInt.Int64())*time.Second + 10*time.Minute
slog.DebugContext(ctx, "checking if message session token needs refresh", "expires_at", expiresAt)
if m.session.ExpiresIn(expiresIn) {
if err := m.Refresh(ctx); err != nil {


@@ -13,32 +13,28 @@ func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
entityType := dbCommon.DatabaseEntityType(w.Entity.EntityType)
switch event.EntityType {
case entityType:
entityGetter, ok := event.Payload.(params.EntityGetter)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
w.handleEntityEventPayload(entity, event)
w.handleEntityEventPayload(event)
return
case dbCommon.GithubCredentialsEntityType:
slog.DebugContext(w.ctx, "got github credentials payload event")
credentials, ok := event.Payload.(params.GithubCredentials)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
w.handleEntityCredentialsEventPayload(credentials, event)
w.handleEntityCredentialsEventPayload(event)
default:
slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
}
}
func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCommon.ChangePayload) {
func (w *Worker) handleEntityEventPayload(event dbCommon.ChangePayload) {
entityGetter, ok := event.Payload.(params.EntityGetter)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
@@ -57,7 +53,13 @@ func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCo
}
}
func (w *Worker) handleEntityCredentialsEventPayload(credentials params.GithubCredentials, event dbCommon.ChangePayload) {
func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayload) {
credentials, ok := event.Payload.(params.GithubCredentials)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got delete operation")


@@ -8,6 +8,8 @@ import (
"sync"
"time"
"golang.org/x/sync/errgroup"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
@@ -16,6 +18,14 @@ import (
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/util/github"
"github.com/cloudbase/garm/util/github/scalesets"
)
const (
// These are duplicated until we decide if we move the pool manager to the new
// worker flow.
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
)
func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
@@ -176,11 +186,88 @@ func (c *Controller) updateTools() error {
return nil
}
// consolidateRunnerState will list all runners on GitHub for this entity, sort by
// pool or scale set and pass those runners to the appropriate worker. The worker will
// then have the responsibility to cross check the runners from github with what it
// knows should be true from the database. Any inconsistency needs to be handled.
// If we have an offline runner in github but no database entry for it, we remove the
// runner from github. If we have a runner that is active in the provider but does not
// exist in github, we remove it from the provider and the database.
func (c *Controller) consolidateRunnerState() error {
scaleSetCli, err := scalesets.NewClient(c.ghCli)
if err != nil {
return fmt.Errorf("creating scaleset client: %w", err)
}
// Client is scoped to the current entity. Only runners in a repo/org/enterprise
// will be listed.
runners, err := scaleSetCli.ListAllRunners(c.ctx)
if err != nil {
return fmt.Errorf("listing runners: %w", err)
}
byPoolID := make(map[string][]params.RunnerReference)
byScaleSetID := make(map[int][]params.RunnerReference)
for _, runner := range runners.RunnerReferences {
if runner.RunnerScaleSetID != 0 {
byScaleSetID[runner.RunnerScaleSetID] = append(byScaleSetID[runner.RunnerScaleSetID], runner)
} else {
poolID := poolIDFromLabels(runner)
if poolID == "" {
continue
}
byPoolID[poolID] = append(byPoolID[poolID], runner)
}
}
g, ctx := errgroup.WithContext(c.ctx)
for _, scaleSet := range c.ScaleSets {
runners := byScaleSetID[scaleSet.scaleSet.ScaleSetID]
g.Go(func() error {
slog.DebugContext(ctx, "consolidating runners for scale set", "scale_set_id", scaleSet.scaleSet.ScaleSetID, "runners", runners)
if err := scaleSet.worker.consolidateRunnerState(runners); err != nil {
return fmt.Errorf("consolidating runners for scale set %d: %w", scaleSet.scaleSet.ScaleSetID, err)
}
return nil
})
}
if err := c.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("waiting for error group: %w", err)
}
return nil
}
func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
if g == nil {
return nil
}
done := make(chan error, 1)
go func() {
waitErr := g.Wait()
done <- waitErr
}()
select {
case err := <-done:
return err
case <-c.ctx.Done():
return c.ctx.Err()
case <-c.quit:
return nil
}
}
func (c *Controller) loop() {
defer c.Stop()
updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
defer updateToolsTicker.Stop()
consilidateTicker := time.NewTicker(common.PoolReapTimeoutInterval)
defer consilidateTicker.Stop()
initialToolUpdate := make(chan struct{}, 1)
defer close(initialToolUpdate)
go func() {
slog.InfoContext(c.ctx, "running initial tool update")
if err := c.updateTools(); err != nil {
@@ -206,8 +293,29 @@ func (c *Controller) loop() {
slog.InfoContext(c.ctx, "update tools ticker closed")
return
}
validCreds := c.forgeCredsAreValid
if err := c.updateTools(); err != nil {
if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err.Error()), 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
slog.With(slog.Any("error", err)).Error("failed to update tools")
continue
}
if validCreds != c.forgeCredsAreValid && c.forgeCredsAreValid {
if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventInfo, "tools updated successfully", 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
}
case _, ok := <-consilidateTicker.C:
if !ok {
slog.InfoContext(c.ctx, "consolidate ticker closed")
return
}
if err := c.consolidateRunnerState(); err != nil {
if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
slog.With(slog.Any("error", err)).Error("failed to consolidate runner state")
}
case <-c.quit:
return


@@ -63,14 +63,6 @@ type Worker struct {
quit chan struct{}
}
func (w *Worker) RunnersAndStatuses() map[string]string {
runners := make(map[string]string)
for _, runner := range w.runners {
runners[runner.Name] = string(runner.Status)
}
return runners
}
func (w *Worker) Stop() error {
slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID)
w.mux.Lock()
@@ -239,6 +231,240 @@ func (w *Worker) Start() (err error) {
return nil
}
func (w *Worker) runnerByName() map[string]params.Instance {
runners := make(map[string]params.Instance)
for _, runner := range w.runners {
runners[runner.Name] = runner
}
return runners
}
func (w *Worker) setRunnerDBStatus(runner string, status commonParams.InstanceStatus) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
Status: status,
}
newDbInstance, err := w.store.UpdateInstance(w.ctx, runner, updateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return params.Instance{}, fmt.Errorf("updating runner %s: %w", runner, err)
}
}
return newDbInstance, nil
}
func (w *Worker) removeRunnerFromGithubAndSetPendingDelete(runnerName string, agentID int64) error {
if err := w.scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("removing runner %s: %w", runnerName, err)
}
}
instance, err := w.setRunnerDBStatus(runnerName, commonParams.InstancePendingDelete)
if err != nil {
return fmt.Errorf("updating runner %s: %w", instance.Name, err)
}
w.runners[instance.ID] = instance
return nil
}
func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference) (func(), error) {
lockNames := []string{}
unlockFn := func() {
for _, name := range lockNames {
slog.DebugContext(w.ctx, "unlockFn unlocking runner", "runner_name", name)
locking.Unlock(name, false)
}
}
for _, runner := range w.runners {
if time.Since(runner.UpdatedAt).Minutes() < float64(w.scaleSet.RunnerBootstrapTimeout) {
continue
}
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
continue
}
if runner.RunnerStatus != params.RunnerPending && runner.RunnerStatus != params.RunnerInstalling {
slog.DebugContext(w.ctx, "runner is not pending or installing; skipping", "runner_name", runner.Name)
continue
}
if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline {
if ok, err := locking.TryLock(runner.Name, w.consumerID); err != nil || !ok {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
lockNames = append(lockNames, runner.Name)
slog.InfoContext(
w.ctx, "reaping timed-out/failed runner",
"runner_name", runner.Name)
if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
unlockFn()
return nil, fmt.Errorf("removing runner %s: %w", runner.Name, err)
}
}
}
return unlockFn, nil
}
func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error {
w.mux.Lock()
defer w.mux.Unlock()
ghRunnersByName := make(map[string]params.RunnerReference)
for _, runner := range runners {
ghRunnersByName[runner.Name] = runner
}
dbRunnersByName := w.runnerByName()
// Cross check what exists in github with what we have in the database.
for name, runner := range ghRunnersByName {
status := runner.GetStatus()
if _, ok := dbRunnersByName[name]; !ok {
// runner appears to be active. Is it not managed by GARM?
if status != params.RunnerIdle && status != params.RunnerActive {
slog.InfoContext(w.ctx, "runner does not exist in GARM; removing from github", "runner_name", name)
if err := w.scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
continue
}
slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
}
}
continue
}
}
unlockFn, err := w.reapTimedOutRunners(ghRunnersByName)
if err != nil {
return fmt.Errorf("reaping timed out runners: %w", err)
}
defer unlockFn()
// refresh the map. It may have been mutated above.
dbRunnersByName = w.runnerByName()
// Cross check what exists in the database with what we have in github.
for name, runner := range dbRunnersByName {
// In the case of scale sets, JIT configs are used. There is no situation
// in which we create a runner in the DB and one does not exist in github.
// We can safely assume that if the runner is not in github anymore, it can
// be removed from the provider and the DB.
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
continue
}
if _, ok := ghRunnersByName[name]; !ok {
if ok, err := locking.TryLock(name, w.consumerID); err != nil || !ok {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name)
continue
}
// unlock the runner only after this function returns. This function also cross
// checks between the provider and the database, and removes left over runners.
// If we unlock early, the provider worker will attempt to remove runners that
// we set in pending_delete. This function holds the mutex, so we won't see those
// changes until we return. So we hold the instance lock here until we are done.
// That way, even if the provider sees the pending_delete status, it won't act on
// it until it manages to lock the instance.
defer locking.Unlock(name, false)
slog.InfoContext(w.ctx, "runner does not exist in github; removing from provider", "runner_name", name)
instance, err := w.setRunnerDBStatus(runner.Name, commonParams.InstancePendingDelete)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return fmt.Errorf("updating runner %s: %w", instance.Name, err)
}
}
// We will get an update event anyway from the watcher, but updating the runner
// here will prevent race conditions if some other event is already in the queue
// which involves this runner. For the duration of the lifetime of this function, we
// hold the lock, so no race condition can occur.
w.runners[runner.ID] = instance
}
}
// Cross check what exists in the provider with the DB.
pseudoPoolID, err := w.pseudoPoolID()
if err != nil {
return fmt.Errorf("getting pseudo pool ID: %w", err)
}
listParams := common.ListInstancesParams{
ListInstancesV011: common.ListInstancesV011Params{
ProviderBaseParams: common.ProviderBaseParams{
ControllerInfo: w.controllerInfo,
},
},
}
providerRunners, err := w.provider.ListInstances(w.ctx, pseudoPoolID, listParams)
if err != nil {
return fmt.Errorf("listing instances: %w", err)
}
providerRunnersByName := make(map[string]commonParams.ProviderInstance)
for _, runner := range providerRunners {
providerRunnersByName[runner.Name] = runner
}
deleteInstanceParams := common.DeleteInstanceParams{
DeleteInstanceV011: common.DeleteInstanceV011Params{
ProviderBaseParams: common.ProviderBaseParams{
ControllerInfo: w.controllerInfo,
},
},
}
// refresh the map. It may have been mutated above.
dbRunnersByName = w.runnerByName()
for _, runner := range providerRunners {
if _, ok := dbRunnersByName[runner.Name]; !ok {
slog.InfoContext(w.ctx, "runner does not exist in database; removing from provider", "runner_name", runner.Name)
// There is no situation in which the runner will disappear from the provider
// after it was removed from the database. The provider worker will remove the
// instance from the provider and mark the instance as deleted in the database.
// It is the responsibility of the scaleset worker to then clean up the runners
// in the deleted state.
// That means that if we have a runner in the provider but not the DB, it is most
// likely an inconsistency.
if err := w.provider.DeleteInstance(w.ctx, runner.Name, deleteInstanceParams); err != nil {
slog.ErrorContext(w.ctx, "error removing instance", "instance_name", runner.Name, "error", err)
}
continue
}
}
for _, runner := range dbRunnersByName {
switch runner.Status {
case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
commonParams.InstanceDeleting, commonParams.InstanceDeleted:
// This instance is already being deleted.
continue
}
locked, err := locking.TryLock(runner.Name, w.consumerID)
if err != nil || !locked {
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
defer locking.Unlock(runner.Name, false)
if _, ok := providerRunnersByName[runner.Name]; !ok {
// The runner is not in the provider anymore. Remove it from the DB.
slog.InfoContext(w.ctx, "runner does not exist in provider; removing from database", "runner_name", runner.Name)
if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
return fmt.Errorf("removing runner %s: %w", runner.Name, err)
}
}
}
return nil
}
func (w *Worker) SetGithubClient(client common.GithubClient) error {
w.mux.Lock()
defer w.mux.Unlock()
@@ -256,6 +482,15 @@ func (w *Worker) SetGithubClient(client common.GithubClient) error {
return nil
}
func (w *Worker) pseudoPoolID() (string, error) {
// This is temporary. We need to extend providers to know about scale sets.
entity, err := w.scaleSet.GetEntity()
if err != nil {
return "", fmt.Errorf("getting entity: %w", err)
}
return fmt.Sprintf("%s-%s", w.scaleSet.Name, entity.ID), nil
}
func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
scaleSet, ok := event.Payload.(params.ScaleSet)
if !ok {
@@ -418,7 +653,10 @@ func (w *Worker) keepListenerAlive() {
w.mux.Unlock()
continue
}
// noop if already started
// noop if already started. If the scaleset was just enabled, we need to
// start the listener here, or the <-w.listener.Wait() channel receive below
// will block forever, even if we start the listener, as a nil channel will
// block forever.
w.listener.Start()
w.mux.Unlock()
@@ -513,13 +751,15 @@ func (w *Worker) handleScaleUp(target, current uint) {
AgentID: jitConfig.Runner.ID,
}
if _, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams); err != nil {
dbInstance, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams)
if err != nil {
slog.ErrorContext(w.ctx, "error creating instance", "error", err)
if err := w.scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
slog.ErrorContext(w.ctx, "error deleting runner", "error", err)
}
continue
}
w.runners[dbInstance.ID] = dbInstance
_, err = w.scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
if err != nil {
@@ -636,7 +876,7 @@ func (w *Worker) handleAutoScale() {
lastMsg := ""
lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
if lastMsg != msg {
slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners, "current_runners", w.RunnersAndStatuses())
slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
lastMsg = msg
}
}


@@ -1,6 +1,8 @@
package scaleset
import (
"strings"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
@@ -26,3 +28,12 @@ func composeControllerWatcherFilters(entity params.GithubEntity) dbCommon.Payloa
),
)
}
func poolIDFromLabels(runner params.RunnerReference) string {
for _, lbl := range runner.Labels {
if strings.HasPrefix(lbl.Name, poolIDLabelprefix) {
return lbl.Name[len(poolIDLabelprefix):]
}
}
return ""
}