Handle new jobID for scale sets

There seems to be a change in the scale set message. It now includes
a jobID and sets the runner request ID to 0. This change adds separate
job ID fields for workflow jobs and scaleset jobs.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-07-18 07:51:50 +00:00
parent d26973da2a
commit a984782fd7
8 changed files with 107 additions and 59 deletions

View file

@ -41,6 +41,8 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
jobParam := params.Job{
ID: job.ID,
WorkflowJobID: job.WorkflowJobID,
ScaleSetJobID: job.ScaleSetJobID,
RunID: job.RunID,
Action: job.Action,
Status: job.Status,
@ -75,7 +77,8 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job
}
workflofJob := WorkflowJob{
ID: job.ID,
ScaleSetJobID: job.ScaleSetJobID,
WorkflowJobID: job.ID,
RunID: job.RunID,
Action: job.Action,
Status: job.Status,
@ -109,14 +112,27 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job
}
func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) (err error) {
var workflowJob WorkflowJob
q := s.conn.Where("workflow_job_id = ?", jobID).Preload("Instance").First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil
}
return errors.Wrap(q.Error, "fetching job")
}
removedJob, err := sqlWorkflowJobToParamsJob(workflowJob)
if err != nil {
return errors.Wrap(err, "converting job")
}
defer func() {
if err == nil {
if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, params.Job{ID: jobID}); notifyErr != nil {
if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, removedJob); notifyErr != nil {
slog.With(slog.Any("error", notifyErr)).Error("failed to send notify")
}
}
}()
q := s.conn.Delete(&WorkflowJob{}, jobID)
q = s.conn.Delete(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil
@ -132,7 +148,7 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e
return errors.Wrap(err, "parsing entity id")
}
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob)
q := s.conn.Preload("Instance").Where("id = ?", jobID).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -167,7 +183,7 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e
func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err error) {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("workflow_job_id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -195,7 +211,7 @@ func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err
func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("workflow_job_id = ?", jobID).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -229,7 +245,14 @@ func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string)
func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) {
var workflowJob WorkflowJob
var err error
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob)
searchField := "workflow_job_id = ?"
var searchVal any = job.ID
if job.ScaleSetJobID != "" {
searchField = "scale_set_job_id = ?"
searchVal = job.ScaleSetJobID
}
q := s.conn.Preload("Instance").Where(searchField, searchVal).First(&workflowJob)
if q.Error != nil {
if !errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -249,6 +272,9 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
workflowJob.GithubRunnerID = job.GithubRunnerID
workflowJob.RunnerGroupID = job.RunnerGroupID
workflowJob.RunnerGroupName = job.RunnerGroupName
if job.RunID != 0 && workflowJob.RunID == 0 {
workflowJob.RunID = job.RunID
}
if job.LockedBy != uuid.Nil {
workflowJob.LockedBy = job.LockedBy
@ -327,7 +353,11 @@ func (s *sqlDatabase) ListEntityJobsByStatus(_ context.Context, entityType param
}
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
query := s.conn.
Model(&WorkflowJob{}).
Preload("Instance").
Where("status = ?", status).
Where("workflow_job_id > 0")
switch entityType {
case params.ForgeEntityTypeOrganization:
@ -381,7 +411,7 @@ func (s *sqlDatabase) ListAllJobs(_ context.Context) ([]params.Job, error) {
// GetJobByID gets a job by id.
func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, error) {
var job WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID)
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("workflow_job_id = ?", jobID)
if err := query.First(&job); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {

View file

@ -319,6 +319,12 @@ type User struct {
type WorkflowJob struct {
// ID is the ID of the job.
ID int64 `gorm:"index"`
// WorkflowJobID is the ID of the workflow job.
WorkflowJobID int64 `gorm:"index:workflow_job_id_idx"`
// ScaleSetJobID is the job ID for a scaleset job.
ScaleSetJobID string `gorm:"index:scaleset_job_id_idx"`
// RunID is the ID of the workflow run. A run may have multiple jobs.
RunID int64
// Action is the specific activity that triggered the event.

View file

@ -374,6 +374,22 @@ func (s *sqlDatabase) migrateCredentialsToDB() (err error) {
return nil
}
// migrateWorkflow performs the one-off schema migration for the
// workflow_jobs table: when the legacy "runner_name" column is still
// present, it first purges all jobs that are no longer in the "queued"
// state and then drops the column. It is a no-op when the table or the
// column does not exist.
func (s *sqlDatabase) migrateWorkflow() error {
	migrator := s.conn.Migrator()
	// Nothing to do on a fresh database or one that was already migrated.
	if !migrator.HasTable(&WorkflowJob{}) {
		return nil
	}
	if !migrator.HasColumn(&WorkflowJob{}, "runner_name") {
		return nil
	}
	// Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition
	// to something else, we don't really consume them anyway.
	if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil {
		return errors.Wrap(err, "updating workflow_jobs")
	}
	// The column itself is obsolete; drop it now that stale rows are gone.
	if err := migrator.DropColumn(&WorkflowJob{}, "runner_name"); err != nil {
		return errors.Wrap(err, "updating workflow_jobs")
	}
	return nil
}
func (s *sqlDatabase) migrateDB() error {
if s.conn.Migrator().HasIndex(&Organization{}, "idx_organizations_name") {
if err := s.conn.Migrator().DropIndex(&Organization{}, "idx_organizations_name"); err != nil {
@ -405,17 +421,8 @@ func (s *sqlDatabase) migrateDB() error {
}
}
if s.conn.Migrator().HasTable(&WorkflowJob{}) {
if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") {
// Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition
// to something else, we don't really consume them anyway.
if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil {
return errors.Wrap(err, "updating workflow_jobs")
}
if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil {
return errors.Wrap(err, "updating workflow_jobs")
}
}
if err := s.migrateWorkflow(); err != nil {
return errors.Wrap(err, "migrating workflows")
}
if s.conn.Migrator().HasTable(&GithubEndpoint{}) {

View file

@ -420,7 +420,6 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
if r.Body == "" {
return nil, fmt.Errorf("no body specified")
}
if err := json.Unmarshal([]byte(r.Body), &body); err != nil {
return nil, fmt.Errorf("failed to unmarshal body: %w", err)
}
@ -519,6 +518,7 @@ type RunnerGroupList struct {
type ScaleSetJobMessage struct {
MessageType string `json:"messageType,omitempty"`
JobID string `json:"jobId,omitempty"`
RunnerRequestID int64 `json:"runnerRequestId,omitempty"`
RepositoryName string `json:"repositoryName,omitempty"`
OwnerName string `json:"ownerName,omitempty"`
@ -552,7 +552,7 @@ func (s ScaleSetJobMessage) MessageTypeToStatus() JobStatus {
func (s ScaleSetJobMessage) ToJob() Job {
return Job{
ID: s.RunnerRequestID,
ScaleSetJobID: s.JobID,
Action: s.EventName,
RunID: s.WorkflowRunID,
Status: string(s.MessageTypeToStatus()),

View file

@ -1035,6 +1035,10 @@ func (p RunnerPrefix) GetRunnerPrefix() string {
type Job struct {
// ID is the ID of the job.
ID int64 `json:"id,omitempty"`
WorkflowJobID int64 `json:"workflow_job_id,omitempty"`
// ScaleSetJobID is the job ID when generated for a scale set.
ScaleSetJobID string `json:"scaleset_job_id,omitempty"`
// RunID is the ID of the workflow run. A run may have multiple jobs.
RunID int64 `json:"run_id,omitempty"`
// Action is the specific activity that triggered the event.

View file

@ -176,19 +176,19 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
var triggeredBy int64
defer func() {
if jobParams.ID == 0 {
if jobParams.WorkflowJobID == 0 {
return
}
// we're updating the job in the database, regardless of whether it was successful or not.
// or if it was meant for this pool or not. Github will send the same job data to all hierarchies
// that have been configured to work with garm. Updating the job at all levels should yield the same
// outcome in the db.
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
_, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to get job",
"job_id", jobParams.ID)
"job_id", jobParams.WorkflowJobID)
return
}
// This job is new to us. Check if we have a pool that can handle it.
@ -203,10 +203,10 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
slog.With(slog.Any("error", jobErr)).ErrorContext(
r.ctx, "failed to update job", "job_id", jobParams.ID)
r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID)
}
if triggeredBy != 0 && jobParams.ID != triggeredBy {
if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy {
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
// transitioned to in_progress was created as a result of a different queued job. If that job is
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
@ -970,7 +970,7 @@ func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (
}
jobParams := params.Job{
ID: job.WorkflowJob.ID,
WorkflowJobID: job.WorkflowJob.ID,
Action: job.Action,
RunID: job.WorkflowJob.RunID,
Status: job.WorkflowJob.Status,
@ -1106,10 +1106,10 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
for _, job := range queued {
if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) {
if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
if err := r.store.DeleteJob(ctx, job.WorkflowJobID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to delete job",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
}
}
}
@ -1760,7 +1760,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
// Job was handled by us or another entity.
slog.DebugContext(
r.ctx, "job is locked",
"job_id", job.ID,
"job_id", job.WorkflowJobID,
"locking_entity", job.LockedBy.String())
continue
}
@ -1769,7 +1769,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
// give the idle runners a chance to pick up the job.
slog.DebugContext(
r.ctx, "job backoff not reached", "backoff_interval", r.controllerInfo.MinimumJobAgeBackoff,
"job_id", job.ID)
"job_id", job.WorkflowJobID)
continue
}
@ -1777,12 +1777,12 @@ func (r *basePoolManager) consumeQueuedJobs() error {
// Job is still queued in our db, 10 minutes after a matching runner
// was spawned. Unlock it and try again. A different job may have picked up
// the runner.
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
if err := r.store.UnlockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil {
// nolint:golangci-lint,godox
// TODO: Implement a cache? Should we return here?
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to unlock job",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
continue
}
}
@ -1795,7 +1795,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
// runner.
slog.DebugContext(
r.ctx, "job is locked by us",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
continue
}
@ -1816,29 +1816,29 @@ func (r *basePoolManager) consumeQueuedJobs() error {
}
runnerCreated := false
if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil {
if err := r.store.LockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "could not lock job",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
continue
}
jobLabels := []string{
fmt.Sprintf("%s=%d", jobLabelPrefix, job.ID),
fmt.Sprintf("%s=%d", jobLabelPrefix, job.WorkflowJobID),
}
for i := 0; i < poolRR.Len(); i++ {
pool, err := poolRR.Next()
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "could not find a pool to create a runner for job",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
break
}
slog.InfoContext(
r.ctx, "attempting to create a runner in pool",
"pool_id", pool.ID,
"job_id", job.ID)
"job_id", job.WorkflowJobID)
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "could not add runner to pool",
@ -1847,7 +1847,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
}
slog.DebugContext(r.ctx, "a new runner was added as a response to queued job",
"pool_id", pool.ID,
"job_id", job.ID)
"job_id", job.WorkflowJobID)
runnerCreated = true
break
}
@ -1855,11 +1855,11 @@ func (r *basePoolManager) consumeQueuedJobs() error {
if !runnerCreated {
slog.WarnContext(
r.ctx, "could not create a runner for job; unlocking",
"job_id", job.ID)
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
"job_id", job.WorkflowJobID)
if err := r.store.UnlockJob(r.ctx, job.WorkflowJobID, r.ID()); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to unlock job",
"job_id", job.ID)
"job_id", job.WorkflowJobID)
return errors.Wrap(err, "unlocking job")
}
}

View file

@ -80,7 +80,7 @@ func (w *Worker) recordOrUpdateJob(job params.ScaleSetJobMessage) error {
case params.ForgeEntityTypeOrganization:
jobParams.OrgID = &asUUID
default:
return fmt.Errorf("unknown entity type: %s", entity.EntityType)
return fmt.Errorf("unknown entity type: %s --> %s", entity.EntityType, entity)
}
if _, jobErr := w.store.CreateOrUpdateJob(w.ctx, jobParams); jobErr != nil {
@ -163,6 +163,7 @@ func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) (err error)
}
func (w *Worker) HandleJobsAvailable(jobs []params.ScaleSetJobMessage) error {
slog.DebugContext(w.ctx, "handling jobs available", "jobs", jobs)
for _, job := range jobs {
if err := w.recordOrUpdateJob(job); err != nil {
// recording scale set jobs are purely informational for now.

View file

@ -150,28 +150,22 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
for _, job := range body {
switch job.MessageType {
case params.MessageTypeJobAssigned:
slog.InfoContext(l.ctx, "new job assigned", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName)
slog.InfoContext(l.ctx, "new job assigned", "job_id", job.JobID, "job_name", job.JobDisplayName)
assignedJobs = append(assignedJobs, job)
case params.MessageTypeJobStarted:
slog.InfoContext(l.ctx, "job started", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
slog.InfoContext(l.ctx, "job started", "job_id", job.JobID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
startedJobs = append(startedJobs, job)
case params.MessageTypeJobCompleted:
slog.InfoContext(l.ctx, "job completed", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
slog.InfoContext(l.ctx, "job completed", "job_id", job.JobID, "job_name", job.JobDisplayName, "runner_name", job.RunnerName)
completedJobs = append(completedJobs, job)
case params.MessageTypeJobAvailable:
slog.InfoContext(l.ctx, "job available", "job_id", job.RunnerRequestID, "job_name", job.JobDisplayName)
slog.InfoContext(l.ctx, "job available", "job_id", job.JobID, "job_name", job.JobDisplayName)
availableJobs = append(availableJobs, job)
default:
slog.DebugContext(l.ctx, "unknown message type", "message_type", job.MessageType)
}
}
if len(assignedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsAvailable(assignedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling available jobs", "error", err)
}
}
scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient()
if err != nil {
slog.ErrorContext(l.ctx, "getting scale set client", "error", err)
@ -198,10 +192,9 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
slog.DebugContext(l.ctx, "acquired jobs", "job_ids", idsAcquired)
}
if len(completedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsCompleted(completedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling completed jobs", "error", err)
return
if len(assignedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsAvailable(assignedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling available jobs", "error", err)
}
}
@ -212,6 +205,13 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
}
}
if len(completedJobs) > 0 {
if err := l.scaleSetHelper.HandleJobsCompleted(completedJobs); err != nil {
slog.ErrorContext(l.ctx, "error handling completed jobs", "error", err)
return
}
}
if err := l.scaleSetHelper.SetLastMessageID(msg.MessageID); err != nil {
slog.ErrorContext(l.ctx, "setting last message ID", "error", err)
} else {