diff --git a/params/params.go b/params/params.go index eab4a173..5db7e6b7 100644 --- a/params/params.go +++ b/params/params.go @@ -297,6 +297,20 @@ func (p *Pool) PoolType() PoolType { return "" } +func (p *Pool) HasRequiredLabels(set []string) bool { + asMap := make(map[string]struct{}, len(p.Tags)) + for _, t := range p.Tags { + asMap[t.Name] = struct{}{} + } + + for _, l := range set { + if _, ok := asMap[l]; !ok { + return false + } + } + return true +} + // used by swagger client generated code type Pools []Pool diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 508b41f4..7e4d365d 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1049,6 +1049,28 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool }) } + if numScaleDown > 0 { + // We just scaled down a runner for this pool. That means that if we have jobs that are + // still queued in our DB, and those jobs should match this pool but have not been picked + // up by a runner, they are most likely stale and can be removed. For now, we can simply + // remove jobs older than 10 minutes. + // + // TODO: should probably allow aditional filters to list functions. Would help to filter by date + // instead of returning a bunch of results and filtering manually. + queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.helper.PoolType(), r.helper.ID(), params.JobStatusQueued) + if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "listing queued jobs") + } + + for _, job := range queued { + if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) { + if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { + r.log("failed to delete job %d: %s", job.ID, err) + } + } + } + } + if err := r.waitForErrorGroupOrContextCancelled(g); err != nil { return fmt.Errorf("failed to scale down pool %s: %w", pool.ID, err) } @@ -1574,48 +1596,9 @@ func (r *basePoolManager) consumeQueuedJobs() error { } if time.Since(job.UpdatedAt) >= time.Minute*10 { - // Job has been in queued state for 10 minutes or more. Check if it was consumed by another runner. - workflow, ghResp, err := r.helper.GithubCLI().GetWorkflowJobByID(r.ctx, job.RepositoryOwner, job.RepositoryName, job.ID) - if err != nil { - if ghResp != nil { - switch ghResp.StatusCode { - case http.StatusNotFound: - // Job does not exist in github. Remove it from the database. - if err := r.store.DeleteJob(r.ctx, job.ID); err != nil { - return errors.Wrap(err, "deleting job") - } - default: - r.log("failed to fetch job information from github: %q (status code: %d)", err, ghResp.StatusCode) - } - } - r.log("error fetching workflow info: %q", err) - continue - } - - if workflow.GetStatus() != "queued" { - r.log("job is no longer in queued state on github. New status is: %s", workflow.GetStatus()) - job.Action = workflow.GetStatus() - job.Status = workflow.GetStatus() - job.Conclusion = workflow.GetConclusion() - if workflow.RunnerName != nil { - job.RunnerName = *workflow.RunnerName - } - if workflow.RunnerID != nil { - job.GithubRunnerID = *workflow.RunnerID - } - if workflow.RunnerGroupName != nil { - job.RunnerGroupName = *workflow.RunnerGroupName - } - if workflow.RunnerGroupID != nil { - job.RunnerGroupID = *workflow.RunnerGroupID - } - if _, err := r.store.CreateOrUpdateJob(r.ctx, job); err != nil { - r.log("failed to update job status: %q", err) - } - continue - } - - // Job is still queued in our db and in github. Unlock it and try again. + // Job is still queued in our db, 10 minutes after a matching runner + // was spawned. Unlock it and try again. A different job may have picked up + // the runner. if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil { // TODO: Implament a cache? Should we return here? r.log("failed to unlock job %d: %q", job.ID, err)