diff --git a/cmd/garm-cli/cmd/jobs.go b/cmd/garm-cli/cmd/jobs.go
index 1ce372cb..30612d19 100644
--- a/cmd/garm-cli/cmd/jobs.go
+++ b/cmd/garm-cli/cmd/jobs.go
@@ -63,7 +63,7 @@ func formatJobs(jobs []params.Job) {
 		return
 	}
 	t := table.NewWriter()
-	header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"}
+	header := table.Row{"Workflow Job ID", "Scale Set Job ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"}
 	t.AppendHeader(header)
 
 	for _, job := range jobs {
@@ -72,7 +72,7 @@ func formatJobs(jobs []params.Job) {
 		if job.LockedBy != uuid.Nil {
 			lockedBy = job.LockedBy.String()
 		}
-		t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy})
+		t.AppendRow(table.Row{job.WorkflowJobID, job.ScaleSetJobID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy})
 		t.AppendSeparator()
 	}
 	fmt.Println(t.Render())
diff --git a/metrics/jobs.go b/metrics/jobs.go
index 38d32903..7883b41d 100644
--- a/metrics/jobs.go
+++ b/metrics/jobs.go
@@ -22,5 +22,16 @@ var JobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 	Namespace: metricsNamespace,
 	Subsystem: metricsJobsSubsystem,
 	Name:      "status",
-	Help:      "List of jobs and their status",
-}, []string{"job_id", "name", "status", "conclusion", "runner_name", "repository", "requested_labels"})
+	Help:      "List of workflow jobs and their status",
+}, []string{
+	"job_id",
+	"workflow_job_id",
+	"scaleset_job_id",
+	"workflow_run_id",
+	"name",
+	"status",
+	"conclusion",
+	"runner_name",
+	"repository",
+	"requested_labels",
+})
diff --git a/runner/metrics/jobs.go b/runner/metrics/jobs.go
index 33b51ab2..ed665f13 100644
--- a/runner/metrics/jobs.go
+++ b/runner/metrics/jobs.go
@@ -35,13 +35,16 @@ func CollectJobMetric(ctx context.Context, r *runner.Runner) error {
 
 	for _, job := range jobs {
 		metrics.JobStatus.WithLabelValues(
-			fmt.Sprintf("%d", job.ID),     // label: job_id
-			job.Name,                      // label: name
-			job.Status,                    // label: status
-			job.Conclusion,                // label: conclusion
-			job.RunnerName,                // label: runner_name
-			job.RepositoryName,            // label: repository
-			strings.Join(job.Labels, " "), // label: requested_labels
+			fmt.Sprintf("%d", job.ID),            // label: job_id
+			fmt.Sprintf("%d", job.WorkflowJobID), // label: workflow_job_id
+			job.ScaleSetJobID,                    // label: scaleset_job_id
+			fmt.Sprintf("%d", job.RunID),         // label: workflow_run_id
+			job.Name,                             // label: name
+			job.Status,                           // label: status
+			job.Conclusion,                       // label: conclusion
+			job.RunnerName,                       // label: runner_name
+			job.RepositoryName,                   // label: repository
+			strings.Join(job.Labels, " "),        // label: requested_labels
 		).Set(1)
 	}
 	return nil
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index 11b3b8a2..52f5133b 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -330,6 +330,11 @@ func (r *basePoolManager) handleCompletedJob(jobParams params.Job) error {
 
 // handleInProgressJob processes an in-progress job webhook
 func (r *basePoolManager) handleInProgressJob(jobParams params.Job) (triggeredBy int64, err error) {
+	if jobParams.RunnerName == "" {
+		slog.DebugContext(r.ctx, "instance not found in job data", "workflow_job_id", jobParams.ID)
+		return 0, nil
+	}
+
 	// Mark runner as active (this also validates the instance exists)
 	instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
 	if err != nil {
@@ -393,6 +398,27 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
 		return fmt.Errorf("error converting job to params: %w", err)
 	}
 
+	// For in_progress/completed jobs, check if the runner belongs to a scale set.
+	// Scale set jobs are handled by the scale set listener, not by webhooks.
+	if job.Action == "in_progress" || job.Action == "completed" {
+		if jobParams.RunnerName != "" {
+			instance, err := r.store.GetInstance(r.ctx, jobParams.RunnerName)
+			if err == nil && instance.ScaleSetID != 0 {
+				slog.DebugContext(r.ctx, "job belongs to a scale set instance, skipping webhook processing",
+					"runner_name", util.SanitizeLogEntry(jobParams.RunnerName),
+					"scale_set_id", instance.ScaleSetID)
+				// Clean up any orphaned queued job that may have been recorded
+				// via webhook before we knew it belonged to a scale set.
+				if err := r.store.DeleteJob(r.ctx, jobParams.WorkflowJobID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
+					slog.With(slog.Any("error", err)).ErrorContext(
+						r.ctx, "failed to delete orphaned webhook job for scale set instance",
+						"job_id", jobParams.WorkflowJobID)
+				}
+				return nil
+			}
+		}
+	}
+
 	// Process job based on action type
 	var triggeredBy int64
 	var actionErr error
@@ -400,10 +426,10 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
 	switch job.Action {
 	case "queued":
 		// Queued jobs are just recorded; they'll be picked up by consumeQueuedJobs()
-	case "completed":
-		actionErr = r.handleCompletedJob(jobParams)
 	case "in_progress":
 		triggeredBy, actionErr = r.handleInProgressJob(jobParams)
+	case "completed":
+		actionErr = r.handleCompletedJob(jobParams)
 	}
 
 	// Always persist job to database (success or failure)
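
For reference, a minimal standalone sketch of how a gauge vector with the reworked label set is registered and emitted. It is illustrative only: the namespace, subsystem, listen address, and sample label values below are placeholders (the patch uses GARM's own metricsNamespace and metricsJobsSubsystem constants). The one behavioural point it shows is that WithLabelValues takes exactly one value per declared label, in declaration order, and panics on a count mismatch.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// jobStatus mirrors the label set declared for JobStatus in metrics/jobs.go.
var jobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "garm", // placeholder for metricsNamespace
	Subsystem: "jobs", // placeholder for metricsJobsSubsystem
	Name:      "status",
	Help:      "List of workflow jobs and their status",
}, []string{
	"job_id", "workflow_job_id", "scaleset_job_id", "workflow_run_id",
	"name", "status", "conclusion", "runner_name", "repository", "requested_labels",
})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(jobStatus)

	// Ten label values, matching the ten declared labels in order;
	// passing a different count panics at the call site.
	jobStatus.WithLabelValues(
		"1", "42", "scale-set-job-abc", "7",
		"build", "completed", "success", "garm-runner-1", "org/repo", "self-hosted x64",
	).Set(1)

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9100", nil))
}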