From a15a91b97440486bc365349c0d95decca381c305 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 24 Jun 2023 00:22:51 +0000 Subject: [PATCH] Break lock and lower scale down timeout Break the lock on a job if it's still queued and the runner that it triggered was assigned to another job. This may cause leftover runners to be created, but we scale those down in ~3 minutes. Signed-off-by: Gabriel Adrian Samfira --- cmd/garm-cli/cmd/jobs.go | 5 +- database/common/common.go | 1 + database/sql/instances.go | 11 ++++ database/sql/jobs.go | 25 ++++++++- database/sql/models.go | 1 + database/sql/util.go | 4 ++ params/params.go | 9 ++-- params/requests.go | 1 + runner/pool/pool.go | 103 +++++++++++++++++++++++++++----------- 9 files changed, 124 insertions(+), 36 deletions(-) diff --git a/cmd/garm-cli/cmd/jobs.go b/cmd/garm-cli/cmd/jobs.go index 949dda6f..67c331dc 100644 --- a/cmd/garm-cli/cmd/jobs.go +++ b/cmd/garm-cli/cmd/jobs.go @@ -54,15 +54,16 @@ var jobsListCmd = &cobra.Command{ func formatJobs(jobs []params.Job) { t := table.NewWriter() - header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Locked by"} + header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Locked by"} t.AppendHeader(header) for _, job := range jobs { lockedBy := "" + repo := fmt.Sprintf("%s/%s", job.RepositoryOwner, job.RepositoryName) if job.LockedBy != uuid.Nil { lockedBy = job.LockedBy.String() } - t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, lockedBy}) + t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, lockedBy}) t.AppendSeparator() } fmt.Println(t.Render()) diff --git a/database/common/common.go b/database/common/common.go index 2e92035a..069c7095 100644 --- a/database/common/common.go +++ b/database/common/common.go @@ -122,6 +122,7 @@ type JobsStore interface { DeleteJob(ctx context.Context, jobID int64) error UnlockJob(ctx context.Context, jobID 
int64, entityID string) error LockJob(ctx context.Context, jobID int64, entityID string) error + BreakLockJobIsQueued(ctx context.Context, jobID int64) error DeleteCompletedJobs(ctx context.Context) error } diff --git a/database/sql/instances.go b/database/sql/instances.go index 4fa1e066..bc16be90 100644 --- a/database/sql/instances.go +++ b/database/sql/instances.go @@ -16,12 +16,14 @@ package sql import ( "context" + "encoding/json" runnerErrors "github.com/cloudbase/garm/errors" "github.com/cloudbase/garm/params" "github.com/google/uuid" "github.com/pkg/errors" + "gorm.io/datatypes" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -32,6 +34,14 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p return params.Instance{}, errors.Wrap(err, "fetching pool") } + var labels datatypes.JSON + if len(param.AditionalLabels) > 0 { + labels, err = json.Marshal(param.AditionalLabels) + if err != nil { + return params.Instance{}, errors.Wrap(err, "marshalling labels") + } + } + newInstance := Instance{ Pool: pool, Name: param.Name, @@ -42,6 +52,7 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p CallbackURL: param.CallbackURL, MetadataURL: param.MetadataURL, GitHubRunnerGroup: param.GitHubRunnerGroup, + AditionalLabels: labels, } q := s.conn.Create(&newInstance) if q.Error != nil { diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 2ee22a96..96ab75a4 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -119,6 +119,30 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) return nil } +func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error { + var workflowJob WorkflowJob + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? 
and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) + + if q.Error != nil { + if errors.Is(q.Error, gorm.ErrRecordNotFound) { + return nil + } + return errors.Wrap(q.Error, "fetching job") + } + + if workflowJob.LockedBy == uuid.Nil { + // Job is already unlocked. + return nil + } + + workflowJob.LockedBy = uuid.Nil + if err := s.conn.Save(&workflowJob).Error; err != nil { + return errors.Wrap(err, "saving job") + } + + return nil +} + func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID string) error { var workflowJob WorkflowJob q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob) @@ -140,7 +164,6 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin } workflowJob.LockedBy = uuid.Nil - if err := s.conn.Save(&workflowJob).Error; err != nil { return errors.Wrap(err, "saving job") } diff --git a/database/sql/models.go b/database/sql/models.go index 220f5080..7a15fd41 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -153,6 +153,7 @@ type Instance struct { CreateAttempt int TokenFetched bool GitHubRunnerGroup string + AditionalLabels datatypes.JSON PoolID uuid.UUID Pool Pool `gorm:"foreignKey:PoolID"` diff --git a/database/sql/util.go b/database/sql/util.go index b33274fc..9392f5ec 100644 --- a/database/sql/util.go +++ b/database/sql/util.go @@ -32,6 +32,9 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance { if instance.ProviderID != nil { id = *instance.ProviderID } + + var labels []string + _ = json.Unmarshal(instance.AditionalLabels, &labels) ret := params.Instance{ ID: instance.ID.String(), ProviderID: id, @@ -51,6 +54,7 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance { UpdatedAt: instance.UpdatedAt, TokenFetched: instance.TokenFetched, GitHubRunnerGroup: instance.GitHubRunnerGroup, + AditionalLabels: labels, } if len(instance.ProviderFault) > 0 { diff --git 
a/params/params.go b/params/params.go index 4f94c22d..358805f9 100644 --- a/params/params.go +++ b/params/params.go @@ -156,10 +156,11 @@ type Instance struct { GitHubRunnerGroup string `json:"github-runner-group"` // Do not serialize sensitive info. - CallbackURL string `json:"-"` - MetadataURL string `json:"-"` - CreateAttempt int `json:"-"` - TokenFetched bool `json:"-"` + CallbackURL string `json:"-"` + MetadataURL string `json:"-"` + CreateAttempt int `json:"-"` + TokenFetched bool `json:"-"` + AditionalLabels []string `json:"-"` } func (i Instance) GetName() string { diff --git a/params/requests.go b/params/requests.go index c7c50eb5..71bf9c59 100644 --- a/params/requests.go +++ b/params/requests.go @@ -136,6 +136,7 @@ type CreateInstanceParams struct { // The runner group must be created by someone with access to the enterprise. GitHubRunnerGroup string CreateAttempt int `json:"-"` + AditionalLabels []string } type CreatePoolParams struct { diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 4f439857..07c03fd1 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -20,6 +20,7 @@ import ( "log", "math", "net/http", + "strconv", "strings", "sync", "time", @@ -41,6 +42,13 @@ import ( var ( poolIDLabelprefix = "runner-pool-id:" controllerLabelPrefix = "runner-controller-id:" + // We tag runners that have been spawned as a result of a queued job with the job ID + // that spawned them. There is no way to guarantee that the runner spawned in response to a particular + // job, will be picked up by that job. We mark them so as in the very likely event that the runner + // has picked up a different job, we can clear the lock on the job that spawned it. + // The job it picked up would already be transitioned to in_progress so it will be ignored by the + // consume loop. 
+ jobLabelPrefix = "in_response_to_job:" ) const ( @@ -104,14 +112,27 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { var jobParams params.Job var err error + var triggeredBy int64 defer func() { // we're updating the job in the database, regardless of whether it was successful or not. // or if it was meant for this pool or not. Github will send the same job data to all hierarchies // that have been configured to work with garm. Updating the job at all levels should yield the same // outcome in the db. - if jobParams.ID != 0 { - if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { - log.Printf("failed to update job %d: %s", jobParams.ID, jobErr) + if jobParams.ID == 0 { + return + } + + if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { + log.Printf("failed to update job %d: %s", jobParams.ID, jobErr) + } + + if triggeredBy != 0 && jobParams.ID != triggeredBy { + // The triggeredBy value is only set by the "in_progress" webhook. The runner that + // transitioned to in_progress was created as a result of a different queued job. If that job is + // still queued and we don't remove the lock, it will linger until the lock timeout is reached. + // That may take a long time, so we break the lock here and allow it to be scheduled again. + if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil { + log.Printf("failed to break lock for job %d: %s", triggeredBy, err) } } }() @@ -175,6 +196,8 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err) return errors.Wrap(err, "updating runner") } + // Set triggeredBy here so we break the lock on any potential queued job. + triggeredBy = jobIdFromLabels(instance.AditionalLabels) // A runner has picked up the job, and is now running it. 
It may need to be replaced if the pool has // a minimum number of idle runners configured. @@ -189,6 +212,19 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { return nil } +func jobIdFromLabels(labels []string) int64 { + for _, lbl := range labels { + if strings.HasPrefix(lbl, jobLabelPrefix) { + jobId, err := strconv.ParseInt(lbl[len(jobLabelPrefix):], 10, 64) + if err != nil { + return 0 + } + return jobId + } + } + return 0 +} + func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) { r.log("starting %s loop for %s", name, r.helper.String()) ticker := time.NewTicker(interval) @@ -619,7 +655,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCo return instance, nil } -func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error { +func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) error { pool, err := r.helper.GetPoolByID(poolID) if err != nil { return errors.Wrap(err, "fetching pool") @@ -637,6 +673,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error { MetadataURL: r.helper.GetMetadataURL(), CreateAttempt: 1, GitHubRunnerGroup: pool.GitHubRunnerGroup, + AditionalLabels: aditionalLabels, } _, err = r.store.CreateInstance(r.ctx, poolID, createParams) @@ -690,6 +727,10 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error labels = append(labels, r.controllerLabel()) labels = append(labels, r.poolLabel(pool.ID)) + if len(instance.AditionalLabels) > 0 { + labels = append(labels, instance.AditionalLabels...) + } + jwtValidity := pool.RunnerTimeout() entity := r.helper.String() @@ -796,6 +837,11 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param // decissions based on the status of saved jobs. 
A "queued" job will prompt garm to search for an appropriate pool // and spin up a runner there if no other idle runner exists to pick it up. func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) { + asUUID, err := uuid.Parse(r.ID()) + if err != nil { + return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID") + } + jobParams := params.Job{ ID: job.WorkflowJob.ID, Action: job.Action, @@ -815,24 +861,21 @@ func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) ( runnerName := job.WorkflowJob.RunnerName if job.Action != "queued" && runnerName == "" { - // Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need, - // using the workflow run ID, from the API. - // We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance - // to pick up the job, the runner name is not available from the API. - runnerInfo, err := r.getRunnerDetailsFromJob(job) - if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { - return jobParams, errors.Wrap(err, "fetching runner details") + if job.WorkflowJob.Conclusion != "skipped" && job.WorkflowJob.Conclusion != "canceled" { + // Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need, + // using the workflow run ID, from the API. + // We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance + // to pick up the job, the runner name is not available from the API. 
+ runnerInfo, err := r.getRunnerDetailsFromJob(job) + if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { + return jobParams, errors.Wrap(err, "fetching runner details") + } + runnerName = runnerInfo.Name } - runnerName = runnerInfo.Name } jobParams.RunnerName = runnerName - asUUID, err := uuid.Parse(r.ID()) - if err != nil { - return jobParams, errors.Wrap(err, "parsing pool ID as UUID") - } - switch r.helper.PoolType() { case params.EnterprisePool: jobParams.EnterpriseID = asUUID @@ -884,9 +927,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool // consideration for scale-down. The 5 minute grace period prevents a situation where a // "queued" workflow triggers the creation of a new idle runner, and this routine reaps // an idle runner before they have a chance to pick up a job. - if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle && - providerCommon.InstanceStatus(inst.Status) == providerCommon.InstanceRunning && - time.Since(inst.UpdatedAt).Minutes() > 5 { + if inst.RunnerStatus == providerCommon.RunnerIdle && inst.Status == providerCommon.InstanceRunning && time.Since(inst.UpdatedAt).Minutes() > 2 { idleWorkers = append(idleWorkers, inst) } } @@ -935,9 +976,9 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool return nil } -func (r *basePoolManager) addRunnerToPool(pool params.Pool) error { +func (r *basePoolManager) addRunnerToPool(pool params.Pool, aditionalLabels []string) error { if !pool.Enabled { - return nil + return fmt.Errorf("pool %s is disabled", pool.ID) } poolInstanceCount, err := r.store.PoolInstanceCount(r.ctx, pool.ID) @@ -949,7 +990,7 @@ func (r *basePoolManager) addRunnerToPool(pool params.Pool) error { return fmt.Errorf("max workers (%d) reached for pool %s", pool.MaxRunners, pool.ID) } - if err := r.AddRunner(r.ctx, pool.ID); err != nil { + if err := r.AddRunner(r.ctx, pool.ID, aditionalLabels); err != nil { return fmt.Errorf("failed to 
add new instance for pool %s: %s", pool.ID, err) } return nil @@ -993,7 +1034,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error { for i := 0; i < required; i++ { r.log("adding new idle worker to pool %s", pool.ID) - if err := r.AddRunner(r.ctx, pool.ID); err != nil { + if err := r.AddRunner(r.ctx, pool.ID, nil); err != nil { return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err) } } @@ -1416,14 +1457,14 @@ func (r *basePoolManager) consumeQueuedJobs() error { continue } - if time.Since(job.CreatedAt) < time.Second*30 { + if time.Since(job.UpdatedAt) < time.Second*30 { // give the idle runners a chance to pick up the job. - log.Printf("job %d was created less than 30 seconds ago. Skipping", job.ID) + log.Printf("job %d was updated less than 30 seconds ago. Skipping", job.ID) continue } - if time.Since(job.UpdatedAt) >= time.Minute*5 { - // Job has been in queued state for 5 minutes or more. Check if it was consumed by another runner. + if time.Since(job.UpdatedAt) >= time.Minute*10 { + // Job has been in queued state for 10 minutes or more. Check if it was consumed by another runner. workflow, ghResp, err := r.helper.GithubCLI().GetWorkflowJobByID(r.ctx, job.RepositoryOwner, job.RepositoryName, job.ID) if err != nil { if ghResp != nil { @@ -1497,9 +1538,13 @@ func (r *basePoolManager) consumeQueuedJobs() error { log.Printf("[Pool mgr ID %s] could not lock job %d: %s", r.ID(), job.ID, err) continue } + + jobLabels := []string{ + fmt.Sprintf("%s%d", jobLabelPrefix, job.ID), + } for _, pool := range potentialPools { log.Printf("attempting to create a runner in pool %s for job %d", pool.ID, job.ID) - if err := r.addRunnerToPool(pool); err != nil { + if err := r.addRunnerToPool(pool, jobLabels); err != nil { log.Printf("could not add runner to pool %s: %s", pool.ID, err) continue }