From 3ceb2f7ebb8fede7111b59dfbd9949e6f73edc56 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 14 Feb 2026 18:15:11 +0200 Subject: [PATCH] Add workflow and scaleset job ID to metrics and fixes This change adds workflow job ID, scaleset job ID and workflow run ID to the metrics. This change also attempts to fix how jobs are recorded when a workflow is posted by a webhook, but the job is handled by a scale set. Signed-off-by: Gabriel Adrian Samfira --- cmd/garm-cli/cmd/jobs.go | 4 ++-- metrics/jobs.go | 15 +++++++++++++-- runner/metrics/jobs.go | 17 ++++++++++------- runner/pool/pool.go | 30 ++++++++++++++++++++++++++++-- 4 files changed, 53 insertions(+), 13 deletions(-) diff --git a/cmd/garm-cli/cmd/jobs.go b/cmd/garm-cli/cmd/jobs.go index 1ce372cb..30612d19 100644 --- a/cmd/garm-cli/cmd/jobs.go +++ b/cmd/garm-cli/cmd/jobs.go @@ -63,7 +63,7 @@ func formatJobs(jobs []params.Job) { return } t := table.NewWriter() - header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"} + header := table.Row{"Workflow Job ID", "Scale Set Job ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"} t.AppendHeader(header) for _, job := range jobs { @@ -72,7 +72,7 @@ func formatJobs(jobs []params.Job) { if job.LockedBy != uuid.Nil { lockedBy = job.LockedBy.String() } - t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy}) + t.AppendRow(table.Row{job.WorkflowJobID, job.ScaleSetJobID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy}) t.AppendSeparator() } fmt.Println(t.Render()) diff --git a/metrics/jobs.go b/metrics/jobs.go index 38d32903..7883b41d 100644 --- a/metrics/jobs.go +++ b/metrics/jobs.go @@ -22,5 +22,16 @@ var JobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: metricsJobsSubsystem, 
Name: "status", - Help: "List of jobs and their status", -}, []string{"job_id", "name", "status", "conclusion", "runner_name", "repository", "requested_labels"}) + Help: "List of workflow jobs and their status", +}, []string{ + "job_id", + "workflow_job_id", + "scaleset_job_id", + "workflow_run_id", + "name", + "status", + "conclusion", + "runner_name", + "repository", + "requested_labels", +}) diff --git a/runner/metrics/jobs.go b/runner/metrics/jobs.go index 33b51ab2..ed665f13 100644 --- a/runner/metrics/jobs.go +++ b/runner/metrics/jobs.go @@ -35,13 +35,16 @@ func CollectJobMetric(ctx context.Context, r *runner.Runner) error { for _, job := range jobs { metrics.JobStatus.WithLabelValues( - fmt.Sprintf("%d", job.ID), // label: job_id - job.Name, // label: name - job.Status, // label: status - job.Conclusion, // label: conclusion - job.RunnerName, // label: runner_name - job.RepositoryName, // label: repository - strings.Join(job.Labels, " "), // label: requested_labels + fmt.Sprintf("%d", job.ID), // label: job_id + fmt.Sprintf("%d", job.WorkflowJobID), // label: workflow_job_id + job.ScaleSetJobID, // label: scaleset_job_id + fmt.Sprintf("%d", job.RunID), // label: workflow_run_id + job.Name, // label: name + job.Status, // label: status + job.Conclusion, // label: conclusion + job.RunnerName, // label: runner_name + job.RepositoryName, // label: repository + strings.Join(job.Labels, " "), // label: requested_labels ).Set(1) } return nil diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 11b3b8a2..52f5133b 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -330,6 +330,11 @@ func (r *basePoolManager) handleCompletedJob(jobParams params.Job) error { // handleInProgressJob processes an in-progress job webhook func (r *basePoolManager) handleInProgressJob(jobParams params.Job) (triggeredBy int64, err error) { + if jobParams.RunnerName == "" { + slog.DebugContext(r.ctx, "instance not found in job data", "workflow_job_id", jobParams.WorkflowJobID) + 
return 0, nil + } + // Mark runner as active (this also validates the instance exists) instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive) if err != nil { @@ -393,6 +398,27 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { return fmt.Errorf("error converting job to params: %w", err) } + // For in_progress/completed jobs, check if the runner belongs to a scale set. + // Scale set jobs are handled by the scale set listener, not by webhooks. + if job.Action == "in_progress" || job.Action == "completed" { + if jobParams.RunnerName != "" { + instance, err := r.store.GetInstance(r.ctx, jobParams.RunnerName) + if err == nil && instance.ScaleSetID != 0 { + slog.DebugContext(r.ctx, "job belongs to a scale set instance, skipping webhook processing", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName), + "scale_set_id", instance.ScaleSetID) + // Clean up any orphaned queued job that may have been recorded + // via webhook before we knew it belonged to a scale set. + if err := r.store.DeleteJob(r.ctx, jobParams.WorkflowJobID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to delete orphaned webhook job for scale set instance", + "job_id", jobParams.WorkflowJobID) + } + return nil + } + } + } + // Process job based on action type var triggeredBy int64 var actionErr error @@ -400,10 +426,10 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { switch job.Action { case "queued": // Queued jobs are just recorded; they'll be picked up by consumeQueuedJobs() - case "completed": - actionErr = r.handleCompletedJob(jobParams) case "in_progress": triggeredBy, actionErr = r.handleInProgressJob(jobParams) + case "completed": + actionErr = r.handleCompletedJob(jobParams) } // Always persist job to database (success or failure)