Break lock and lower scale down timeout

Break the lock on a job if it is still queued but the runner it triggered
was assigned to another job. This may cause leftover runners to be
created, but we scale those down within ~3 minutes.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Author: Gabriel Adrian Samfira
Date:   2023-06-24 00:22:51 +00:00
Parent: 1287a93cf2
Commit: a15a91b974
9 changed files with 124 additions and 36 deletions
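
The mechanism in brief: when garm creates a runner for a queued job, it tags the runner with a label carrying that job's ID. If a later "in_progress" webhook shows that runner working on a different job, the lock held on the original (still queued) job is broken right away instead of lingering until the lock timeout. Below is a minimal, self-contained sketch of that decision path; the label prefix and parsing mirror the diff that follows, while fakeStore and the main harness are illustrative stand-ins rather than GARM code.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

const jobLabelPrefix = "in_response_to_job:"

// jobIDFromLabels mirrors jobIdFromLabels in the diff below: it extracts the
// ID of the job that triggered a runner, or 0 if no such label is present.
func jobIDFromLabels(labels []string) int64 {
	for _, lbl := range labels {
		if strings.HasPrefix(lbl, jobLabelPrefix) {
			id, err := strconv.ParseInt(strings.TrimPrefix(lbl, jobLabelPrefix), 10, 64)
			if err != nil {
				return 0
			}
			return id
		}
	}
	return 0
}

// fakeStore is a stand-in for the JobsStore interface extended below.
type fakeStore struct{ locked map[int64]bool }

func (s *fakeStore) BreakLockJobIsQueued(jobID int64) error {
	if s.locked[jobID] {
		delete(s.locked, jobID)
		fmt.Printf("broke lock on queued job %d\n", jobID)
	}
	return nil
}

func main() {
	store := &fakeStore{locked: map[int64]bool{42: true}}

	// An "in_progress" webhook arrives: a runner that was spawned for job 42
	// (see its label) has actually picked up job 99.
	runnerLabels := []string{"runner-pool-id:abcd", jobLabelPrefix + "42"}
	inProgressJobID := int64(99)

	if triggeredBy := jobIDFromLabels(runnerLabels); triggeredBy != 0 && triggeredBy != inProgressJobID {
		// Job 42 is still queued and locked; break the lock so the consume
		// loop can schedule it on another runner.
		_ = store.BreakLockJobIsQueued(triggeredBy)
	}
}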


@@ -54,15 +54,16 @@ var jobsListCmd = &cobra.Command{
func formatJobs(jobs []params.Job) {
t := table.NewWriter()
header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Locked by"}
header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Locked by"}
t.AppendHeader(header)
for _, job := range jobs {
lockedBy := ""
repo := fmt.Sprintf("%s/%s", job.RepositoryOwner, job.RepositoryName)
if job.LockedBy != uuid.Nil {
lockedBy = job.LockedBy.String()
}
t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, lockedBy})
t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, lockedBy})
t.AppendSeparator()
}
fmt.Println(t.Render())
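
With the new column, "garm-cli job list" output would look roughly like this (illustrative values):

+----+-------+--------+------------+-------------+--------------+--------------------------------------+
| ID | NAME  | STATUS | CONCLUSION | RUNNER NAME | REPOSITORY   | LOCKED BY                            |
+----+-------+--------+------------+-------------+--------------+--------------------------------------+
| 42 | build | queued |            |             | example/repo | 6c1626a2-1b37-4b7e-ab30-4e5a28f1c8b3 |
+----+-------+--------+------------+-------------+--------------+--------------------------------------+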


@@ -122,6 +122,7 @@ type JobsStore interface {
DeleteJob(ctx context.Context, jobID int64) error
UnlockJob(ctx context.Context, jobID int64, entityID string) error
LockJob(ctx context.Context, jobID int64, entityID string) error
BreakLockJobIsQueued(ctx context.Context, jobID int64) error
DeleteCompletedJobs(ctx context.Context) error
}
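
Note that BreakLockJobIsQueued is deliberately narrower than UnlockJob: it takes no entityID and ignores who holds the lock, clearing it only while the job is still in the queued state, so locks on running or completed jobs are never touched.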


@@ -16,12 +16,14 @@ package sql
import (
"context"
"encoding/json"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
@@ -32,6 +34,14 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p
return params.Instance{}, errors.Wrap(err, "fetching pool")
}
var labels datatypes.JSON
if len(param.AditionalLabels) > 0 {
labels, err = json.Marshal(param.AditionalLabels)
if err != nil {
return params.Instance{}, errors.Wrap(err, "marshalling labels")
}
}
newInstance := Instance{
Pool: pool,
Name: param.Name,
@@ -42,6 +52,7 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p
CallbackURL: param.CallbackURL,
MetadataURL: param.MetadataURL,
GitHubRunnerGroup: param.GitHubRunnerGroup,
AditionalLabels: labels,
}
q := s.conn.Create(&newInstance)
if q.Error != nil {


@@ -119,6 +119,30 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string)
return nil
}
func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil
}
return errors.Wrap(q.Error, "fetching job")
}
if workflowJob.LockedBy == uuid.Nil {
// Job is already unlocked.
return nil
}
workflowJob.LockedBy = uuid.Nil
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
return nil
}
func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID string) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
@@ -140,7 +164,6 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin
}
workflowJob.LockedBy = uuid.Nil
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
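
One caveat on the locking pattern above: gorm's clause.Locking emits SELECT ... FOR UPDATE, and databases that support row locks only hold them for the duration of a transaction. Issued on the bare s.conn handle in autocommit mode, the lock is released as soon as the SELECT returns, before Save runs. A sketch of the same logic wrapped in an explicit transaction, reusing this file's imports and types (illustrative, not part of the commit; breakLockJobIsQueuedTx is a hypothetical name):

func (s *sqlDatabase) breakLockJobIsQueuedTx(jobID int64) error {
	// Run the read-check-write sequence in one transaction so the
	// FOR UPDATE row lock is held until commit.
	return s.conn.Transaction(func(tx *gorm.DB) error {
		var workflowJob WorkflowJob
		q := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("id = ? and status = ?", jobID, params.JobStatusQueued).
			First(&workflowJob)
		if q.Error != nil {
			if errors.Is(q.Error, gorm.ErrRecordNotFound) {
				// Job is gone or no longer queued; nothing to break.
				return nil
			}
			return errors.Wrap(q.Error, "fetching job")
		}
		if workflowJob.LockedBy == uuid.Nil {
			// Already unlocked.
			return nil
		}
		workflowJob.LockedBy = uuid.Nil
		return errors.Wrap(tx.Save(&workflowJob).Error, "saving job")
	})
}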


@@ -153,6 +153,7 @@ type Instance struct {
CreateAttempt int
TokenFetched bool
GitHubRunnerGroup string
AditionalLabels datatypes.JSON
PoolID uuid.UUID
Pool Pool `gorm:"foreignKey:PoolID"`


@@ -32,6 +32,9 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance {
if instance.ProviderID != nil {
id = *instance.ProviderID
}
var labels []string
_ = json.Unmarshal(instance.AditionalLabels, &labels)
ret := params.Instance{
ID: instance.ID.String(),
ProviderID: id,
@@ -51,6 +54,7 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance {
UpdatedAt: instance.UpdatedAt,
TokenFetched: instance.TokenFetched,
GitHubRunnerGroup: instance.GitHubRunnerGroup,
AditionalLabels: labels,
}
if len(instance.ProviderFault) > 0 {
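
The discarded unmarshal error above is intentional: if the stored label blob is empty or malformed, the instance converts with nil labels instead of failing the whole lookup.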


@@ -156,10 +156,11 @@ type Instance struct {
GitHubRunnerGroup string `json:"github-runner-group"`
// Do not serialize sensitive info.
CallbackURL string `json:"-"`
MetadataURL string `json:"-"`
CreateAttempt int `json:"-"`
TokenFetched bool `json:"-"`
CallbackURL string `json:"-"`
MetadataURL string `json:"-"`
CreateAttempt int `json:"-"`
TokenFetched bool `json:"-"`
AditionalLabels []string `json:"-"`
}
func (i Instance) GetName() string {

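AditionalLabels deliberately joins the other json:"-" fields: the labels are internal bookkeeping used when building the runner, and are never serialized in API responses.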

@@ -136,6 +136,7 @@ type CreateInstanceParams struct {
// The runner group must be created by someone with access to the enterprise.
GitHubRunnerGroup string
CreateAttempt int `json:"-"`
AditionalLabels []string
}
type CreatePoolParams struct {


@@ -20,6 +20,7 @@ import (
"log"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -41,6 +42,13 @@ import (
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
// We tag runners that were spawned as a result of a queued job with the ID of the job
// that spawned them. There is no way to guarantee that a runner spawned in response to a
// particular job will be picked up by that job. We mark them so that, in the likely event
// that the runner picks up a different job, we can clear the lock on the job that spawned it.
// The job the runner did pick up will already have transitioned to in_progress, so the
// consume loop will ignore it.
jobLabelPrefix = "in_response_to_job:"
)
const (
@@ -104,14 +112,27 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
var jobParams params.Job
var err error
var triggeredBy int64
defer func() {
// We update the job in the database regardless of whether handling it succeeded, and
// regardless of whether it was meant for this pool. GitHub sends the same job data to all
// hierarchies that have been configured to work with garm, so updating the job at any level
// should yield the same outcome in the db.
if jobParams.ID != 0 {
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
log.Printf("failed to update job %d: %s", jobParams.ID, jobErr)
if jobParams.ID == 0 {
return
}
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
log.Printf("failed to update job %d: %s", jobParams.ID, jobErr)
}
if triggeredBy != 0 && jobParams.ID != triggeredBy {
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
// transitioned to in_progress was created as a result of a different queued job. If that job is
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
// That may take a long time, so we break the lock here and allow it to be scheduled again.
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
log.Printf("failed to break lock for job %d: %s", triggeredBy, err)
}
}
}()
@@ -175,6 +196,8 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
// Set triggeredBy here so we break the lock on any potential queued job.
triggeredBy = jobIdFromLabels(instance.AditionalLabels)
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
@@ -189,6 +212,19 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
return nil
}
func jobIdFromLabels(labels []string) int64 {
for _, lbl := range labels {
if strings.HasPrefix(lbl, jobLabelPrefix) {
jobId, err := strconv.ParseInt(lbl[len(jobLabelPrefix):], 10, 64)
if err != nil {
return 0
}
return jobId
}
}
return 0
}
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
r.log("starting %s loop for %s", name, r.helper.String())
ticker := time.NewTicker(interval)
@@ -619,7 +655,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCo
return instance, nil
}
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) error {
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
@@ -637,6 +673,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
MetadataURL: r.helper.GetMetadataURL(),
CreateAttempt: 1,
GitHubRunnerGroup: pool.GitHubRunnerGroup,
AditionalLabels: aditionalLabels,
}
_, err = r.store.CreateInstance(r.ctx, poolID, createParams)
@@ -690,6 +727,10 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
labels = append(labels, r.controllerLabel())
labels = append(labels, r.poolLabel(pool.ID))
if len(instance.AditionalLabels) > 0 {
labels = append(labels, instance.AditionalLabels...)
}
jwtValidity := pool.RunnerTimeout()
entity := r.helper.String()
@@ -796,6 +837,11 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param
// decisions based on the status of saved jobs. A "queued" job will prompt garm to search for an appropriate pool
// and spin up a runner there if no other idle runner exists to pick it up.
func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) {
asUUID, err := uuid.Parse(r.ID())
if err != nil {
return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID")
}
jobParams := params.Job{
ID: job.WorkflowJob.ID,
Action: job.Action,
@@ -815,24 +861,21 @@ func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (
runnerName := job.WorkflowJob.RunnerName
if job.Action != "queued" && runnerName == "" {
// Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need,
// using the workflow run ID, from the API.
// We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance
// to pick up the job, the runner name is not available from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return jobParams, errors.Wrap(err, "fetching runner details")
if job.WorkflowJob.Conclusion != "skipped" && job.WorkflowJob.Conclusion != "cancelled" {
// Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need,
// using the workflow run ID, from the API.
// We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance
// to pick up the job, the runner name is not available from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return jobParams, errors.Wrap(err, "fetching runner details")
}
runnerName = runnerInfo.Name
}
runnerName = runnerInfo.Name
}
jobParams.RunnerName = runnerName
asUUID, err := uuid.Parse(r.ID())
if err != nil {
return jobParams, errors.Wrap(err, "parsing pool ID as UUID")
}
switch r.helper.PoolType() {
case params.EnterprisePool:
jobParams.EnterpriseID = asUUID
@@ -884,9 +927,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
// consideration for scale-down. The grace period prevents a situation where a
// "queued" workflow triggers the creation of a new idle runner, and this routine reaps
// an idle runner before they have a chance to pick up a job.
if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle &&
providerCommon.InstanceStatus(inst.Status) == providerCommon.InstanceRunning &&
time.Since(inst.UpdatedAt).Minutes() > 5 {
if inst.RunnerStatus == providerCommon.RunnerIdle && inst.Status == providerCommon.InstanceRunning && time.Since(inst.UpdatedAt).Minutes() > 2 {
idleWorkers = append(idleWorkers, inst)
}
}
@@ -935,9 +976,9 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
return nil
}
func (r *basePoolManager) addRunnerToPool(pool params.Pool) error {
func (r *basePoolManager) addRunnerToPool(pool params.Pool, aditionalLabels []string) error {
if !pool.Enabled {
return nil
return fmt.Errorf("pool %s is disabled", pool.ID)
}
poolInstanceCount, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
@@ -949,7 +990,7 @@ func (r *basePoolManager) addRunnerToPool(pool params.Pool) error {
return fmt.Errorf("max workers (%d) reached for pool %s", pool.MaxRunners, pool.ID)
}
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
if err := r.AddRunner(r.ctx, pool.ID, aditionalLabels); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %s", pool.ID, err)
}
return nil
@@ -993,7 +1034,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
for i := 0; i < required; i++ {
r.log("adding new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
if err := r.AddRunner(r.ctx, pool.ID, nil); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err)
}
}
@@ -1416,14 +1457,14 @@ func (r *basePoolManager) consumeQueuedJobs() error {
continue
}
if time.Since(job.CreatedAt) < time.Second*30 {
if time.Since(job.UpdatedAt) < time.Second*30 {
// give the idle runners a chance to pick up the job.
log.Printf("job %d was created less than 30 seconds ago. Skipping", job.ID)
log.Printf("job %d was updated less than 30 seconds ago. Skipping", job.ID)
continue
}
if time.Since(job.UpdatedAt) >= time.Minute*5 {
// Job has been in queued state for 5 minutes or more. Check if it was consumed by another runner.
if time.Since(job.UpdatedAt) >= time.Minute*10 {
// Job has been in queued state for 10 minutes or more. Check if it was consumed by another runner.
workflow, ghResp, err := r.helper.GithubCLI().GetWorkflowJobByID(r.ctx, job.RepositoryOwner, job.RepositoryName, job.ID)
if err != nil {
if ghResp != nil {
@@ -1497,9 +1538,13 @@ func (r *basePoolManager) consumeQueuedJobs() error {
log.Printf("[Pool mgr ID %s] could not lock job %d: %s", r.ID(), job.ID, err)
continue
}
jobLabels := []string{
fmt.Sprintf("%s%d", jobLabelPrefix, job.ID),
}
for _, pool := range potentialPools {
log.Printf("attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
if err := r.addRunnerToPool(pool); err != nil {
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
log.Printf("could not add runner to pool %s: %s", pool.ID, err)
continue
}
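
Taken together, the timing changes rebalance the queued-job lifecycle: consumeQueuedJobs now leaves a job alone for 30 seconds after its last update rather than after creation, giving idle runners a chance to pick it up; it only re-checks a job against the GitHub API after 10 minutes in the queued state, up from 5; and scaleDownOnePool reaps idle runners after 2 idle minutes, down from 5, which, once the scale-down loop interval is added, is presumably the "~3 minutes" the commit message promises for cleaning up leftover runners.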