diff --git a/database/watcher/filters.go b/database/watcher/filters.go
index 251a6bc6..dfcd54bb 100644
--- a/database/watcher/filters.go
+++ b/database/watcher/filters.go
@@ -255,9 +255,10 @@ func WithScaleSetInstanceFilter(scaleset params.ScaleSet) dbCommon.PayloadFilter
         }
         instance, ok := payload.Payload.(params.Instance)
-        if !ok {
+        if !ok || instance.ScaleSetID == 0 {
             return false
         }
+        return instance.ScaleSetID == scaleset.ID
     }
 }
diff --git a/locking/local_locker.go b/locking/local_locker.go
index 270138ef..aeae610f 100644
--- a/locking/local_locker.go
+++ b/locking/local_locker.go
@@ -33,19 +33,19 @@ var _ Locker = &keyMutex{}
 
 func (k *keyMutex) TryLock(key, identifier string) bool {
     mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
-        mux:   sync.Mutex{},
-        ident: identifier,
+        mux: sync.Mutex{},
     })
     keyMux := mux.(*lockWithIdent)
+    keyMux.ident = identifier
     return keyMux.mux.TryLock()
 }
 
 func (k *keyMutex) Lock(key, identifier string) {
     mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
-        mux:   sync.Mutex{},
-        ident: identifier,
+        mux: sync.Mutex{},
     })
     keyMux := mux.(*lockWithIdent)
+    keyMux.ident = identifier
     keyMux.mux.Lock()
 }
 
@@ -60,6 +60,7 @@ func (k *keyMutex) Unlock(key string, remove bool) {
     }
     _, filename, line, _ := runtime.Caller(1)
     slog.Debug("unlocking", "key", key, "identifier", keyMux.ident, "caller", fmt.Sprintf("%s:%d", filename, line))
+    keyMux.ident = ""
     keyMux.mux.Unlock()
 }
