diff --git a/database/common/mocks/Store.go b/database/common/mocks/Store.go index 353afc70..613a8685 100644 --- a/database/common/mocks/Store.go +++ b/database/common/mocks/Store.go @@ -1104,52 +1104,6 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, params.N return _c } -// DeleteCompletedJobs provides a mock function with given fields: ctx -func (_m *Store) DeleteCompletedJobs(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for DeleteCompletedJobs") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Store_DeleteCompletedJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteCompletedJobs' -type Store_DeleteCompletedJobs_Call struct { - *mock.Call -} - -// DeleteCompletedJobs is a helper method to define mock.On call -// - ctx context.Context -func (_e *Store_Expecter) DeleteCompletedJobs(ctx interface{}) *Store_DeleteCompletedJobs_Call { - return &Store_DeleteCompletedJobs_Call{Call: _e.mock.On("DeleteCompletedJobs", ctx)} -} - -func (_c *Store_DeleteCompletedJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteCompletedJobs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Store_DeleteCompletedJobs_Call) Return(_a0 error) *Store_DeleteCompletedJobs_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Store_DeleteCompletedJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteCompletedJobs_Call { - _c.Call.Return(run) - return _c -} - // DeleteEnterprise provides a mock function with given fields: ctx, enterpriseID func (_m *Store) DeleteEnterprise(ctx context.Context, enterpriseID string) error { ret := _m.Called(ctx, enterpriseID) @@ -1537,6 +1491,52 @@ func (_c *Store_DeleteGithubEndpoint_Call) RunAndReturn(run func(context.Context return _c } +// DeleteInactionableJobs provides a mock function with given fields: ctx +func (_m *Store) DeleteInactionableJobs(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for DeleteInactionableJobs") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DeleteInactionableJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteInactionableJobs' +type Store_DeleteInactionableJobs_Call struct { + *mock.Call +} + +// DeleteInactionableJobs is a helper method to define mock.On call +// - ctx context.Context +func (_e *Store_Expecter) DeleteInactionableJobs(ctx interface{}) *Store_DeleteInactionableJobs_Call { + return &Store_DeleteInactionableJobs_Call{Call: _e.mock.On("DeleteInactionableJobs", ctx)} +} + +func (_c *Store_DeleteInactionableJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteInactionableJobs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Store_DeleteInactionableJobs_Call) Return(_a0 error) *Store_DeleteInactionableJobs_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DeleteInactionableJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteInactionableJobs_Call { + _c.Call.Return(run) + return _c +} + // DeleteInstance provides a mock function with given fields: ctx, poolID, instanceNameOrID func (_m *Store) DeleteInstance(ctx context.Context, poolID string, instanceNameOrID string) error { ret := _m.Called(ctx, poolID, instanceNameOrID) diff --git a/database/common/store.go b/database/common/store.go index f5bf73c2..db7a190d 100644 --- a/database/common/store.go +++ b/database/common/store.go @@ -119,7 +119,7 @@ type JobsStore interface { LockJob(ctx context.Context, jobID int64, entityID string) error BreakLockJobIsQueued(ctx context.Context, jobID int64) error - DeleteCompletedJobs(ctx context.Context) error + DeleteInactionableJobs(ctx context.Context) error } type EntityPoolStore interface { diff --git a/database/sql/jobs.go b/database/sql/jobs.go index edb82831..0845188d 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -424,16 +424,19 @@ func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, er return sqlWorkflowJobToParamsJob(job) } -// DeleteCompletedJobs deletes all completed jobs. -func (s *sqlDatabase) DeleteCompletedJobs(_ context.Context) error { - query := s.conn.Model(&WorkflowJob{}).Where("status = ?", params.JobStatusCompleted) - - if err := query.Unscoped().Delete(&WorkflowJob{}); err.Error != nil { - if errors.Is(err.Error, gorm.ErrRecordNotFound) { - return nil - } - return err.Error +// DeleteInactionableJobs will delete jobs that are not in queued state and have no +// runner associated with them. This can happen if we have a pool that matches labels +// defined on a job, but the job itself was picked up by a runner we don't manage. +// When a job transitions from queued to anything else, GARM only uses them for informational +// purposes. So they are safe to delete. +// Also deletes completed jobs with GARM runners attached as they are no longer needed. +func (s *sqlDatabase) DeleteInactionableJobs(_ context.Context) error { + q := s.conn. + Unscoped(). + Where("(status != ? AND instance_id IS NULL) OR (status = ? AND instance_id IS NOT NULL)", params.JobStatusQueued, params.JobStatusCompleted). + Delete(&WorkflowJob{}) + if q.Error != nil { + return fmt.Errorf("deleting inactionable jobs: %w", q.Error) } - return nil } diff --git a/database/sql/jobs_test.go b/database/sql/jobs_test.go new file mode 100644 index 00000000..9ada2a70 --- /dev/null +++ b/database/sql/jobs_test.go @@ -0,0 +1,189 @@ +// Copyright 2025 Cloudbase Solutions SRL +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package sql + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" + + dbCommon "github.com/cloudbase/garm/database/common" + "github.com/cloudbase/garm/database/watcher" + garmTesting "github.com/cloudbase/garm/internal/testing" + "github.com/cloudbase/garm/params" +) + +type JobsTestSuite struct { + suite.Suite + Store dbCommon.Store + adminCtx context.Context +} + +func (s *JobsTestSuite) SetupTest() { + ctx := context.Background() + watcher.InitWatcher(ctx) + + // Create testing sqlite database + db, err := NewSQLDatabase(ctx, garmTesting.GetTestSqliteDBConfig(s.T())) + if err != nil { + s.FailNow(fmt.Sprintf("failed to create db connection: %s", err)) + } + s.Store = db + + adminCtx := garmTesting.ImpersonateAdminContext(ctx, db, s.T()) + s.adminCtx = adminCtx +} + +func (s *JobsTestSuite) TearDownTest() { + watcher.CloseWatcher() +} + +func TestJobsTestSuite(t *testing.T) { + suite.Run(t, new(JobsTestSuite)) +} + +// TestDeleteInactionableJobs verifies the deletion logic for jobs +func (s *JobsTestSuite) TestDeleteInactionableJobs() { + db := s.Store.(*sqlDatabase) + + // Create mix of jobs to test all conditions: + // 1. Queued jobs (should NOT be deleted) + queuedJob := params.Job{ + WorkflowJobID: 12345, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusQueued), + Name: "queued-job", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedJob) + s.Require().NoError(err) + + // 2. In-progress job without instance (should be deleted) + inProgressNoInstance := params.Job{ + WorkflowJobID: 12346, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusInProgress), + Name: "inprogress-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance) + s.Require().NoError(err) + + // 3. Completed job without instance (should be deleted) + completedNoInstance := params.Job{ + WorkflowJobID: 12347, + RunID: 67890, + Action: "test-action", + Status: string(params.JobStatusCompleted), + Conclusion: "success", + Name: "completed-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance) + s.Require().NoError(err) + + // Count total jobs before deletion + var countBefore int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error + s.Require().NoError(err) + s.Require().Equal(int64(3), countBefore, "Should have 3 jobs before deletion") + + // Run deletion + err = s.Store.DeleteInactionableJobs(s.adminCtx) + s.Require().NoError(err) + + // Count remaining jobs - should only have the queued job + var countAfter int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error + s.Require().NoError(err) + s.Require().Equal(int64(1), countAfter, "Should have 1 job remaining (queued)") + + // Verify the remaining job is the queued one + var remaining WorkflowJob + err = db.conn.Where("workflow_job_id = ?", 12345).First(&remaining).Error + s.Require().NoError(err) + s.Require().Equal("queued", remaining.Status) +} + +// TestDeleteInactionableJobs_AllScenarios verifies all deletion rules +func (s *JobsTestSuite) TestDeleteInactionableJobs_AllScenarios() { + db := s.Store.(*sqlDatabase) + + // Rule 1: Queued jobs are NEVER deleted (regardless of instance_id) + queuedNoInstance := params.Job{ + WorkflowJobID: 20001, + RunID: 67890, + Status: string(params.JobStatusQueued), + Name: "queued-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedNoInstance) + s.Require().NoError(err) + + // Rule 2: Non-queued jobs WITHOUT instance_id ARE deleted + inProgressNoInstance := params.Job{ + WorkflowJobID: 20002, + RunID: 67890, + Status: string(params.JobStatusInProgress), + Name: "inprogress-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance) + s.Require().NoError(err) + + completedNoInstance := params.Job{ + WorkflowJobID: 20003, + RunID: 67890, + Status: string(params.JobStatusCompleted), + Conclusion: "success", + Name: "completed-no-instance", + RepositoryName: "test-repo", + RepositoryOwner: "test-owner", + } + _, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance) + s.Require().NoError(err) + + // Count jobs before deletion + var countBefore int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error + s.Require().NoError(err) + s.Require().Equal(int64(3), countBefore) + + // Run deletion + err = s.Store.DeleteInactionableJobs(s.adminCtx) + s.Require().NoError(err) + + // After deletion, only queued job should remain + var countAfter int64 + err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error + s.Require().NoError(err) + s.Require().Equal(int64(1), countAfter, "Only queued job should remain") + + // Verify it's the queued job that remains + var jobs []WorkflowJob + err = db.conn.Find(&jobs).Error + s.Require().NoError(err) + s.Require().Len(jobs, 1) + s.Require().Equal(string(params.JobStatusQueued), jobs[0].Status) +} diff --git a/database/sql/models.go b/database/sql/models.go index 2a5313d7..585b6ed7 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -400,7 +400,7 @@ type WorkflowJob struct { Conclusion string // Status is the phase of the lifecycle that the job is currently in. // "queued", "in_progress" and "completed". - Status string + Status string `gorm:"index:idx_workflow_jobs_status_instance_id,priority:1"` // Name is the name if the job that was triggered. Name string @@ -409,7 +409,7 @@ type WorkflowJob struct { GithubRunnerID int64 - InstanceID *uuid.UUID `gorm:"index:idx_instance_job"` + InstanceID *uuid.UUID `gorm:"index:idx_instance_job;index:idx_workflow_jobs_status_instance_id,priority:2"` Instance Instance `gorm:"foreignKey:InstanceID"` RunnerGroupID int64 diff --git a/doc/config.md b/doc/config.md index 555c7495..5f631f7d 100644 --- a/doc/config.md +++ b/doc/config.md @@ -361,7 +361,12 @@ This is one of the features in GARM that I really love having. For one thing, it | `garm_runner_status` | Gauge | `name`=<runner name>
`pool_owner`=<owner name>
`pool_type`=<repository\|organization\|enterprise>
`provider`=<provider name>
`runner_status`=<running\|stopped\|error\|pending_delete\|deleting\|pending_create\|creating\|unknown>
`status`=<idle\|pending\|terminated\|installing\|failed\|active>
| This is a gauge value that gives us details about the runners garm spawns | | `garm_runner_operations_total` | Counter | `provider`=<provider name>
`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation is performed | | `garm_runner_errors_total` | Counter | `provider`=<provider name>
`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation errored | -| `garm_runner_jobs_status` | Gauge | `job_id`=<job id>
`name`=<job name>
`status`=<job status>
`conclusion`=<job conclusion>
`runner_name`=<runner name>
`repository`=<repository>
`requested_labels`=<requested labels> | List of jobs and their status | + +### Job metrics + +| Metric name | Type | Labels | Description | +|---------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------| +| `garm_job_jobs_status` | Gauge | `job_id`=<job id>
`name`=<job name>
`status`=<job status>
`conclusion`=<job conclusion>
`runner_name`=<runner name>
`repository`=<repository>
`requested_labels`=<requested labels> | List of jobs and their status | ### Github metrics diff --git a/metrics/jobs.go b/metrics/jobs.go index 12c243ec..957e8f5b 100644 --- a/metrics/jobs.go +++ b/metrics/jobs.go @@ -20,7 +20,7 @@ import ( var JobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, - Subsystem: metricsRunnerSubsystem, + Subsystem: metricsJobsSubsystem, Name: "jobs_status", Help: "List of jobs and their status", }, []string{"job_id", "name", "status", "conclusion", "runner_name", "repository", "requested_labels"}) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 4722f0d6..a4b9fa07 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -167,149 +167,249 @@ func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.Provide } } +// isEntityPool checks if a pool belongs to this manager's entity +func (r *basePoolManager) isEntityPool(pool params.Pool) bool { + switch r.entity.EntityType { + case params.ForgeEntityTypeRepository: + return pool.RepoID != "" && pool.RepoID == r.entity.ID + case params.ForgeEntityTypeOrganization: + return pool.OrgID != "" && pool.OrgID == r.entity.ID + case params.ForgeEntityTypeEnterprise: + return pool.EnterpriseID != "" && pool.EnterpriseID == r.entity.ID + default: + return false + } +} + +// shouldRecordJob determines if a job should be recorded in the database +func (r *basePoolManager) shouldRecordJob(jobParams params.Job, isNewJob bool) bool { + // Always record existing jobs + if !isNewJob { + return true + } + + // For new jobs, only record if queued + if jobParams.Status == string(params.JobStatusQueued) { + // Check if we have pools that can handle it + potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, jobParams.Labels) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to find pools matching tags", + "requested_tags", strings.Join(jobParams.Labels, ", ")) + return false + } + if len(potentialPools) == 0 { + slog.WarnContext( + r.ctx, "no pools matching tags; not recording job", + "requested_tags", strings.Join(jobParams.Labels, ", ")) + return false + } + return true + } + + // For new non-queued jobs, only record if runner belongs to us + if jobParams.RunnerName != "" { + _, err := r.store.GetInstance(r.ctx, jobParams.RunnerName) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get instance", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + } + return false + } + return true + } + + // New non-queued job with no runner - don't record + return false +} + +// persistJobToDB saves or updates the job in the database +func (r *basePoolManager) persistJobToDB(jobParams params.Job, triggeredBy int64) { + if jobParams.WorkflowJobID == 0 { + return + } + + // Check if job exists + _, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID) + isNewJob := errors.Is(err, runnerErrors.ErrNotFound) + if err != nil && !isNewJob { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get job", + "job_id", jobParams.WorkflowJobID) + return + } + + // Determine if we should record this job + if !r.shouldRecordJob(jobParams, isNewJob) { + return + } + + // Save or update the job + if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { + slog.With(slog.Any("error", jobErr)).ErrorContext( + r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID) + return + } + + // Break lock on the queued job that triggered this runner (if different) + if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy { + if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to break lock for job", + "job_id", triggeredBy) + } + } +} + +// handleCompletedJob processes a completed job webhook +func (r *basePoolManager) handleCompletedJob(jobParams params.Job) error { + // Ignore jobs without a runner + if jobParams.RunnerName == "" { + slog.InfoContext(r.ctx, "job never got assigned to a runner, ignoring") + return nil + } + + // Check if runner belongs to us + instance, err := r.store.GetInstance(r.ctx, jobParams.RunnerName) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get instance", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return nil + } + + // Verify pool belongs to this entity + pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get pool", + "pool_id", instance.PoolID) + return nil + } + + if !r.isEntityPool(pool) { + slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID) + return nil + } + + // Mark runner as terminated + if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return fmt.Errorf("error updating runner: %w", err) + } + + // Mark instance for deletion + slog.DebugContext( + r.ctx, "marking instance as pending_delete", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return fmt.Errorf("error updating runner: %w", err) + } + + return nil +} + +// handleInProgressJob processes an in-progress job webhook +func (r *basePoolManager) handleInProgressJob(jobParams params.Job) (triggeredBy int64, err error) { + // Mark runner as active (this also validates the instance exists) + instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + slog.DebugContext(r.ctx, "instance not found", "runner_name", jobParams.RunnerName) + return 0, nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to update runner status", + "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) + return 0, fmt.Errorf("error updating runner: %w", err) + } + + // Verify pool belongs to this entity + pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return 0, nil + } + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to get pool", + "pool_id", instance.PoolID) + return 0, nil + } + + if !r.isEntityPool(pool) { + slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID) + return 0, nil + } + + // Extract the job ID that triggered this runner + triggeredBy = jobIDFromLabels(instance.AditionalLabels) + + // Ensure minimum idle runners for the pool + if err := r.ensureIdleRunnersForOnePool(pool); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "error ensuring idle runners for pool", + "pool_id", pool.ID) + } + + return triggeredBy, nil +} + func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { + // Validate job ownership if err := r.ValidateOwner(job); err != nil { slog.ErrorContext(r.ctx, "failed to validate owner", "error", err) return fmt.Errorf("error validating owner: %w", err) } - // we see events where the lables seem to be missing. We should ignore these - // as we can't know if we should handle them or not. + // Jobs without labels cannot be processed if len(job.WorkflowJob.Labels) == 0 { slog.WarnContext(r.ctx, "job has no labels", "workflow_job", job.WorkflowJob.Name) return nil } + // Convert webhook payload to internal job format jobParams, err := r.paramsWorkflowJobToParamsJob(job) if err != nil { slog.ErrorContext(r.ctx, "failed to convert job to params", "error", err) return fmt.Errorf("error converting job to params: %w", err) } + // Process job based on action type var triggeredBy int64 - defer func() { - if jobParams.WorkflowJobID == 0 { - return - } - // we're updating the job in the database, regardless of whether it was successful or not. - // or if it was meant for this pool or not. Github will send the same job data to all hierarchies - // that have been configured to work with garm. Updating the job at all levels should yield the same - // outcome in the db. - _, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID) - if err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to get job", - "job_id", jobParams.WorkflowJobID) - return - } - // This job is new to us. Check if we have a pool that can handle it. - potentialPools := cache.FindPoolsMatchingAllTags(r.entity.ID, jobParams.Labels) - if len(potentialPools) == 0 { - slog.WarnContext( - r.ctx, "no pools matching tags; not recording job", - "requested_tags", strings.Join(jobParams.Labels, ", ")) - return - } - } - - if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil { - slog.With(slog.Any("error", jobErr)).ErrorContext( - r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID) - } - - if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy { - // The triggeredBy value is only set by the "in_progress" webhook. The runner that - // transitioned to in_progress was created as a result of a different queued job. If that job is - // still queued and we don't remove the lock, it will linger until the lock timeout is reached. - // That may take a long time, so we break the lock here and allow it to be scheduled again. - if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to break lock for job", - "job_id", triggeredBy) - } - } - }() + var actionErr error switch job.Action { case "queued": - // Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method - // when reconciling. + // Queued jobs are just recorded; they'll be picked up by consumeQueuedJobs() case "completed": - // If job was not assigned to a runner, we can ignore it. - if jobParams.RunnerName == "" { - slog.InfoContext( - r.ctx, "job never got assigned to a runner, ignoring") - return nil - } - - fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName) - if !ok { - return nil - } - - if _, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID); !ok { - slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID) - return nil - } - - // update instance workload state. - if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } - slog.DebugContext( - r.ctx, "marking instance as pending_delete", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } + actionErr = r.handleCompletedJob(jobParams) case "in_progress": - fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName) - if !ok { - slog.DebugContext(r.ctx, "instance not found in cache", "runner_name", jobParams.RunnerName) - return nil - } - - pool, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID) - if !ok { - slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID) - return nil - } - // update instance workload state. - instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive) - if err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to update runner status", - "runner_name", util.SanitizeLogEntry(jobParams.RunnerName)) - return fmt.Errorf("error updating runner: %w", err) - } - // Set triggeredBy here so we break the lock on any potential queued job. - triggeredBy = jobIDFromLabels(instance.AditionalLabels) - - // A runner has picked up the job, and is now running it. It may need to be replaced if the pool has - // a minimum number of idle runners configured. - if err := r.ensureIdleRunnersForOnePool(pool); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "error ensuring idle runners for pool", - "pool_id", pool.ID) - } + triggeredBy, actionErr = r.handleInProgressJob(jobParams) } - return nil + + // Always persist job to database (success or failure) + r.persistJobToDB(jobParams, triggeredBy) + + return actionErr } func jobIDFromLabels(labels []string) int64 { @@ -1754,6 +1854,14 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa // so those will trigger the creation of a runner. The jobs we don't know about will be dealt with by the idle runners. // Once jobs are consumed, you can set min-idle-runners to 0 again. func (r *basePoolManager) consumeQueuedJobs() error { + defer func() { + // Always try to clean inactionable jobs. Otherwise, if any condition + // makes this function exit, we never clean up jobs. + if err := r.store.DeleteInactionableJobs(r.ctx); err != nil { + slog.With(slog.Any("error", err)).ErrorContext( + r.ctx, "failed to delete completed jobs") + } + }() queued := r.getQueuedJobs() poolsCache := poolsForTags{ @@ -1872,11 +1980,6 @@ func (r *basePoolManager) consumeQueuedJobs() error { } } } - - if err := r.store.DeleteCompletedJobs(r.ctx); err != nil { - slog.With(slog.Any("error", err)).ErrorContext( - r.ctx, "failed to delete completed jobs") - } return nil } diff --git a/util/github/client.go b/util/github/client.go index 64933df3..ab7127bb 100644 --- a/util/github/client.go +++ b/util/github/client.go @@ -400,7 +400,7 @@ func (g *githubClient) getOrganizationRunnerGroupIDByName(ctx context.Context, e } return 0, fmt.Errorf("error fetching runners: %w", err) } - if err == nil && ghResp != nil { + if ghResp != nil { g.recordLimits(ghResp.Rate) } @@ -440,7 +440,7 @@ func (g *githubClient) getEnterpriseRunnerGroupIDByName(ctx context.Context, ent } return 0, fmt.Errorf("error fetching runners: %w", err) } - if err == nil && ghResp != nil { + if ghResp != nil { g.recordLimits(ghResp.Rate) } for _, runnerGroup := range runnerGroups.RunnerGroups {