From ec0fd6e3f83e6b2f33c115365992170ce30b4c19 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 10 Feb 2026 14:08:44 +0200 Subject: [PATCH] Delete all inactionable jobs GARM cares about jobs in queued state for anything that requires decision making. Anything else is purely informational. This change cleans up all inactionable jobs and refuses to record jobs that are not already in the database, have an inactionable state and which do not have a runner we own handling them. Signed-off-by: Gabriel Adrian Samfira --- database/common/mocks/Store.go | 92 ++++----- database/common/store.go | 2 +- database/sql/jobs.go | 23 ++- database/sql/jobs_test.go | 189 ++++++++++++++++++ database/sql/models.go | 4 +- doc/config.md | 7 +- metrics/jobs.go | 2 +- runner/pool/pool.go | 351 +++++++++++++++++++++------------ util/github/client.go | 4 +- 9 files changed, 487 insertions(+), 187 deletions(-) create mode 100644 database/sql/jobs_test.go diff --git a/database/common/mocks/Store.go b/database/common/mocks/Store.go index 353afc70..613a8685 100644 --- a/database/common/mocks/Store.go +++ b/database/common/mocks/Store.go @@ -1104,52 +1104,6 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, params.N return _c } -// DeleteCompletedJobs provides a mock function with given fields: ctx -func (_m *Store) DeleteCompletedJobs(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for DeleteCompletedJobs") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Store_DeleteCompletedJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteCompletedJobs' -type Store_DeleteCompletedJobs_Call struct { - *mock.Call -} - -// DeleteCompletedJobs is a helper method to define mock.On call -// - ctx context.Context -func (_e *Store_Expecter) DeleteCompletedJobs(ctx interface{}) *Store_DeleteCompletedJobs_Call { - return &Store_DeleteCompletedJobs_Call{Call: _e.mock.On("DeleteCompletedJobs", ctx)} -} - -func (_c *Store_DeleteCompletedJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteCompletedJobs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Store_DeleteCompletedJobs_Call) Return(_a0 error) *Store_DeleteCompletedJobs_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Store_DeleteCompletedJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteCompletedJobs_Call { - _c.Call.Return(run) - return _c -} - // DeleteEnterprise provides a mock function with given fields: ctx, enterpriseID func (_m *Store) DeleteEnterprise(ctx context.Context, enterpriseID string) error { ret := _m.Called(ctx, enterpriseID) @@ -1537,6 +1491,52 @@ func (_c *Store_DeleteGithubEndpoint_Call) RunAndReturn(run func(context.Context return _c } +// DeleteInactionableJobs provides a mock function with given fields: ctx +func (_m *Store) DeleteInactionableJobs(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for DeleteInactionableJobs") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DeleteInactionableJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteInactionableJobs' +type Store_DeleteInactionableJobs_Call struct { + *mock.Call +} + +// DeleteInactionableJobs is a helper method to define mock.On call +// - ctx context.Context +func (_e *Store_Expecter) DeleteInactionableJobs(ctx interface{}) *Store_DeleteInactionableJobs_Call { + return &Store_DeleteInactionableJobs_Call{Call: _e.mock.On("DeleteInactionableJobs", ctx)} +} + +func (_c *Store_DeleteInactionableJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteInactionableJobs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Store_DeleteInactionableJobs_Call) Return(_a0 error) *Store_DeleteInactionableJobs_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DeleteInactionableJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteInactionableJobs_Call { + _c.Call.Return(run) + return _c +} + // DeleteInstance provides a mock function with given fields: ctx, poolID, instanceNameOrID func (_m *Store) DeleteInstance(ctx context.Context, poolID string, instanceNameOrID string) error { ret := _m.Called(ctx, poolID, instanceNameOrID) diff --git a/database/common/store.go b/database/common/store.go index f5bf73c2..db7a190d 100644 --- a/database/common/store.go +++ b/database/common/store.go @@ -119,7 +119,7 @@ type JobsStore interface { LockJob(ctx context.Context, jobID int64, entityID string) error BreakLockJobIsQueued(ctx context.Context, jobID int64) error - DeleteCompletedJobs(ctx context.Context) error + DeleteInactionableJobs(ctx context.Context) error } type EntityPoolStore interface { diff --git a/database/sql/jobs.go b/database/sql/jobs.go index edb82831..0845188d 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -424,16 +424,19 @@ func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, er return sqlWorkflowJobToParamsJob(job) } -// DeleteCompletedJobs deletes all completed jobs. -func (s *sqlDatabase) DeleteCompletedJobs(_ context.Context) error { - query := s.conn.Model(&WorkflowJob{}).Where("status = ?", params.JobStatusCompleted) - - if err := query.Unscoped().Delete(&WorkflowJob{}); err.Error != nil { - if errors.Is(err.Error, gorm.ErrRecordNotFound) { - return nil - } - return err.Error +// DeleteInactionableJobs will delete jobs that are not in queued state and have no +// runner associated with them. This can happen if we have a pool that matches labels +// defined on a job, but the job itself was picked up by a runner we don't manage. +// When a job transitions from queued to anything else, GARM only uses them for informational +// purposes. So they are safe to delete. +// Also deletes completed jobs with GARM runners attached as they are no longer needed. +func (s *sqlDatabase) DeleteInactionableJobs(_ context.Context) error { + q := s.conn. + Unscoped(). + Where("(status != ? AND instance_id IS NULL) OR (status = ? AND instance_id IS NOT NULL)", params.JobStatusQueued, params.JobStatusCompleted). + Delete(&WorkflowJob{}) + if q.Error != nil { + return fmt.Errorf("deleting inactionable jobs: %w", q.Error) } - return nil } diff --git a/database/sql/jobs_test.go b/database/sql/jobs_test.go new file mode 100644 index 00000000..9ada2a70 --- /dev/null +++ b/database/sql/jobs_test.go @@ -0,0 +1,189 @@ +// Copyright 2025 Cloudbase Solutions SRL +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package sql + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" + + dbCommon "github.com/cloudbase/garm/database/common" + "github.com/cloudbase/garm/database/watcher" + garmTesting "github.com/cloudbase/garm/internal/testing" + "github.com/cloudbase/garm/params" +) + +type JobsTestSuite struct { + suite.Suite + Store dbCommon.Store + adminCtx context.Context +} + +func (s *JobsTestSuite) SetupTest() { + ctx := context.Background() + watcher.InitWatcher(ctx) + + // Create testing sqlite database + db, err := NewSQLDatabase(ctx, garmTesting.GetTestSqliteDBConfig(s.T())) + if err != nil { + s.FailNow(fmt.Sprintf("failed to create db connection: %s", err)) + } + s.Store = db + + adminCtx := garmTesting.ImpersonateAdminContext(ctx, db, s.T()) + s.adminCtx = adminCtx +} + +func (s *JobsTestSuite) TearDownTest() { + watcher.CloseWatcher() +} + +func TestJobsTestSuite(t *testing.T) { + suite.Run(t, new(JobsTestSuite)) +} + +// TestDeleteInactionableJobs verifies the deletion logic for jobs +func (s *JobsTestSuite) TestDeleteInactionableJobs() { + db := s.Store.(*sqlDatabase) + + // Create mix of jobs to test all conditions: + // 1. Queued jobs (should NOT be deleted) + queuedJob := params.Job{ + WorkflowJobID: 12345, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusQueued), + Name: "queued-job", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedJob) + s.Require().NoError(err) + + // 2. In-progress job without instance (should be deleted) + inProgressNoInstance := params.Job{ + WorkflowJobID: 12346, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusInProgress), + Name: "inprogress-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance) + s.Require().NoError(err) + + // 3. Completed job without instance (should be deleted) + completedNoInstance := params.Job{ + WorkflowJobID: 12347, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusCompleted), + Conclusion: "success", + Name: "completed-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance) + s.Require().NoError(err) + + // Count total jobs before deletion + var countBefore int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error + s.Require().NoError(err) + s.Require().Equal(int64(3), countBefore, "Should have 3 jobs before deletion") + + // Run deletion + err = s.Store.DeleteInactionableJobs(s.adminCtx) + s.Require().NoError(err) + + // Count remaining jobs - should only have the queued job + var countAfter int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error + s.Require().NoError(err) + s.Require().Equal(int64(1), countAfter, "Should have 1 job remaining (queued)") + + // Verify the remaining job is the queued one + var remaining WorkflowJob + err = db.conn.Where("workflow_job_id = ?", 12345).First(&remaining).Error + s.Require().NoError(err) + s.Require().Equal("queued", remaining.Status) +} + +// TestDeleteInactionableJobs_AllScenarios verifies all deletion rules +func (s *JobsTestSuite) TestDeleteInactionableJobs_AllScenarios() { + db := s.Store.(*sqlDatabase) + + // Rule 1: Queued jobs are NEVER deleted (regardless of instance_id) + queuedNoInstance := params.Job{ + WorkflowJobID: 20001, + RunID: 67890, + Status: string(params.JobStatusQueued), + Name: "queued-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedNoInstance) + s.Require().NoError(err) + + // Rule 2: Non-queued jobs WITHOUT instance_id ARE deleted + inProgressNoInstance := params.Job{ + WorkflowJobID: 20002, + RunID: 67890, + Status: string(params.JobStatusInProgress), + Name: "inprogress-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance) + s.Require().NoError(err) + + completedNoInstance := params.Job{ + WorkflowJobID: 20003, + RunID: 67890, + Status: string(params.JobStatusCompleted), + Conclusion: "success", + Name: "completed-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance) + s.Require().NoError(err) + + // Count jobs before deletion + var countBefore int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error + s.Require().NoError(err) + s.Require().Equal(int64(3), countBefore) + + // Run deletion + err = s.Store.DeleteInactionableJobs(s.adminCtx) + s.Require().NoError(err) + + // After deletion, only queued job should remain + var countAfter int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error + s.Require().NoError(err) + s.Require().Equal(int64(1), countAfter, "Only queued job should remain") + + // Verify it's the queued job that remains + var jobs []WorkflowJob + err = db.conn.Find(&jobs).Error + s.Require().NoError(err) + s.Require().Len(jobs, 1) + s.Require().Equal(string(params.JobStatusQueued), jobs[0].Status) +} diff --git a/database/sql/models.go b/database/sql/models.go index 2a5313d7..585b6ed7 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -400,7 +400,7 @@ type WorkflowJob struct { Conclusion string // Status is the phase of the lifecycle that the job is currently in. // "queued", "in_progress" and "completed". - Status string + Status string `gorm:"index:idx_workflow_jobs_status_instance_id,priority:1"` // Name is the name if the job that was triggered. Name string @@ -409,7 +409,7 @@ type WorkflowJob struct { GithubRunnerID int64 - InstanceID *uuid.UUID `gorm:"index:idx_instance_job"` + InstanceID *uuid.UUID `gorm:"index:idx_instance_job;index:idx_workflow_jobs_status_instance_id,priority:2"` Instance Instance `gorm:"foreignKey:InstanceID"` RunnerGroupID int64 diff --git a/doc/config.md b/doc/config.md index 555c7495..5f631f7d 100644 --- a/doc/config.md +++ b/doc/config.md @@ -361,7 +361,12 @@ This is one of the features in GARM that I really love having. For one thing, it | `garm_runner_status` | Gauge | `name`=<runner name>
`pool_owner`=<owner name>
`pool_type`=<repository\|organization\|enterprise>
`provider`=<provider name>
`runner_status`=<running\|stopped\|error\|pending_delete\|deleting\|pending_create\|creating\|unknown>
`status`=<idle\|pending\|terminated\|installing\|failed\|active>
| This is a gauge value that gives us details about the runners garm spawns | | `garm_runner_operations_total` | Counter | `provider`=<provider name>
`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation is performed | | `garm_runner_errors_total` | Counter | `provider`=<provider name>
`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation errored | -| `garm_runner_jobs_status` | Gauge | `job_id`=<job id>
`name`=<job name>
`status`=<job status>
`conclusion`=<job conclusion>
`runner_name`=<runner name>
`repository`=<repository>
`requested_labels`=<requested labels> | List of jobs and their status | + +### Job metrics + +| Metric name | Type | Labels | Description | +|---------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------| +| `garm_job_jobs_status` | Gauge | `job_id`=<job id>
`name`=<job name>
`status`=<job status>
`conclusion`=<job conclusion>
`runner_name`=<runner name>
`repository`=<repository>
`requested_labels`=<requested labels> | List of jobs and their status | ### Github metrics diff --git a/metrics/jobs.go b/metrics/jobs.go index 12c243ec..957e8f5b 100644 --- a/metrics/jobs.go +++ b/metrics/jobs.go @@ -20,7 +20,7 @@ import ( var JobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, - Subsystem: metricsRunnerSubsystem, + Subsystem: metricsJobsSubsystem, Name: "jobs_status", Help: "List of jobs and their status", }, []string{"job_id", "name", "status", "conclusion", "runner_name", "repository", "requested_labels"}) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 4722f0d6..a4b9fa07 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -167,149 +167,249 @@ func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.Provide } } +// isEntityPool checks if a pool belongs to this manager's entity +func (r *basePoolManager) isEntityPool(pool params.Pool) bool { + switch r.entity.EntityType { + case params.ForgeEntityTypeRepository: + return pool.RepoID != "" && pool.RepoID == r.entity.ID + case params.ForgeEntityTypeOrganization: + return pool.OrgID != "" && pool.OrgID == r.entity.ID + case params.ForgeEntityTypeEnterprise: + return pool.EnterpriseID != "" && pool.EnterpriseID == r.entity.ID + default: + return false + } +} + +// shouldRecordJob determines if a job should be recorded in the database +func (r *basePoolManager) shouldRecordJob(jobParams params.Job, isNewJob bool) bool { + // Always record existing jobs + if !isNewJob { + return true + } + + // For new jobs, only record if queued + if jobParams.Status == string(params.JobStatusQueued) { + // Check if we have pools that can handle it + potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, jobParams.Labels) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to find pools matching tags", + "requested_tags", strings.Join(jobParams.Labels, ", ")) + return false + } + if len(potentialPools) == 0 { + slog.WarnContext( + r.ctx, "no pools matching tags; not recording job", + "requested_tags", strings.Join(jobParams.Labels, ", ")) + return false + } + return true + } + + // For new non-queued jobs, only record if runner belongs to us + if jobParams.RunnerName != "" { + _, err := r.store.GetInstance(r.ctx, jobParams.RunnerName) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get instance", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + } + return false + } + return true + } + + // New non-queued job with no runner - don't record + return false +} + +// persistJobToDB saves or updates the job in the database +func (r *basePoolManager) persistJobToDB(jobParams params.Job, triggeredBy int64) { + if jobParams.WorkflowJobID == 0 { + return + } + + // Check if job exists + _, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID) + isNewJob := errors.Is(err, runnerErrors.ErrNotFound) + if err != nil && !isNewJob { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get job", + "job_id", jobParams.WorkflowJobID) + return + } + + // Determine if we should record this job + if !r.shouldRecordJob(jobParams, isNewJob) { + return + } + + // Save or update the job + if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { + slog.With(slog.Any("error", jobErr)).ErrorContext( + r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID) + return + } + + // Break lock on the queued job that triggered this runner (if different) + if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy { + if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to break lock for job", + "job_id", triggeredBy) + } + } +} + +// handleCompletedJob processes a completed job webhook +func (r *basePoolManager) handleCompletedJob(jobParams params.Job) error { + // Ignore jobs without a runner + if jobParams.RunnerName == "" { + slog.InfoContext(r.ctx, "job never got assigned to a runner, ignoring") + return nil + } + + // Check if runner belongs to us + instance, err := r.store.GetInstance(r.ctx, jobParams.RunnerName) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get instance", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return nil + } + + // Verify pool belongs to this entity + pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get pool", + "pool_id", instance.PoolID) + return nil + } + + if !r.isEntityPool(pool) { + slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID) + return nil + } + + // Mark runner as terminated + if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return fmt.Errorf("error updating runner: %w", err) + } + + // Mark instance for deletion + slog.DebugContext( + r.ctx, "marking instance as pending_delete", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return fmt.Errorf("error updating runner: %w", err) + } + + return nil +} + +// handleInProgressJob processes an in-progress job webhook +func (r *basePoolManager) handleInProgressJob(jobParams params.Job) (triggeredBy int64, err error) { + // Mark runner as active (this also validates the instance exists) + instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + slog.DebugContext(r.ctx, "instance not found", "runner_name", jobParams.RunnerName) + return 0, nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return 0, fmt.Errorf("error updating runner: %w", err) + } + + // Verify pool belongs to this entity + pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return 0, nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get pool", + "pool_id", instance.PoolID) + return 0, nil + } + + if !r.isEntityPool(pool) { + slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID) + return 0, nil + } + + // Extract the job ID that triggered this runner + triggeredBy = jobIDFromLabels(instance.AditionalLabels) + + // Ensure minimum idle runners for the pool + if err := r.ensureIdleRunnersForOnePool(pool); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "error ensuring idle runners for pool", + "pool_id", pool.ID) + } + + return triggeredBy, nil +} + func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { + // Validate job ownership if err := r.ValidateOwner(job); err != nil { slog.ErrorContext(r.ctx, "failed to validate owner", "error", err) return fmt.Errorf("error validating owner: %w", err) } - // we see events where the lables seem to be missing. We should ignore these - // as we can't know if we should handle them or not. + // Jobs without labels cannot be processed if len(job.WorkflowJob.Labels) == 0 { slog.WarnContext(r.ctx, "job has no labels", "workflow_job", job.WorkflowJob.Name) return nil } + // Convert webhook payload to internal job format jobParams, err := r.paramsWorkflowJobToParamsJob(job) if err != nil { slog.ErrorContext(r.ctx, "failed to convert job to params", "error", err) return fmt.Errorf("error converting job to params: %w", err) } + // Process job based on action type var triggeredBy int64 - defer func() { - if jobParams.WorkflowJobID == 0 { - return - } - // we're updating the job in the database, regardless of whether it was successful or not. - // or if it was meant for this pool or not. Github will send the same job data to all hierarchies - // that have been configured to work with garm. Updating the job at all levels should yield the same - // outcome in the db. - _, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID) - if err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to get job", - "job_id", jobParams.WorkflowJobID) - return - } - // This job is new to us. Check if we have a pool that can handle it. - potentialPools := cache.FindPoolsMatchingAllTags(r.entity.ID, jobParams.Labels) - if len(potentialPools) == 0 { - slog.WarnContext( - r.ctx, "no pools matching tags; not recording job", - "requested_tags", strings.Join(jobParams.Labels, ", ")) - return - } - } - - if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { - slog.With(slog.Any("error", jobErr)).ErrorContext( - r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID) - } - - if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy { - // The triggeredBy value is only set by the "in_progress" webhook. The runner that - // transitioned to in_progress was created as a result of a different queued job. If that job is - // still queued and we don't remove the lock, it will linger until the lock timeout is reached. - // That may take a long time, so we break the lock here and allow it to be scheduled again. - if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to break lock for job", - "job_id", triggeredBy) - } - } - }() + var actionErr error switch job.Action { case "queued": - // Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method - // when reconciling. + // Queued jobs are just recorded; they'll be picked up by consumeQueuedJobs() case "completed": - // If job was not assigned to a runner, we can ignore it. - if jobParams.RunnerName == "" { - slog.InfoContext( - r.ctx, "job never got assigned to a runner, ignoring") - return nil - } - - fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName) - if !ok { - return nil - } - - if _, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID); !ok { - slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID) - return nil - } - - // update instance workload state. - if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } - slog.DebugContext( - r.ctx, "marking instance as pending_delete", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } + actionErr = r.handleCompletedJob(jobParams) case "in_progress": - fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName) - if !ok { - slog.DebugContext(r.ctx, "instance not found in cache", "runner_name", jobParams.RunnerName) - return nil - } - - pool, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID) - if !ok { - slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID) - return nil - } - // update instance workload state. - instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive) - if err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } - // Set triggeredBy here so we break the lock on any potential queued job. - triggeredBy = jobIDFromLabels(instance.AditionalLabels) - - // A runner has picked up the job, and is now running it. It may need to be replaced if the pool has - // a minimum number of idle runners configured. - if err := r.ensureIdleRunnersForOnePool(pool); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "error ensuring idle runners for pool", - "pool_id", pool.ID) - } + triggeredBy, actionErr = r.handleInProgressJob(jobParams) } - return nil + + // Always persist job to database (success or failure) + r.persistJobToDB(jobParams, triggeredBy) + + return actionErr } func jobIDFromLabels(labels []string) int64 { @@ -1754,6 +1854,14 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa // so those will trigger the creation of a runner. The jobs we don't know about will be dealt with by the idle runners. // Once jobs are consumed, you can set min-idle-runners to 0 again. func (r *basePoolManager) consumeQueuedJobs() error { + defer func() { + // Always try to clean inactionable jobs. Otherwise, if any condition + // makes this function exit, we never clean up jobs. + if err := r.store.DeleteInactionableJobs(r.ctx); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to delete completed jobs") + } + }() queued := r.getQueuedJobs() poolsCache := poolsForTags{ @@ -1872,11 +1980,6 @@ func (r *basePoolManager) consumeQueuedJobs() error { } } } - - if err := r.store.DeleteCompletedJobs(r.ctx); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to delete completed jobs") - } return nil } diff --git a/util/github/client.go b/util/github/client.go index 64933df3..ab7127bb 100644 --- a/util/github/client.go +++ b/util/github/client.go @@ -400,7 +400,7 @@ func (g *githubClient) getOrganizationRunnerGroupIDByName(ctx context.Context, e } return 0, fmt.Errorf("error fetching runners: %w", err) } - if err == nil && ghResp != nil { + if ghResp != nil { g.recordLimits(ghResp.Rate) } @@ -440,7 +440,7 @@ func (g *githubClient) getEnterpriseRunnerGroupIDByName(ctx context.Context, ent } return 0, fmt.Errorf("error fetching runners: %w", err) } - if err == nil && ghResp != nil { + if ghResp != nil { g.recordLimits(ghResp.Rate) } for _, runnerGroup := range runnerGroups.RunnerGroups {