Prevent abusing the GH API
On large deployments with many jobs, we cannot check every job recorded in the DB against the GH API. Before this change, if a job had not been updated in more than 10 minutes, garm would ask the GH API whether that job still existed. While this approach gave us a consistent view of which jobs still exist and which are stale, it had the potential to spam the GH API and get us rate limited.

This change uses the scale-down loop as an indicator of job staleness instead. If a job remains in the queued state in our DB but has disappeared from GH, or was serviced by another runner and we never received the hook (garm was down, or GH had an issue - it has happened in the past), garm will spin up a new runner for it. When that runner, or any other runner, is later scaled down, we check whether the queue still holds jobs that should have matched it. If it does, there is a high chance those jobs no longer exist in GH and we can remove them from the queue.

There is, of course, a chance that GH is having issues and the job is simply never pushed to a runner, but we cannot account for everything. In that case I would rather err on the side of not rate limiting ourselves.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
parent 46ac1b8166
commit 459906d97e
2 changed files with 39 additions and 42 deletions
@@ -297,6 +297,20 @@ func (p *Pool) PoolType() PoolType {
 	return ""
 }
 
+func (p *Pool) HasRequiredLabels(set []string) bool {
+	asMap := make(map[string]struct{}, len(p.Tags))
+	for _, t := range p.Tags {
+		asMap[t.Name] = struct{}{}
+	}
+
+	for _, l := range set {
+		if _, ok := asMap[l]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
 // used by swagger client generated code
 type Pools []Pool
 
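For illustration, here is a minimal, self-contained sketch of the label-matching rule the new HasRequiredLabels method implements. The Tag and Pool stand-ins below and the example label values are made up for the sketch; they are not garm's real params types.

package main

import "fmt"

// Minimal stand-ins for the garm params types referenced in the hunk above;
// only the fields the label check touches are included.
type Tag struct {
	Name string
}

type Pool struct {
	Tags []Tag
}

// Same subset check as the new HasRequiredLabels method: every label the job
// requests must be present among the pool's tags.
func (p *Pool) HasRequiredLabels(set []string) bool {
	asMap := make(map[string]struct{}, len(p.Tags))
	for _, t := range p.Tags {
		asMap[t.Name] = struct{}{}
	}
	for _, l := range set {
		if _, ok := asMap[l]; !ok {
			return false
		}
	}
	return true
}

func main() {
	pool := Pool{Tags: []Tag{{Name: "self-hosted"}, {Name: "linux"}, {Name: "x64"}}}
	fmt.Println(pool.HasRequiredLabels([]string{"self-hosted", "linux"})) // true
	fmt.Println(pool.HasRequiredLabels([]string{"self-hosted", "gpu"}))   // false: "gpu" is not a pool tag
}

The check is a plain subset test: extra tags on the pool are fine, but a single job label the pool cannot satisfy disqualifies the pool.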
@@ -1049,6 +1049,28 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
 		})
 	}
 
+	if numScaleDown > 0 {
+		// We just scaled down a runner for this pool. That means that if we have jobs that are
+		// still queued in our DB, and those jobs should match this pool but have not been picked
+		// up by a runner, they are most likely stale and can be removed. For now, we can simply
+		// remove jobs older than 10 minutes.
+		//
+		// TODO: should probably allow additional filters to list functions. Would help to filter by date
+		// instead of returning a bunch of results and filtering manually.
+		queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.helper.PoolType(), r.helper.ID(), params.JobStatusQueued)
+		if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
+			return errors.Wrap(err, "listing queued jobs")
+		}
+
+		for _, job := range queued {
+			if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) {
+				if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
+					r.log("failed to delete job %d: %s", job.ID, err)
+				}
+			}
+		}
+	}
+
 	if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
 		return fmt.Errorf("failed to scale down pool %s: %w", pool.ID, err)
 	}
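To make the cleanup added above concrete, here is a hedged sketch of the decision it applies: a queued job is treated as stale once it is older than the threshold and all of its labels are satisfied by the pool that just scaled down a runner. The Job type, the sample data and the way the threshold is wired through are illustrative only, not garm's actual store types.

package main

import (
	"fmt"
	"time"
)

// Job is a simplified stand-in for a queued workflow job as recorded in the DB;
// only the fields the staleness check cares about are included.
type Job struct {
	ID        int64
	Labels    []string
	CreatedAt time.Time
}

// staleJobs returns the queued jobs the scale-down pass would delete: jobs queued
// for longer than threshold whose labels are all satisfied by the pool's tags
// (the same subset rule as Pool.HasRequiredLabels in the first hunk).
func staleJobs(queued []Job, poolTags map[string]struct{}, threshold time.Duration, now time.Time) []Job {
	var stale []Job
outer:
	for _, job := range queued {
		if now.Sub(job.CreatedAt) <= threshold {
			continue // too recent; a runner may still pick it up
		}
		for _, l := range job.Labels {
			if _, ok := poolTags[l]; !ok {
				continue outer // job needs a label this pool does not have
			}
		}
		stale = append(stale, job)
	}
	return stale
}

func main() {
	now := time.Now()
	poolTags := map[string]struct{}{"self-hosted": {}, "linux": {}, "x64": {}}
	queued := []Job{
		{ID: 1, Labels: []string{"self-hosted", "linux"}, CreatedAt: now.Add(-15 * time.Minute)},
		{ID: 2, Labels: []string{"self-hosted", "linux"}, CreatedAt: now.Add(-2 * time.Minute)},
		{ID: 3, Labels: []string{"self-hosted", "windows"}, CreatedAt: now.Add(-30 * time.Minute)},
	}
	for _, j := range staleJobs(queued, poolTags, 10*time.Minute, now) {
		fmt.Printf("job %d would be dropped from the queue\n", j.ID)
	}
	// Only job 1 is dropped: job 2 is too recent, and job 3 asks for a label
	// ("windows") this pool does not provide, so another pool may still service it.
}

The real code additionally tolerates ErrNotFound from DeleteJob, since the job may already be gone by the time the cleanup runs.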
@@ -1574,48 +1596,9 @@ func (r *basePoolManager) consumeQueuedJobs() error {
 		}
 
-		if time.Since(job.UpdatedAt) >= time.Minute*10 {
-			// Job has been in queued state for 10 minutes or more. Check if it was consumed by another runner.
-			workflow, ghResp, err := r.helper.GithubCLI().GetWorkflowJobByID(r.ctx, job.RepositoryOwner, job.RepositoryName, job.ID)
-			if err != nil {
-				if ghResp != nil {
-					switch ghResp.StatusCode {
-					case http.StatusNotFound:
-						// Job does not exist in github. Remove it from the database.
-						if err := r.store.DeleteJob(r.ctx, job.ID); err != nil {
-							return errors.Wrap(err, "deleting job")
-						}
-					default:
-						r.log("failed to fetch job information from github: %q (status code: %d)", err, ghResp.StatusCode)
-					}
-				}
-				r.log("error fetching workflow info: %q", err)
-				continue
-			}
-
-			if workflow.GetStatus() != "queued" {
-				r.log("job is no longer in queued state on github. New status is: %s", workflow.GetStatus())
-				job.Action = workflow.GetStatus()
-				job.Status = workflow.GetStatus()
-				job.Conclusion = workflow.GetConclusion()
-				if workflow.RunnerName != nil {
-					job.RunnerName = *workflow.RunnerName
-				}
-				if workflow.RunnerID != nil {
-					job.GithubRunnerID = *workflow.RunnerID
-				}
-				if workflow.RunnerGroupName != nil {
-					job.RunnerGroupName = *workflow.RunnerGroupName
-				}
-				if workflow.RunnerGroupID != nil {
-					job.RunnerGroupID = *workflow.RunnerGroupID
-				}
-				if _, err := r.store.CreateOrUpdateJob(r.ctx, job); err != nil {
-					r.log("failed to update job status: %q", err)
-				}
-				continue
-			}
-
-		// Job is still queued in our db and in github. Unlock it and try again.
+		// Job is still queued in our db, 10 minutes after a matching runner
+		// was spawned. Unlock it and try again. A different job may have picked up
+		// the runner.
 		if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
 			// TODO: Implement a cache? Should we return here?
 			r.log("failed to unlock job %d: %q", job.ID, err)
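The hunk above removes the per-job GitHub lookup. As a rough, back-of-envelope illustration of why that lookup does not scale, the sketch below multiplies an assumed stale-queue depth by an assumed loop interval; the queue depth, interval and hourly request budget are all illustrative assumptions, not values taken from garm or from GitHub's documentation for any particular auth mode.

package main

import "fmt"

// Back-of-envelope for the rate-limiting concern in the commit message.
func main() {
	const (
		staleJobs       = 500    // assumed number of queued jobs older than 10 minutes in the DB
		loopInterval    = 60.0   // assumed seconds between passes of the queued-jobs loop
		hourlyAPIBudget = 5000.0 // assumed hourly REST request budget for the credentials in use
	)

	passesPerHour := 3600.0 / loopInterval
	// Assume each pass re-checks every stale job with one GetWorkflowJobByID call.
	callsPerHour := passesPerHour * staleJobs

	fmt.Printf("%.0f API calls/hour just to re-check stale jobs (%.0f%% of the assumed budget)\n",
		callsPerHour, 100*callsPerHour/hourlyAPIBudget)
	// With these numbers the old approach alone would consume the budget many times
	// over, which is why the new code infers staleness from the scale-down loop
	// instead of polling the API.
}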