diff --git a/params/params.go b/params/params.go index 64a53984..43d1dd92 100644 --- a/params/params.go +++ b/params/params.go @@ -381,7 +381,7 @@ func (p Pool) MaxRunnersAsInt() int { return int(p.MaxRunners) } -func (p Pool) GithubEntity() (GithubEntity, error) { +func (p Pool) GetEntity() (GithubEntity, error) { switch p.PoolType() { case GithubEntityTypeRepository: return GithubEntity{ @@ -489,7 +489,7 @@ type ScaleSet struct { LastMessageID int64 `json:"-"` } -func (p ScaleSet) GithubEntity() (GithubEntity, error) { +func (p ScaleSet) GetEntity() (GithubEntity, error) { switch p.ScaleSetType() { case GithubEntityTypeRepository: return GithubEntity{ diff --git a/runner/common/pool.go b/runner/common/pool.go index 68a7ddf0..18f46a9d 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -54,13 +54,6 @@ type PoolManager interface { // for it and call this function with the WorkflowJob as a parameter. HandleWorkflowJob(job params.WorkflowJob) error - // DeleteRunner will attempt to remove a runner from the pool. If forceRemove is true, any error - // received from the provider will be ignored and we will proceed to remove the runner from the database. - // An error received while attempting to remove from GitHub (other than 404) will still stop the deletion - // process. This can happen if the runner is already processing a job. At which point, you can simply cancel - // the job in github. Doing so will prompt GARM to reap the runner automatically. - DeleteRunner(runner params.Instance, forceRemove, bypassGHUnauthorizedError bool) error - // InstallWebhook will create a webhook in github for the entity associated with this pool manager. InstallWebhook(ctx context.Context, param params.InstallWebhookParams) (params.HookInfo, error) // GetWebhookInfo will return information about the webhook installed in github for the entity associated @@ -74,6 +67,8 @@ type PoolManager interface { // may use internal or self signed certificates. RootCABundle() (params.CertificateBundle, error) + SetPoolRunningState(isRunning bool, failureReason string) + // Start will start the pool manager and all associated workers. Start() error // Stop will stop the pool manager and all associated workers. diff --git a/runner/common/util.go b/runner/common/util.go index ee5110e1..7dbec688 100644 --- a/runner/common/util.go +++ b/runner/common/util.go @@ -17,7 +17,7 @@ type GithubEntityOperations interface { PingEntityHook(ctx context.Context, id int64) (ret *github.Response, err error) ListEntityRunners(ctx context.Context, opts *github.ListOptions) (*github.Runners, *github.Response, error) ListEntityRunnerApplicationDownloads(ctx context.Context) ([]*github.RunnerApplicationDownload, *github.Response, error) - RemoveEntityRunner(ctx context.Context, runnerID int64) (*github.Response, error) + RemoveEntityRunner(ctx context.Context, runnerID int64) error CreateEntityRegistrationToken(ctx context.Context) (*github.RegistrationToken, *github.Response, error) GetEntityJITConfig(ctx context.Context, instance string, pool params.Pool, labels []string) (jitConfigMap map[string]string, runner *github.Runner, err error) diff --git a/runner/metadata.go b/runner/metadata.go index 0be41fc7..1d75fba4 100644 --- a/runner/metadata.go +++ b/runner/metadata.go @@ -66,7 +66,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) { "pool_id", instance.PoolID) return "", errors.Wrap(err, "fetching pool") } - entity, err = pool.GithubEntity() + entity, err = pool.GetEntity() if err != nil { slog.With(slog.Any("error", err)).ErrorContext( ctx, "failed to get pool entity", @@ -81,7 +81,7 @@ func (r *Runner) GetRunnerServiceName(ctx context.Context) (string, error) { "scale_set_id", instance.ScaleSetID) return "", errors.Wrap(err, "fetching scale set") } - entity, err = scaleSet.GithubEntity() + entity, err = scaleSet.GetEntity() if err != nil { slog.With(slog.Any("error", err)).ErrorContext( ctx, "failed to get scale set entity", diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 88be9e97..f17ba15f 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -349,7 +349,7 @@ func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Dur r.ctx, "error in loop", "loop_name", name) if errors.Is(err, runnerErrors.ErrUnauthorized) { - r.setPoolRunningState(false, err.Error()) + r.SetPoolRunningState(false, err.Error()) } } case <-r.ctx.Done(): @@ -380,7 +380,7 @@ func (r *basePoolManager) updateTools() error { if err != nil { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "failed to update tools for entity", "entity", r.entity.String()) - r.setPoolRunningState(false, err.Error()) + r.SetPoolRunningState(false, err.Error()) return fmt.Errorf("failed to update tools for entity %s: %w", r.entity.String(), err) } r.mux.Lock() @@ -388,7 +388,7 @@ func (r *basePoolManager) updateTools() error { r.mux.Unlock() slog.DebugContext(r.ctx, "successfully updated tools") - r.setPoolRunningState(true, "") + r.SetPoolRunningState(true, "") return err } @@ -565,16 +565,19 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) slog.InfoContext( r.ctx, "Runner has no database entry in garm, removing from github", "runner_name", runner.GetName()) - resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) - if err != nil { + if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil { // Removed in the meantime? - if resp != nil && resp.StatusCode == http.StatusNotFound { + if errors.Is(err, runnerErrors.ErrNotFound) { continue } return errors.Wrap(err, "removing runner") } continue } + if dbInstance.ScaleSetID != 0 { + // ignore scale set instances. + continue + } switch dbInstance.Status { case commonParams.InstancePendingDelete, commonParams.InstanceDeleting: @@ -650,10 +653,9 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) slog.InfoContext( r.ctx, "Runner instance is no longer on the provider, removing from github", "runner_name", dbInstance.Name) - resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) - if err != nil { + if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil { // Removed in the meantime? - if resp != nil && resp.StatusCode == http.StatusNotFound { + if errors.Is(err, runnerErrors.ErrNotFound) { slog.DebugContext( r.ctx, "runner disappeared from github", "runner_name", dbInstance.Name) @@ -806,7 +808,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona } if runner != nil { - _, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) + runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()) if err != nil { slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext( ctx, "failed to remove runner", @@ -840,7 +842,7 @@ func (r *basePoolManager) waitForTimeoutOrCancelled(timeout time.Duration) { } } -func (r *basePoolManager) setPoolRunningState(isRunning bool, failureReason string) { +func (r *basePoolManager) SetPoolRunningState(isRunning bool, failureReason string) { r.mux.Lock() r.managerErrorReason = failureReason r.managerIsRunning = isRunning @@ -1660,45 +1662,22 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa if !r.managerIsRunning && !bypassGHUnauthorizedError { return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String()) } + if runner.AgentID != 0 { - resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID) - if err != nil { - if resp != nil { - switch resp.StatusCode { - case http.StatusUnprocessableEntity: - return errors.Wrapf(runnerErrors.ErrBadRequest, "removing runner: %q", err) - case http.StatusNotFound: - // Runner may have been deleted by a finished job, or manually by the user. - slog.DebugContext( - r.ctx, "runner was not found in github", - "agent_id", runner.AgentID) - case http.StatusUnauthorized: - slog.With(slog.Any("error", err)).ErrorContext(r.ctx, "failed to remove runner from github") - // Mark the pool as offline from this point forward - r.setPoolRunningState(false, fmt.Sprintf("failed to remove runner: %q", err)) - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to remove runner") - if bypassGHUnauthorizedError { - slog.Info("bypass github unauthorized error is set, marking runner for deletion") - break - } - // evaluate the next switch case. - fallthrough - default: + if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID); err != nil { + if errors.Is(err, runnerErrors.ErrUnauthorized) { + slog.With(slog.Any("error", err)).ErrorContext(r.ctx, "failed to remove runner from github") + // Mark the pool as offline from this point forward + r.SetPoolRunningState(false, fmt.Sprintf("failed to remove runner: %q", err)) + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to remove runner") + if bypassGHUnauthorizedError { + slog.Info("bypass github unauthorized error is set, marking runner for deletion") + } else { return errors.Wrap(err, "removing runner") } } else { - errResp := &github.ErrorResponse{} - if errors.As(err, &errResp) { - if errResp.Response != nil && errResp.Response.StatusCode == http.StatusUnauthorized && bypassGHUnauthorizedError { - slog.Info("bypass github unauthorized error is set, marking runner for deletion") - } else { - return errors.Wrap(err, "removing runner") - } - } else { - // We got a nil response. Assume we are in error. - return errors.Wrap(err, "removing runner") - } + return errors.Wrap(err, "removing runner") } } } diff --git a/runner/pool/stub_client.go b/runner/pool/stub_client.go index d291e736..7a82567f 100644 --- a/runner/pool/stub_client.go +++ b/runner/pool/stub_client.go @@ -41,8 +41,8 @@ func (s *stubGithubClient) ListEntityRunnerApplicationDownloads(_ context.Contex return nil, nil, s.err } -func (s *stubGithubClient) RemoveEntityRunner(_ context.Context, _ int64) (*github.Response, error) { - return nil, s.err +func (s *stubGithubClient) RemoveEntityRunner(_ context.Context, _ int64) error { + return s.err } func (s *stubGithubClient) CreateEntityRegistrationToken(_ context.Context) (*github.RegistrationToken, *github.Response, error) { diff --git a/runner/pools.go b/runner/pools.go index f2eb3c25..15aecb5e 100644 --- a/runner/pools.go +++ b/runner/pools.go @@ -99,7 +99,7 @@ func (r *Runner) UpdatePoolByID(ctx context.Context, poolID string, param params return params.Pool{}, runnerErrors.NewBadRequestError("min_idle_runners cannot be larger than max_runners") } - entity, err := pool.GithubEntity() + entity, err := pool.GetEntity() if err != nil { return params.Pool{}, errors.Wrap(err, "getting entity") } diff --git a/runner/runner.go b/runner/runner.go index 5c0883aa..4032a94c 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -45,6 +45,8 @@ import ( "github.com/cloudbase/garm/runner/common" "github.com/cloudbase/garm/runner/pool" "github.com/cloudbase/garm/runner/providers" + "github.com/cloudbase/garm/util/github" + "github.com/cloudbase/garm/util/github/scalesets" ) func NewRunner(ctx context.Context, cfg config.Config, db dbCommon.Store) (*Runner, error) { @@ -849,13 +851,92 @@ func (r *Runner) DeleteRunner(ctx context.Context, instanceName string, forceDel return runnerErrors.NewBadRequestError("runner must be in one of the following states: %q", strings.Join(validStates, ", ")) } - poolMgr, err := r.getPoolManagerFromInstance(ctx, instance) + ghCli, ssCli, err := r.getGHCliFromInstance(ctx, instance) if err != nil { - return errors.Wrap(err, "fetching pool manager for instance") + return errors.Wrap(err, "fetching github client") } - if err := poolMgr.DeleteRunner(instance, forceDelete, bypassGithubUnauthorized); err != nil { - return errors.Wrap(err, "removing runner") + if instance.AgentID != 0 { + if instance.ScaleSetID != 0 { + err = ssCli.RemoveRunner(ctx, instance.AgentID) + } else if instance.PoolID != "" { + err = ghCli.RemoveEntityRunner(ctx, instance.AgentID) + } else { + return errors.New("instance does not have a pool or scale set") + } + + if err != nil { + if errors.Is(err, runnerErrors.ErrUnauthorized) && instance.PoolID != "" { + poolMgr, err := r.getPoolManagerFromInstance(ctx, instance) + if err != nil { + return errors.Wrap(err, "fetching pool manager for instance") + } + poolMgr.SetPoolRunningState(false, fmt.Sprintf("failed to remove runner: %q", err)) + } + if !bypassGithubUnauthorized { + return errors.Wrap(err, "removing runner from github") + } + } } + + instanceStatus := commonParams.InstancePendingDelete + if forceDelete { + instanceStatus = commonParams.InstancePendingForceDelete + } + + slog.InfoContext( + r.ctx, "setting instance status", + "runner_name", instance.Name, + "status", instanceStatus) + + updateParams := params.UpdateInstanceParams{ + Status: instanceStatus, + } + _, err = r.store.UpdateInstance(r.ctx, instance.Name, updateParams) + if err != nil { + return errors.Wrap(err, "updating runner state") + } + return nil } + +func (r *Runner) getGHCliFromInstance(ctx context.Context, instance params.Instance) (common.GithubClient, *scalesets.ScaleSetClient, error) { + // TODO(gabriel-samfira): We can probably cache the entity. + var entityGetter params.EntityGetter + var err error + if instance.PoolID != "" { + entityGetter, err = r.store.GetPoolByID(ctx, instance.PoolID) + if err != nil { + return nil, nil, errors.Wrap(err, "fetching pool") + } + } else if instance.ScaleSetID != 0 { + entityGetter, err = r.store.GetScaleSetByID(ctx, instance.ScaleSetID) + if err != nil { + return nil, nil, errors.Wrap(err, "fetching scale set") + } + } else { + return nil, nil, errors.New("instance does not have a pool or scale set") + } + + entity, err := entityGetter.GetEntity() + if err != nil { + return nil, nil, errors.Wrap(err, "fetching entity") + } + + // Fetching the entity from the database will populate all fields, including credentials. + entity, err = r.store.GetGithubEntity(ctx, entity.EntityType, entity.ID) + if err != nil { + return nil, nil, errors.Wrap(err, "fetching entity") + } + + ghCli, err := github.Client(ctx, entity) + if err != nil { + return nil, nil, errors.Wrap(err, "creating github client") + } + + scaleSetCli, err := scalesets.NewClient(ghCli) + if err != nil { + return nil, nil, errors.Wrap(err, "creating scaleset client") + } + return ghCli, scaleSetCli, nil +} diff --git a/runner/scalesets.go b/runner/scalesets.go index ef45a783..7b93a662 100644 --- a/runner/scalesets.go +++ b/runner/scalesets.go @@ -74,7 +74,7 @@ func (r *Runner) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) error return runnerErrors.NewBadRequestError("scale set is enabled; disable it first") } - paramEntity, err := scaleSet.GithubEntity() + paramEntity, err := scaleSet.GetEntity() if err != nil { return errors.Wrap(err, "getting entity") } @@ -137,7 +137,7 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param return params.ScaleSet{}, runnerErrors.NewBadRequestError("min_idle_runners cannot be larger than max_runners") } - paramEntity, err := scaleSet.GithubEntity() + paramEntity, err := scaleSet.GetEntity() if err != nil { return params.ScaleSet{}, errors.Wrap(err, "getting entity") } diff --git a/util/github/client.go b/util/github/client.go index ae0b6485..50f97d39 100644 --- a/util/github/client.go +++ b/util/github/client.go @@ -226,7 +226,7 @@ func (g *githubClient) ListEntityRunnerApplicationDownloads(ctx context.Context) return ret, response, err } -func (g *githubClient) RemoveEntityRunner(ctx context.Context, runnerID int64) (*github.Response, error) { +func (g *githubClient) RemoveEntityRunner(ctx context.Context, runnerID int64) error { var response *github.Response var err error @@ -251,10 +251,36 @@ func (g *githubClient) RemoveEntityRunner(ctx context.Context, runnerID int64) ( case params.GithubEntityTypeEnterprise: response, err = g.enterprise.RemoveRunner(ctx, g.entity.Owner, runnerID) default: - return nil, errors.New("invalid entity type") + return errors.New("invalid entity type") } - return response, err + switch response.StatusCode { + case http.StatusNotFound: + return runnerErrors.NewNotFoundError("runner %d not found", runnerID) + case http.StatusUnauthorized: + return runnerErrors.ErrUnauthorized + case http.StatusUnprocessableEntity: + return runnerErrors.NewBadRequestError("cannot remove runner %d in its current state", runnerID) + default: + if err != nil { + errResp := &github.ErrorResponse{} + if errors.As(err, &errResp) && errResp.Response != nil { + switch errResp.Response.StatusCode { + case http.StatusNotFound: + return runnerErrors.NewNotFoundError("runner %d not found", runnerID) + case http.StatusUnauthorized: + return runnerErrors.ErrUnauthorized + case http.StatusUnprocessableEntity: + return runnerErrors.NewBadRequestError("cannot remove runner %d in its current state", runnerID) + default: + return errors.Wrap(err, "removing runner") + } + } + return errors.Wrap(err, "removing runner") + } + } + + return nil } func (g *githubClient) CreateEntityRegistrationToken(ctx context.Context) (*github.RegistrationToken, *github.Response, error) { @@ -417,7 +443,7 @@ func (g *githubClient) GetEntityJITConfig(ctx context.Context, instance string, defer func(run *github.Runner) { if err != nil && run != nil { - _, innerErr := g.RemoveEntityRunner(ctx, run.GetID()) + innerErr := g.RemoveEntityRunner(ctx, run.GetID()) slog.With(slog.Any("error", innerErr)).ErrorContext( ctx, "failed to remove runner", "runner_id", run.GetID(), string(g.entity.EntityType), g.entity.String()) diff --git a/workers/provider/instance_manager.go b/workers/provider/instance_manager.go index c20c75ae..c784b41f 100644 --- a/workers/provider/instance_manager.go +++ b/workers/provider/instance_manager.go @@ -21,7 +21,7 @@ import ( func NewInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) { ctx = garmUtil.WithSlogContext(ctx, slog.Any("instance", instance.Name)) - githubEntity, err := scaleSet.GithubEntity() + githubEntity, err := scaleSet.GetEntity() if err != nil { return nil, fmt.Errorf("getting github entity: %w", err) } @@ -129,7 +129,7 @@ func (i *instanceManager) incrementBackOff() { } func (i *instanceManager) getEntity() (params.GithubEntity, error) { - entity, err := i.scaleSet.GithubEntity() + entity, err := i.scaleSet.GetEntity() if err != nil { return params.GithubEntity{}, fmt.Errorf("getting entity: %w", err) } @@ -276,6 +276,9 @@ func (i *instanceManager) handleDeleteInstanceInProvider(instance params.Instanc func (i *instanceManager) consolidateState() error { i.mux.Lock() defer i.mux.Unlock() + if !i.running { + return nil + } switch i.instance.Status { case commonParams.InstancePendingCreate: @@ -305,6 +308,7 @@ func (i *instanceManager) consolidateState() error { } } + prevStatus := i.instance.Status if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceDeleting, nil); err != nil { if errors.Is(err, runnerErrors.ErrNotFound) { return nil @@ -314,7 +318,7 @@ func (i *instanceManager) consolidateState() error { if err := i.handleDeleteInstanceInProvider(i.instance); err != nil { slog.ErrorContext(i.ctx, "deleting instance in provider", "error", err, "forced", i.instance.Status == commonParams.InstancePendingForceDelete) - if i.instance.Status == commonParams.InstancePendingDelete { + if prevStatus == commonParams.InstancePendingDelete { i.incrementBackOff() if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstancePendingDelete, []byte(err.Error())); err != nil { return fmt.Errorf("setting instance status to error: %w", err) @@ -324,8 +328,11 @@ func (i *instanceManager) consolidateState() error { } } if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceDeleted, nil); err != nil { - return fmt.Errorf("setting instance status to deleted: %w", err) + if !errors.Is(err, runnerErrors.ErrNotFound) { + return fmt.Errorf("setting instance status to deleted: %w", err) + } } + return ErrInstanceDeleted case commonParams.InstanceError: // Instance is in error state. We wait for next status or potentially retry // spawning the instance with a backoff timer. @@ -343,26 +350,23 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error { // end up with an inconsistent state between what we know about the instance and what // is reflected in the database. i.mux.Lock() + defer i.mux.Unlock() if !i.running { - i.mux.Unlock() return nil } instance, ok := update.Payload.(params.Instance) if !ok { - i.mux.Unlock() return runnerErrors.NewBadRequestError("invalid payload type") } i.instance = instance if i.instance.Status == instance.Status { // Nothing of interest happened. - i.mux.Unlock() return nil } - i.mux.Unlock() - return i.consolidateState() + return nil } func (i *instanceManager) Update(instance dbCommon.ChangePayload) error { diff --git a/workers/provider/provider.go b/workers/provider/provider.go index 07f65b26..0c2cf4df 100644 --- a/workers/provider/provider.go +++ b/workers/provider/provider.go @@ -88,6 +88,10 @@ func (p *provider) loadAllRunners() error { if runner.Status == commonParams.InstanceCreating { continue } + if runner.Status == commonParams.InstanceDeleting || runner.Status == commonParams.InstanceDeleted { + continue + } + scaleSet, ok := p.scaleSets[runner.ScaleSetID] if !ok { slog.ErrorContext(p.ctx, "scale set not found", "scale_set_id", runner.ScaleSetID) diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index ba7701d7..a0a5d657 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -64,6 +64,14 @@ type Worker struct { quit chan struct{} } +func (w *Worker) RunnersAndStatuses() map[string]string { + runners := make(map[string]string) + for _, runner := range w.runners { + runners[runner.Name] = string(runner.Status) + } + return runners +} + func (w *Worker) Stop() error { slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID) w.mux.Lock() @@ -629,7 +637,7 @@ func (w *Worker) handleAutoScale() { lastMsg := "" lastMsgDebugLog := func(msg string, targetRunners, currentRunners uint) { if lastMsg != msg { - slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners) + slog.DebugContext(w.ctx, msg, "current_runners", currentRunners, "target_runners", targetRunners, "current_runners", w.RunnersAndStatuses()) lastMsg = msg } }