diff --git a/params/github.go b/params/github.go
index 4b37b83b..888288fc 100644
--- a/params/github.go
+++ b/params/github.go
@@ -421,6 +421,7 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
 type RunnerReference struct {
     ID               int64       `json:"id"`
     Name             string      `json:"name"`
+    OS               string      `json:"os"`
     RunnerScaleSetID int         `json:"runnerScaleSetId"`
     CreatedOn        interface{} `json:"createdOn"`
     RunnerGroupID    uint64      `json:"runnerGroupId"`
@@ -431,9 +432,29 @@ type RunnerReference struct {
     Status            interface{} `json:"status"`
     DisableUpdate     bool        `json:"disableUpdate"`
     ProvisioningState string      `json:"provisioningState"`
+    Busy              bool        `json:"busy"`
     Labels            []Label     `json:"labels,omitempty"`
 }
 
+func (r RunnerReference) GetStatus() RunnerStatus {
+    status, ok := r.Status.(string)
+    if !ok {
+        return RunnerUnknown
+    }
+    runnerStatus := RunnerStatus(status)
+    if !runnerStatus.IsValid() {
+        return RunnerUnknown
+    }
+
+    if runnerStatus == RunnerOnline {
+        if r.Busy {
+            return RunnerActive
+        }
+        return RunnerIdle
+    }
+    return runnerStatus
+}
+
 type RunnerScaleSetJitRunnerConfig struct {
     Runner           *RunnerReference `json:"runner"`
     EncodedJITConfig string           `json:"encodedJITConfig"`
diff --git a/params/params.go b/params/params.go
index 69ec179c..5653e386 100644
--- a/params/params.go
+++ b/params/params.go
@@ -49,6 +49,18 @@
     ScaleSetMessageType string
 )
 
+func (s RunnerStatus) IsValid() bool {
+    switch s {
+    case RunnerIdle, RunnerPending, RunnerTerminated,
+        RunnerInstalling, RunnerFailed,
+        RunnerActive, RunnerOffline,
+        RunnerUnknown, RunnerOnline:
+
+        return true
+    }
+    return false
+}
+
 const (
     // PoolBalancerTypeRoundRobin will try to cycle through the pools of an entity
     // in a round robin fashion. For example, if a repository has multiple pools that
@@ -117,6 +129,9 @@ const (
     RunnerInstalling RunnerStatus = "installing"
     RunnerFailed     RunnerStatus = "failed"
     RunnerActive     RunnerStatus = "active"
+    RunnerOffline    RunnerStatus = "offline"
+    RunnerOnline     RunnerStatus = "online"
+    RunnerUnknown    RunnerStatus = "unknown"
 )
 
 const (
diff --git a/runner/common/util.go b/runner/common/util.go
index 06751aa3..55e8fb00 100644
--- a/runner/common/util.go
+++ b/runner/common/util.go
@@ -28,6 +28,10 @@ type GithubEntityOperations interface {
     GithubBaseURL() *url.URL
 }
 
+type RateLimitClient interface {
+    RateLimit(ctx context.Context) (*github.RateLimits, error)
+}
+
 // GithubClient that describes the minimum list of functions we need to interact with github.
 // Allows for easier testing.
 //
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index 0173b3fc..1f2e96ec 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -439,10 +439,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
             // github so we let them be for now.
             continue
         }
+        pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
+        if err != nil {
+            return errors.Wrap(err, "fetching instance pool info")
+        }
 
         switch instance.RunnerStatus {
         case params.RunnerPending, params.RunnerInstalling:
-            if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
+            if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
                 // runner is still installing. We give it a chance to finish.
                 slog.DebugContext(
                     r.ctx, "runner is still installing, give it a chance to finish",
@@ -510,7 +514,11 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
         }
         defer locking.Unlock(instance.Name, false)
 
-        if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
+        pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
+        if err != nil {
+            return errors.Wrap(err, "fetching instance pool info")
+        }
+        if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
             continue
         }
 
diff --git a/util/github/client.go b/util/github/client.go
index fcd661fa..77803f4f 100644
--- a/util/github/client.go
+++ b/util/github/client.go
@@ -490,6 +490,30 @@ func (g *githubClient) GithubBaseURL() *url.URL {
     return g.cli.BaseURL
 }
 
+func NewRateLimitClient(ctx context.Context, credentials params.GithubCredentials) (common.RateLimitClient, error) {
+    httpClient, err := credentials.GetHTTPClient(ctx)
+    if err != nil {
+        return nil, errors.Wrap(err, "fetching http client")
+    }
+
+    slog.DebugContext(
+        ctx, "creating rate limit client",
+        "base_url", credentials.APIBaseURL,
+        "upload_url", credentials.UploadBaseURL)
+
+    ghClient, err := github.NewClient(httpClient).WithEnterpriseURLs(
+        credentials.APIBaseURL, credentials.UploadBaseURL)
+    if err != nil {
+        return nil, errors.Wrap(err, "fetching github client")
+    }
+    cli := &githubClient{
+        rateLimit: ghClient.RateLimit,
+        cli:       ghClient,
+    }
+
+    return cli, nil
+}
+
 func Client(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
     // func GithubClient(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
     httpClient, err := entity.Credentials.GetHTTPClient(ctx)
diff --git a/util/github/scalesets/message_sessions.go b/util/github/scalesets/message_sessions.go
index efd684d4..79d5c26e 100644
--- a/util/github/scalesets/message_sessions.go
+++ b/util/github/scalesets/message_sessions.go
@@ -141,16 +141,17 @@ func (m *MessageSession) maybeRefreshToken(ctx context.Context) error {
     if m.session == nil {
         return fmt.Errorf("session is nil")
     }
-    // add some jitter
-    randInt, err := rand.Int(rand.Reader, big.NewInt(5000))
-    if err != nil {
-        return fmt.Errorf("failed to get a random number")
-    }
+
     expiresAt, err := m.session.ExiresAt()
     if err != nil {
         return fmt.Errorf("failed to get expires at: %w", err)
     }
-    expiresIn := time.Duration(randInt.Int64())*time.Millisecond + 10*time.Minute
+    // add some jitter (30 second interval)
+    randInt, err := rand.Int(rand.Reader, big.NewInt(30))
+    if err != nil {
+        return fmt.Errorf("failed to get a random number")
+    }
+    expiresIn := time.Duration(randInt.Int64())*time.Second + 10*time.Minute
     slog.DebugContext(ctx, "checking if message session token needs refresh", "expires_at", expiresAt)
     if m.session.ExpiresIn(expiresIn) {
         if err := m.Refresh(ctx); err != nil {
diff --git a/workers/entity/worker_watcher.go b/workers/entity/worker_watcher.go
index 7e00112e..4ce83ddf 100644
--- a/workers/entity/worker_watcher.go
+++ b/workers/entity/worker_watcher.go
@@ -13,32 +13,28 @@ func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
     entityType := dbCommon.DatabaseEntityType(w.Entity.EntityType)
     switch event.EntityType {
     case entityType:
-        entityGetter, ok := event.Payload.(params.EntityGetter)
-        if !ok {
-            slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
-            return
-        }
-        entity, err := entityGetter.GetEntity()
-        if err != nil {
-            slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
-            return
-        }
-        w.handleEntityEventPayload(entity, event)
+        w.handleEntityEventPayload(event)
         return
     case dbCommon.GithubCredentialsEntityType:
         slog.DebugContext(w.ctx, "got github credentials payload event")
-        credentials, ok := event.Payload.(params.GithubCredentials)
-        if !ok {
-            slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
-            return
-        }
-        w.handleEntityCredentialsEventPayload(credentials, event)
+        w.handleEntityCredentialsEventPayload(event)
     default:
         slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
     }
 }
 
-func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCommon.ChangePayload) {
+func (w *Worker) handleEntityEventPayload(event dbCommon.ChangePayload) {
+    entityGetter, ok := event.Payload.(params.EntityGetter)
+    if !ok {
+        slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+        return
+    }
+    entity, err := entityGetter.GetEntity()
+    if err != nil {
+        slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+        return
+    }
+
     switch event.Operation {
     case dbCommon.UpdateOperation:
         slog.DebugContext(w.ctx, "got update operation")
@@ -57,7 +53,13 @@ func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCo
     }
 }
 
-func (w *Worker) handleEntityCredentialsEventPayload(credentials params.GithubCredentials, event dbCommon.ChangePayload) {
+func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayload) {
+    credentials, ok := event.Payload.(params.GithubCredentials)
+    if !ok {
+        slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+        return
+    }
+
     switch event.Operation {
     case dbCommon.UpdateOperation:
         slog.DebugContext(w.ctx, "got delete operation")
diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go
index 02528656..b6d61f54 100644
--- a/workers/scaleset/controller.go
+++ b/workers/scaleset/controller.go
@@ -8,6 +8,8 @@ import (
     "sync"
     "time"
 
+    "golang.org/x/sync/errgroup"
+
     runnerErrors "github.com/cloudbase/garm-provider-common/errors"
     "github.com/cloudbase/garm/cache"
     dbCommon "github.com/cloudbase/garm/database/common"
@@ -16,6 +18,14 @@ import (
     "github.com/cloudbase/garm/runner/common"
     garmUtil "github.com/cloudbase/garm/util"
     "github.com/cloudbase/garm/util/github"
+    "github.com/cloudbase/garm/util/github/scalesets"
+)
+
+const (
+    // These are duplicated until we decide if we move the pool manager to the new
+    // worker flow.
+    poolIDLabelprefix     = "runner-pool-id:"
+    controllerLabelPrefix = "runner-controller-id:"
 )
 
 func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
@@ -176,11 +186,88 @@ func (c *Controller) updateTools() error {
     return nil
 }
 
+// consolidateRunnerState will list all runners on GitHub for this entity, sort by
+// pool or scale set and pass those runners to the appropriate worker. The worker will
+// then have the responsibility to cross check the runners from github with what it
+// knows should be true from the database. Any inconsistency needs to be handled.
+// If we have an offline runner in github but no database entry for it, we remove the
+// runner from github. If we have a runner that is active in the provider but does not
+// exist in github, we remove it from the provider and the database.
+func (c *Controller) consolidateRunnerState() error {
+    scaleSetCli, err := scalesets.NewClient(c.ghCli)
+    if err != nil {
+        return fmt.Errorf("creating scaleset client: %w", err)
+    }
+    // Client is scoped to the current entity. Only runners in a repo/org/enterprise
+    // will be listed.
+    runners, err := scaleSetCli.ListAllRunners(c.ctx)
+    if err != nil {
+        return fmt.Errorf("listing runners: %w", err)
+    }
+
+    byPoolID := make(map[string][]params.RunnerReference)
+    byScaleSetID := make(map[int][]params.RunnerReference)
+    for _, runner := range runners.RunnerReferences {
+        if runner.RunnerScaleSetID != 0 {
+            byScaleSetID[runner.RunnerScaleSetID] = append(byScaleSetID[runner.RunnerScaleSetID], runner)
+        } else {
+            poolID := poolIDFromLabels(runner)
+            if poolID == "" {
+                continue
+            }
+            byPoolID[poolID] = append(byPoolID[poolID], runner)
+        }
+    }
+
+    g, ctx := errgroup.WithContext(c.ctx)
+    for _, scaleSet := range c.ScaleSets {
+        runners := byScaleSetID[scaleSet.scaleSet.ScaleSetID]
+        g.Go(func() error {
+            slog.DebugContext(ctx, "consolidating runners for scale set", "scale_set_id", scaleSet.scaleSet.ScaleSetID, "runners", runners)
+            if err := scaleSet.worker.consolidateRunnerState(runners); err != nil {
+                return fmt.Errorf("consolidating runners for scale set %d: %w", scaleSet.scaleSet.ScaleSetID, err)
+            }
+            return nil
+        })
+    }
+    if err := c.waitForErrorGroupOrContextCancelled(g); err != nil {
+        return fmt.Errorf("waiting for error group: %w", err)
+    }
+    return nil
+}
+
+func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
+    if g == nil {
+        return nil
+    }
+
+    done := make(chan error, 1)
+    go func() {
+        waitErr := g.Wait()
+        done <- waitErr
+    }()
+
+    select {
+    case err := <-done:
+        return err
+    case <-c.ctx.Done():
+        return c.ctx.Err()
+    case <-c.quit:
+        return nil
+    }
+}
+
 func (c *Controller) loop() {
     defer c.Stop()
     updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
+    defer updateToolsTicker.Stop()
+
+    consolidateTicker := time.NewTicker(common.PoolReapTimeoutInterval)
+    defer consolidateTicker.Stop()
+
     initialToolUpdate := make(chan struct{}, 1)
     defer close(initialToolUpdate)
+
     go func() {
         slog.InfoContext(c.ctx, "running initial tool update")
         if err := c.updateTools(); err != nil {
@@ -206,8 +293,29 @@
                 slog.InfoContext(c.ctx, "update tools ticker closed")
                 return
             }
+            validCreds := c.forgeCredsAreValid
             if err := c.updateTools(); err != nil {
+                if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err.Error()), 30); err != nil {
+                    slog.With(slog.Any("error", err)).Error("failed to add entity event")
+                }
                 slog.With(slog.Any("error", err)).Error("failed to update tools")
+                continue
+            }
+            if validCreds != c.forgeCredsAreValid && c.forgeCredsAreValid {
+                if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventInfo, "tools updated successfully", 30); err != nil {
+                    slog.With(slog.Any("error", err)).Error("failed to add entity event")
+                }
+            }
+        case _, ok := <-consolidateTicker.C:
+            if !ok {
+                slog.InfoContext(c.ctx, "consolidate ticker closed")
+                return
+            }
+            if err := c.consolidateRunnerState(); err != nil {
+                if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil {
+                    slog.With(slog.Any("error", err)).Error("failed to add entity event")
+                }
+                slog.With(slog.Any("error", err)).Error("failed to consolidate runner state")
             }
         case <-c.quit:
             return
diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go
index a4b690ef..f740f051 100644
--- a/workers/scaleset/scaleset.go
+++ b/workers/scaleset/scaleset.go
@@ -63,14 +63,6 @@ type Worker struct {
     quit chan struct{}
 }
 
-func (w *Worker) RunnersAndStatuses() map[string]string {
-    runners := make(map[string]string)
-    for _, runner := range w.runners {
-        runners[runner.Name] = string(runner.Status)
-    }
-    return runners
-}
-
 func (w *Worker) Stop() error {
     slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID)
     w.mux.Lock()
@@ -239,6 +231,240 @@ func (w *Worker) Start() (err error) {
     return nil
 }
 
+func (w *Worker) runnerByName() map[string]params.Instance {
+    runners := make(map[string]params.Instance)
+    for _, runner := range w.runners {
+        runners[runner.Name] = runner
+    }
+    return runners
+}
+
+func (w *Worker) setRunnerDBStatus(runner string, status commonParams.InstanceStatus) (params.Instance, error) {
+    updateParams := params.UpdateInstanceParams{
+        Status: status,
+    }
+    newDbInstance, err := w.store.UpdateInstance(w.ctx, runner, updateParams)
+    if err != nil {
+        if !errors.Is(err, runnerErrors.ErrNotFound) {
+            return params.Instance{}, fmt.Errorf("updating runner %s: %w", runner, err)
+        }
+    }
+    return newDbInstance, nil
+}
+
+func (w *Worker) removeRunnerFromGithubAndSetPendingDelete(runnerName string, agentID int64) error {
+    if err := w.scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
+        if !errors.Is(err, runnerErrors.ErrNotFound) {
+            return fmt.Errorf("removing runner %s: %w", runnerName, err)
+        }
+    }
+    instance, err := w.setRunnerDBStatus(runnerName, commonParams.InstancePendingDelete)
+    if err != nil {
+        return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+    }
+    w.runners[instance.ID] = instance
+    return nil
+}
+
+func (w *Worker) reapTimedOutRunners(runners map[string]params.RunnerReference) (func(), error) {
+    lockNames := []string{}
+
+    unlockFn := func() {
+        for _, name := range lockNames {
+            slog.DebugContext(w.ctx, "unlockFn unlocking runner", "runner_name", name)
+            locking.Unlock(name, false)
+        }
+    }
+
+    for _, runner := range w.runners {
+        if time.Since(runner.UpdatedAt).Minutes() < float64(w.scaleSet.RunnerBootstrapTimeout) {
+            continue
+        }
+        switch runner.Status {
+        case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
+            commonParams.InstanceDeleting, commonParams.InstanceDeleted:
+            continue
+        }
+
+        if runner.RunnerStatus != params.RunnerPending && runner.RunnerStatus != params.RunnerInstalling {
+            slog.DebugContext(w.ctx, "runner is not pending or installing; skipping", "runner_name", runner.Name)
+            continue
+        }
+        if ghRunner, ok := runners[runner.Name]; !ok || ghRunner.GetStatus() == params.RunnerOffline {
+            if ok, err := locking.TryLock(runner.Name, w.consumerID); err != nil || !ok {
+                slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
+                continue
+            }
+            lockNames = append(lockNames, runner.Name)
+
+            slog.InfoContext(
+                w.ctx, "reaping timed-out/failed runner",
+                "runner_name", runner.Name)
+
+            if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
+                slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
+                unlockFn()
+                return nil, fmt.Errorf("removing runner %s: %w", runner.Name, err)
+            }
+        }
+    }
+    return unlockFn, nil
+}
+
+func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error {
+    w.mux.Lock()
+    defer w.mux.Unlock()
+
+    ghRunnersByName := make(map[string]params.RunnerReference)
+    for _, runner := range runners {
+        ghRunnersByName[runner.Name] = runner
+    }
+
+    dbRunnersByName := w.runnerByName()
+    // Cross check what exists in github with what we have in the database.
+    for name, runner := range ghRunnersByName {
+        status := runner.GetStatus()
+        if _, ok := dbRunnersByName[name]; !ok {
+            // runner appears to be active. Is it not managed by GARM?
+            if status != params.RunnerIdle && status != params.RunnerActive {
+                slog.InfoContext(w.ctx, "runner does not exist in GARM; removing from github", "runner_name", name)
+                if err := w.scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
+                    if errors.Is(err, runnerErrors.ErrNotFound) {
+                        continue
+                    }
+                    slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
+                }
+            }
+            continue
+        }
+    }
+
+    unlockFn, err := w.reapTimedOutRunners(ghRunnersByName)
+    if err != nil {
+        return fmt.Errorf("reaping timed out runners: %w", err)
+    }
+    defer unlockFn()
+
+    // refresh the map. It may have been mutated above.
+    dbRunnersByName = w.runnerByName()
+    // Cross check what exists in the database with what we have in github.
+    for name, runner := range dbRunnersByName {
+        // in the case of scale sets, JIT configs are used. There is no situation
+        // in which we create a runner in the DB and one does not exist in github.
+        // We can safely assume that if the runner is not in github anymore, it can
+        // be removed from the provider and the DB.
+        switch runner.Status {
+        case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
+            commonParams.InstanceDeleting, commonParams.InstanceDeleted:
+            continue
+        }
+
+        if _, ok := ghRunnersByName[name]; !ok {
+            if ok, err := locking.TryLock(name, w.consumerID); err != nil || !ok {
+                slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", name)
+                continue
+            }
+            // unlock the runner only after this function returns. This function also cross
+            // checks between the provider and the database, and removes leftover runners.
+            // If we unlock early, the provider worker will attempt to remove runners that
+            // we set in pending_delete. This function holds the mutex, so we won't see those
+            // changes until we return. So we hold the instance lock here until we are done.
+            // That way, even if the provider sees the pending_delete status, it won't act on
+            // it until it manages to lock the instance.
+            defer locking.Unlock(name, false)
+
+            slog.InfoContext(w.ctx, "runner does not exist in github; removing from provider", "runner_name", name)
+            instance, err := w.setRunnerDBStatus(runner.Name, commonParams.InstancePendingDelete)
+            if err != nil {
+                if !errors.Is(err, runnerErrors.ErrNotFound) {
+                    return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+                }
+            }
+            // We will get an update event anyway from the watcher, but updating the runner
+            // here will prevent race conditions if some other event is already in the queue
+            // which involves this runner. For the duration of the lifetime of this function, we
+            // hold the lock, so no race condition can occur.
+            w.runners[runner.ID] = instance
+        }
+    }
+
+    // Cross check what exists in the provider with the DB.
+    pseudoPoolID, err := w.pseudoPoolID()
+    if err != nil {
+        return fmt.Errorf("getting pseudo pool ID: %w", err)
+    }
+    listParams := common.ListInstancesParams{
+        ListInstancesV011: common.ListInstancesV011Params{
+            ProviderBaseParams: common.ProviderBaseParams{
+                ControllerInfo: w.controllerInfo,
+            },
+        },
+    }
+
+    providerRunners, err := w.provider.ListInstances(w.ctx, pseudoPoolID, listParams)
+    if err != nil {
+        return fmt.Errorf("listing instances: %w", err)
+    }
+
+    providerRunnersByName := make(map[string]commonParams.ProviderInstance)
+    for _, runner := range providerRunners {
+        providerRunnersByName[runner.Name] = runner
+    }
+
+    deleteInstanceParams := common.DeleteInstanceParams{
+        DeleteInstanceV011: common.DeleteInstanceV011Params{
+            ProviderBaseParams: common.ProviderBaseParams{
+                ControllerInfo: w.controllerInfo,
+            },
+        },
+    }
+
+    // refresh the map. It may have been mutated above.
+    dbRunnersByName = w.runnerByName()
+    for _, runner := range providerRunners {
+        if _, ok := dbRunnersByName[runner.Name]; !ok {
+            slog.InfoContext(w.ctx, "runner does not exist in database; removing from provider", "runner_name", runner.Name)
+            // There is no situation in which the runner will disappear from the provider
+            // after it was removed from the database. The provider worker will remove the
+            // instance from the provider and mark the instance as deleted in the database.
+            // It is the responsibility of the scaleset worker to then clean up the runners
+            // in the deleted state.
+            // That means that if we have a runner in the provider but not the DB, it is most
+            // likely an inconsistency.
+            if err := w.provider.DeleteInstance(w.ctx, runner.Name, deleteInstanceParams); err != nil {
+                slog.ErrorContext(w.ctx, "error removing instance", "instance_name", runner.Name, "error", err)
+            }
+            continue
+        }
+    }
+
+    for _, runner := range dbRunnersByName {
+        switch runner.Status {
+        case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete,
+            commonParams.InstanceDeleting, commonParams.InstanceDeleted:
+            // This instance is already being deleted.
+            continue
+        }
+
+        locked, err := locking.TryLock(runner.Name, w.consumerID)
+        if err != nil || !locked {
+            slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
+            continue
+        }
+        defer locking.Unlock(runner.Name, false)
+
+        if _, ok := providerRunnersByName[runner.Name]; !ok {
+            // The runner is not in the provider anymore. Remove it from the DB.
+            slog.InfoContext(w.ctx, "runner does not exist in provider; removing from database", "runner_name", runner.Name)
+            if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
+                return fmt.Errorf("removing runner %s: %w", runner.Name, err)
+            }
+        }
+    }
+
+    return nil
+}
+
 func (w *Worker) SetGithubClient(client common.GithubClient) error {
     w.mux.Lock()
     defer w.mux.Unlock()
@@ -256,6 +482,15 @@ func (w *Worker) SetGithubClient(client common.GithubClient) error {
     return nil
 }
 
+func (w *Worker) pseudoPoolID() (string, error) {
+    // This is temporary. We need to extend providers to know about scale sets.
+    entity, err := w.scaleSet.GetEntity()
+    if err != nil {
+        return "", fmt.Errorf("getting entity: %w", err)
+    }
+    return fmt.Sprintf("%s-%s", w.scaleSet.Name, entity.ID), nil
+}
+
 func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
     scaleSet, ok := event.Payload.(params.ScaleSet)
     if !ok {
@@ -418,7 +653,10 @@ func (w *Worker) keepListenerAlive() {
             w.mux.Unlock()
             continue
         }
-        // noop if already started
+        // noop if already started. If the scaleset was just enabled, we need to
+        // start the listener here, or the <-w.listener.Wait() channel receive below
+        // will block forever even if we start the listener later, since a receive
+        // from a nil channel blocks forever.
         w.listener.Start()
         w.mux.Unlock()
 
@@ -513,13 +751,15 @@ func (w *Worker) handleScaleUp(target, current uint) {
             AgentID: jitConfig.Runner.ID,
         }
 
-        if _, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams); err != nil {
+        dbInstance, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams)
+        if err != nil {
            slog.ErrorContext(w.ctx, "error creating instance", "error", err)
            if err := w.scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
                slog.ErrorContext(w.ctx, "error deleting runner", "error", err)
            }
            continue
        }
+        w.runners[dbInstance.ID] = dbInstance
 
         _, err = w.scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
         if err != nil {
@@ -636,7 +876,7 @@ func (w *Worker) handleAutoScale() {
     lastMsg := ""
     lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) {
         if lastMsg != msg {
-            slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners, "current_runners", w.RunnersAndStatuses())
+            slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners)
             lastMsg = msg
         }
     }
diff --git a/workers/scaleset/util.go b/workers/scaleset/util.go
index a594f88c..aa3156c7 100644
--- a/workers/scaleset/util.go
+++ b/workers/scaleset/util.go
@@ -1,6 +1,8 @@
 package scaleset
 
 import (
+    "strings"
+
     dbCommon "github.com/cloudbase/garm/database/common"
     "github.com/cloudbase/garm/database/watcher"
     "github.com/cloudbase/garm/params"
@@ -26,3 +28,12 @@ func composeControllerWatcherFilters(entity params.GithubEntity) dbCommon.Payloa
         ),
     )
 }
+
+func poolIDFromLabels(runner params.RunnerReference) string {
+    for _, lbl := range runner.Labels {
+        if strings.HasPrefix(lbl.Name, poolIDLabelprefix) {
+            return lbl.Name[len(poolIDLabelprefix):]
+        }
+    }
+    return ""
+}