From a984782fd73eae3f0ef4dffede1f901a4437e331 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 18 Jul 2025 07:51:50 +0000 Subject: [PATCH] Handle new jobID for scale sets There seems to be a change in the scale set message. It now includes a jobID and sets the runner request ID to 0. This change adds separate job ID fields for workflow jobs and scaleset jobs. Signed-off-by: Gabriel Adrian Samfira --- database/sql/jobs.go | 48 ++++++++++++++++++++++----- database/sql/models.go | 6 ++++ database/sql/sql.go | 29 ++++++++++------ params/github.go | 4 +-- params/params.go | 4 +++ runner/pool/pool.go | 44 ++++++++++++------------ workers/scaleset/scaleset_helper.go | 3 +- workers/scaleset/scaleset_listener.go | 28 ++++++++-------- 8 files changed, 107 insertions(+), 59 deletions(-) diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 1215e3f3..ff19394f 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -41,6 +41,8 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) { jobParam := params.Job{ ID: job.ID, + WorkflowJobID: job.WorkflowJobID, + ScaleSetJobID: job.ScaleSetJobID, RunID: job.RunID, Action: job.Action, Status: job.Status, @@ -75,7 +77,8 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job } workflofJob := WorkflowJob{ - ID: job.ID, + ScaleSetJobID: job.ScaleSetJobID, + WorkflowJobID: job.ID, RunID: job.RunID, Action: job.Action, Status: job.Status, @@ -109,14 +112,27 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job } func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) (err error) { + var workflowJob WorkflowJob + q := s.conn.Where("workflow_job_id = ?", jobID).Preload("Instance").First(&workflowJob) + if q.Error != nil { + if errors.Is(q.Error, gorm.ErrRecordNotFound) { + return nil + } + return errors.Wrap(q.Error, "fetching job") + } + removedJob, err := sqlWorkflowJobToParamsJob(workflowJob) + if err != nil { + return errors.Wrap(err, "converting job") + } + defer func() { if err == nil { - if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, params.Job{ID: jobID}); notifyErr != nil { + if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, removedJob); notifyErr != nil { slog.With(slog.Any("error", notifyErr)).Error("failed to send notify") } } }() - q := s.conn.Delete(&WorkflowJob{}, jobID) + q = s.conn.Delete(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { return nil @@ -132,7 +148,7 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e return errors.Wrap(err, "parsing entity id") } var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob) + q := s.conn.Preload("Instance").Where("id = ?", jobID).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -167,7 +183,7 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err error) { var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("workflow_job_id = ? 
and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -195,7 +211,7 @@ func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string) error { var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("workflow_job_id = ?", jobID).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -229,7 +245,14 @@ func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string) func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) { var workflowJob WorkflowJob var err error - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob) + + searchField := "workflow_job_id = ?" + var searchVal any = job.ID + if job.ScaleSetJobID != "" { + searchField = "scale_set_job_id = ?" + searchVal = job.ScaleSetJobID + } + q := s.conn.Preload("Instance").Where(searchField, searchVal).First(&workflowJob) if q.Error != nil { if !errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -249,6 +272,9 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa workflowJob.GithubRunnerID = job.GithubRunnerID workflowJob.RunnerGroupID = job.RunnerGroupID workflowJob.RunnerGroupName = job.RunnerGroupName + if job.RunID != 0 && workflowJob.RunID == 0 { + workflowJob.RunID = job.RunID + } if job.LockedBy != uuid.Nil { workflowJob.LockedBy = job.LockedBy @@ -327,7 +353,11 @@ func (s *sqlDatabase) ListEntityJobsByStatus(_ context.Context, entityType param } var jobs []WorkflowJob - query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status) + query := s.conn. + Model(&WorkflowJob{}). + Preload("Instance"). + Where("status = ?", status). + Where("workflow_job_id > 0") switch entityType { case params.ForgeEntityTypeOrganization: @@ -381,7 +411,7 @@ func (s *sqlDatabase) ListAllJobs(_ context.Context) ([]params.Job, error) { // GetJobByID gets a job by id. func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, error) { var job WorkflowJob - query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID) + query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("workflow_job_id = ?", jobID) if err := query.First(&job); err.Error != nil { if errors.Is(err.Error, gorm.ErrRecordNotFound) { diff --git a/database/sql/models.go b/database/sql/models.go index 4cdb9b8b..8944dee1 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -319,6 +319,12 @@ type User struct { type WorkflowJob struct { // ID is the ID of the job. ID int64 `gorm:"index"` + + // WorkflowJobID is the ID of the workflow job. + WorkflowJobID int64 `gorm:"index:workflow_job_id_idx"` + // ScaleSetJobID is the job ID for a scaleset job. + ScaleSetJobID string `gorm:"index:scaleset_job_id_idx"` + // RunID is the ID of the workflow run. A run may have multiple jobs. RunID int64 // Action is the specific activity that triggered the event. 
diff --git a/database/sql/sql.go b/database/sql/sql.go index d6e60586..16411364 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -374,6 +374,22 @@ func (s *sqlDatabase) migrateCredentialsToDB() (err error) { return nil } +func (s *sqlDatabase) migrateWorkflow() error { + if s.conn.Migrator().HasTable(&WorkflowJob{}) { + if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") { + // Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition + // to something else, we don't really consume them anyway. + if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil { + return errors.Wrap(err, "updating workflow_jobs") + } + if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil { + return errors.Wrap(err, "updating workflow_jobs") + } + } + } + return nil +} + func (s *sqlDatabase) migrateDB() error { if s.conn.Migrator().HasIndex(&Organization{}, "idx_organizations_name") { if err := s.conn.Migrator().DropIndex(&Organization{}, "idx_organizations_name"); err != nil { @@ -405,17 +421,8 @@ func (s *sqlDatabase) migrateDB() error { } } - if s.conn.Migrator().HasTable(&WorkflowJob{}) { - if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") { - // Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition - // to something else, we don't really consume them anyway. - if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil { - return errors.Wrap(err, "updating workflow_jobs") - } - if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil { - return errors.Wrap(err, "updating workflow_jobs") - } - } + if err := s.migrateWorkflow(); err != nil { + return errors.Wrap(err, "migrating workflows") } if s.conn.Migrator().HasTable(&GithubEndpoint{}) { diff --git a/params/github.go b/params/github.go index 9859f717..cb68d880 100644 --- a/params/github.go +++ b/params/github.go @@ -420,7 +420,6 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) { if r.Body == "" { return nil, fmt.Errorf("no body specified") } - if err := json.Unmarshal([]byte(r.Body), &body); err != nil { return nil, fmt.Errorf("failed to unmarshal body: %w", err) } @@ -519,6 +518,7 @@ type RunnerGroupList struct { type ScaleSetJobMessage struct { MessageType string `json:"messageType,omitempty"` + JobID string `json:"jobId,omitempty"` RunnerRequestID int64 `json:"runnerRequestId,omitempty"` RepositoryName string `json:"repositoryName,omitempty"` OwnerName string `json:"ownerName,omitempty"` @@ -552,7 +552,7 @@ func (s ScaleSetJobMessage) MessageTypeToStatus() JobStatus { func (s ScaleSetJobMessage) ToJob() Job { return Job{ - ID: s.RunnerRequestID, + ScaleSetJobID: s.JobID, Action: s.EventName, RunID: s.WorkflowRunID, Status: string(s.MessageTypeToStatus()), diff --git a/params/params.go b/params/params.go index 6e1bc1aa..9cd4fc83 100644 --- a/params/params.go +++ b/params/params.go @@ -1035,6 +1035,10 @@ func (p RunnerPrefix) GetRunnerPrefix() string { type Job struct { // ID is the ID of the job. ID int64 `json:"id,omitempty"` + + WorkflowJobID int64 `json:"workflow_job_id,omitempty"` + // ScaleSetJobID is the job ID when generated for a scale set. + ScaleSetJobID string `json:"scaleset_job_id,omitempty"` // RunID is the ID of the workflow run. A run may have multiple jobs. 
RunID int64 `json:"run_id,omitempty"` // Action is the specific activity that triggered the event. diff --git a/runner/pool/pool.go b/runner/pool/pool.go index c161e41d..1afee56e 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -176,19 +176,19 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { var triggeredBy int64 defer func() { - if jobParams.ID == 0 { + if jobParams.WorkflowJobID == 0 { return } // we're updating the job in the database, regardless of whether it was successful or not. // or if it was meant for this pool or not. Github will send the same job data to all hierarchies // that have been configured to work with garm. Updating the job at all levels should yield the same // outcome in the db. - _, err := r.store.GetJobByID(r.ctx, jobParams.ID) + _, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID) if err != nil { if !errors.Is(err, runnerErrors.ErrNotFound) { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "failed to get job", - "job_id", jobParams.ID) + "job_id", jobParams.WorkflowJobID) return } // This job is new to us. Check if we have a pool that can handle it. @@ -203,10 +203,10 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { slog.With(slog.Any("error", jobErr)).ErrorContext( - r.ctx, "failed to update job", "job_id", jobParams.ID) + r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID) } - if triggeredBy != 0 && jobParams.ID != triggeredBy { + if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy { // The triggeredBy value is only set by the "in_progress" webhook. The runner that // transitioned to in_progress was created as a result of a different queued job. If that job is // still queued and we don't remove the lock, it will linger until the lock timeout is reached. @@ -970,7 +970,7 @@ func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) ( } jobParams := params.Job{ - ID: job.WorkflowJob.ID, + WorkflowJobID: job.WorkflowJob.ID, Action: job.Action, RunID: job.WorkflowJob.RunID, Status: job.WorkflowJob.Status, @@ -1106,10 +1106,10 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool for _, job := range queued { if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) { - if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { + if err := r.store.DeleteJob(ctx, job.WorkflowJobID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { slog.With(slog.Any("error", err)).ErrorContext( ctx, "failed to delete job", - "job_id", job.ID) + "job_id", job.WorkflowJobID) } } } @@ -1760,7 +1760,7 @@ func (r *basePoolManager) consumeQueuedJobs() error { // Job was handled by us or another entity. slog.DebugContext( r.ctx, "job is locked", - "job_id", job.ID, + "job_id", job.WorkflowJobID, "locking_entity", job.LockedBy.String()) continue } @@ -1769,7 +1769,7 @@ func (r *basePoolManager) consumeQueuedJobs() error { // give the idle runners a chance to pick up the job. slog.DebugContext( r.ctx, "job backoff not reached", "backoff_interval", r.controllerInfo.MinimumJobAgeBackoff, - "job_id", job.ID) + "job_id", job.WorkflowJobID) continue } @@ -1777,12 +1777,12 @@ func (r *basePoolManager) consumeQueuedJobs() error { // Job is still queued in our db, 10 minutes after a matching runner // was spawned. Unlock it and try again. A different job may have picked up // the runner. 
- if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil { + if err := r.store.UnlockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil { // nolint:golangci-lint,godox // TODO: Implament a cache? Should we return here? slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "failed to unlock job", - "job_id", job.ID) + "job_id", job.WorkflowJobID) continue } } @@ -1795,7 +1795,7 @@ func (r *basePoolManager) consumeQueuedJobs() error { // runner. slog.DebugContext( r.ctx, "job is locked by us", - "job_id", job.ID) + "job_id", job.WorkflowJobID) continue } @@ -1816,29 +1816,29 @@ func (r *basePoolManager) consumeQueuedJobs() error { } runnerCreated := false - if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil { + if err := r.store.LockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "could not lock job", - "job_id", job.ID) + "job_id", job.WorkflowJobID) continue } jobLabels := []string{ - fmt.Sprintf("%s=%d", jobLabelPrefix, job.ID), + fmt.Sprintf("%s=%d", jobLabelPrefix, job.WorkflowJobID), } for i := 0; i < poolRR.Len(); i++ { pool, err := poolRR.Next() if err != nil { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "could not find a pool to create a runner for job", - "job_id", job.ID) + "job_id", job.WorkflowJobID) break } slog.InfoContext( r.ctx, "attempting to create a runner in pool", "pool_id", pool.ID, - "job_id", job.ID) + "job_id", job.WorkflowJobID) if err := r.addRunnerToPool(pool, jobLabels); err != nil { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "could not add runner to pool", @@ -1847,7 +1847,7 @@ func (r *basePoolManager) consumeQueuedJobs() error { } slog.DebugContext(r.ctx, "a new runner was added as a response to queued job", "pool_id", pool.ID, - "job_id", job.ID) + "job_id", job.WorkflowJobID) runnerCreated = true break } @@ -1855,11 +1855,11 @@ func (r *basePoolManager) consumeQueuedJobs() error { if !runnerCreated { slog.WarnContext( r.ctx, "could not create a runner for job; unlocking", - "job_id", job.ID) - if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil { + "job_id", job.WorkflowJobID) + if err := r.store.UnlockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil { slog.With(slog.Any("error", err)).ErrorContext( r.ctx, "failed to unlock job", - "job_id", job.ID) + "job_id", job.WorkflowJobID) return errors.Wrap(err, "unlocking job") } } diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go index 7b3fdf03..c04c92a2 100644 --- a/workers/scaleset/scaleset_helper.go +++ b/workers/scaleset/scaleset_helper.go @@ -80,7 +80,7 @@ func (w *Worker) recordOrUpdateJob(job params.ScaleSetJobMessage) error { case params.ForgeEntityTypeOrganization: jobParams.OrgID = &asUUID default: - return fmt.Errorf("unknown entity type: %s", entity.EntityType) + return fmt.Errorf("unknown entity type: %s --> %s", entity.EntityType, entity) } if _, jobErr := w.store.CreateOrUpdateJob(w.ctx, jobParams); jobErr != nil { @@ -163,6 +163,7 @@ func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) (err error) } func (w *Worker) HandleJobsAvailable(jobs []params.ScaleSetJobMessage) error { + slog.DebugContext(w.ctx, "handling jobs available", "jobs", jobs) for _, job := range jobs { if err := w.recordOrUpdateJob(job); err != nil { // recording scale set jobs are purely informational for now. 
diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 1274ee59..7808f9f6 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -150,28 +150,22 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage for _, job := range body { switch job.MessageType { case params.MessageTypeJobAssigned: - slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName) + slog.InfoContext(l.ctx, "new job assigned", "job_id", job.JobID, "job_name", job.JobDisplayName) assignedJobs = append(assignedJobs, job) case params.MessageTypeJobStarted: - slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) + slog.InfoContext(l.ctx, "job started", "job_id", job.JobID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) startedJobs = append(startedJobs, job) case params.MessageTypeJobCompleted: - slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) + slog.InfoContext(l.ctx, "job completed", "job_id", job.JobID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName) completedJobs = append(completedJobs, job) case params.MessageTypeJobAvailable: - slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName) + slog.InfoContext(l.ctx, "job available", "job_id", job.JobID, "job_name", job.JobDisplayName) availableJobs = append(availableJobs, job) default: slog.DebugContext(l.ctx, "unknown message type", "message_type", job.MessageType) } } - if len(assignedJobs) > 0 { - if err := l.scaleSetHelper.HandleJobsAvailable(assignedJobs); err != nil { - slog.ErrorContext(l.ctx, "error handling available jobs", "error", err) - } - } - scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient() if err != nil { slog.ErrorContext(l.ctx, "getting scale set client", "error", err) @@ -198,10 +192,9 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage slog.DebugContext(l.ctx, "acquired jobs", "job_ids", idsAcquired) } - if len(completedJobs) > 0 { - if err := l.scaleSetHelper.HandleJobsCompleted(completedJobs); err != nil { - slog.ErrorContext(l.ctx, "error handling completed jobs", "error", err) - return + if len(assignedJobs) > 0 { + if err := l.scaleSetHelper.HandleJobsAvailable(assignedJobs); err != nil { + slog.ErrorContext(l.ctx, "error handling available jobs", "error", err) } } @@ -212,6 +205,13 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage } } + if len(completedJobs) > 0 { + if err := l.scaleSetHelper.HandleJobsCompleted(completedJobs); err != nil { + slog.ErrorContext(l.ctx, "error handling completed jobs", "error", err) + return + } + } + if err := l.scaleSetHelper.SetLastMessageID(msg.MessageID); err != nil { slog.ErrorContext(l.ctx, "setting last message ID", "error", err) } else {
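
Illustrative sketch (not part of the patch): a scale set message that carries a jobId and a zero runnerRequestId should yield a Job keyed by ScaleSetJobID, which CreateOrUpdateJob then matches on scale_set_job_id rather than workflow_job_id. The structs and JSON body below are trimmed, hypothetical stand-ins for params.ScaleSetJobMessage and params.Job; only the messageType, jobId and runnerRequestId JSON tags are taken from the diff above, the remaining tags and values are assumptions.

// Minimal sketch, assuming a simplified message body. The local structs
// mirror only the fields relevant to the job-ID change.
package main

import (
	"encoding/json"
	"fmt"
)

type scaleSetJobMessage struct {
	MessageType     string `json:"messageType,omitempty"`
	JobID           string `json:"jobId,omitempty"`
	RunnerRequestID int64  `json:"runnerRequestId,omitempty"`
	WorkflowRunID   int64  `json:"workflowRunId,omitempty"` // tag assumed, not shown in the diff
	EventName       string `json:"eventName,omitempty"`     // tag assumed, not shown in the diff
}

type job struct {
	WorkflowJobID int64  // populated from webhook workflow jobs
	ScaleSetJobID string // populated from scale set messages
	RunID         int64
	Action        string
}

func main() {
	// Hypothetical scale set message body: runnerRequestId is 0, jobId is set.
	body := `{"messageType":"JobAssigned","jobId":"6e3b2f0c","runnerRequestId":0,"workflowRunId":42,"eventName":"push"}`

	var msg scaleSetJobMessage
	if err := json.Unmarshal([]byte(body), &msg); err != nil {
		panic(err)
	}

	// Mirrors ScaleSetJobMessage.ToJob(): the scale set job ID, not the
	// runner request ID, becomes the job key.
	j := job{ScaleSetJobID: msg.JobID, RunID: msg.WorkflowRunID, Action: msg.EventName}

	// Mirrors the lookup in CreateOrUpdateJob: scale set jobs are matched on
	// scale_set_job_id, workflow jobs on workflow_job_id.
	searchField := "workflow_job_id = ?"
	var searchVal any = j.WorkflowJobID
	if j.ScaleSetJobID != "" {
		searchField = "scale_set_job_id = ?"
		searchVal = j.ScaleSetJobID
	}
	fmt.Println(searchField, searchVal)
}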