Delete all inactionable jobs
GARM cares about jobs in queued state for anything that requires decision making. Anything else is purely informational. This change cleans up all inactionable jobs and refuses to record jobs that are not already in the database, have an inactionable state and which do not have a runner we own handling them. Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
eeb8bf6457
commit
ec0fd6e3f8
9 changed files with 487 additions and 187 deletions
|
|
@ -1104,52 +1104,6 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, params.N
|
|||
return _c
|
||||
}
|
||||
|
||||
// DeleteCompletedJobs provides a mock function with given fields: ctx
|
||||
func (_m *Store) DeleteCompletedJobs(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DeleteCompletedJobs")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_DeleteCompletedJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteCompletedJobs'
|
||||
type Store_DeleteCompletedJobs_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DeleteCompletedJobs is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
func (_e *Store_Expecter) DeleteCompletedJobs(ctx interface{}) *Store_DeleteCompletedJobs_Call {
|
||||
return &Store_DeleteCompletedJobs_Call{Call: _e.mock.On("DeleteCompletedJobs", ctx)}
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteCompletedJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteCompletedJobs_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteCompletedJobs_Call) Return(_a0 error) *Store_DeleteCompletedJobs_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteCompletedJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteCompletedJobs_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DeleteEnterprise provides a mock function with given fields: ctx, enterpriseID
|
||||
func (_m *Store) DeleteEnterprise(ctx context.Context, enterpriseID string) error {
|
||||
ret := _m.Called(ctx, enterpriseID)
|
||||
|
|
@ -1537,6 +1491,52 @@ func (_c *Store_DeleteGithubEndpoint_Call) RunAndReturn(run func(context.Context
|
|||
return _c
|
||||
}
|
||||
|
||||
// DeleteInactionableJobs provides a mock function with given fields: ctx
|
||||
func (_m *Store) DeleteInactionableJobs(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DeleteInactionableJobs")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_DeleteInactionableJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteInactionableJobs'
|
||||
type Store_DeleteInactionableJobs_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DeleteInactionableJobs is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
func (_e *Store_Expecter) DeleteInactionableJobs(ctx interface{}) *Store_DeleteInactionableJobs_Call {
|
||||
return &Store_DeleteInactionableJobs_Call{Call: _e.mock.On("DeleteInactionableJobs", ctx)}
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteInactionableJobs_Call) Run(run func(ctx context.Context)) *Store_DeleteInactionableJobs_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteInactionableJobs_Call) Return(_a0 error) *Store_DeleteInactionableJobs_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DeleteInactionableJobs_Call) RunAndReturn(run func(context.Context) error) *Store_DeleteInactionableJobs_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DeleteInstance provides a mock function with given fields: ctx, poolID, instanceNameOrID
|
||||
func (_m *Store) DeleteInstance(ctx context.Context, poolID string, instanceNameOrID string) error {
|
||||
ret := _m.Called(ctx, poolID, instanceNameOrID)
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ type JobsStore interface {
|
|||
LockJob(ctx context.Context, jobID int64, entityID string) error
|
||||
BreakLockJobIsQueued(ctx context.Context, jobID int64) error
|
||||
|
||||
DeleteCompletedJobs(ctx context.Context) error
|
||||
DeleteInactionableJobs(ctx context.Context) error
|
||||
}
|
||||
|
||||
type EntityPoolStore interface {
|
||||
|
|
|
|||
|
|
@ -424,16 +424,19 @@ func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, er
|
|||
return sqlWorkflowJobToParamsJob(job)
|
||||
}
|
||||
|
||||
// DeleteCompletedJobs deletes all completed jobs.
|
||||
func (s *sqlDatabase) DeleteCompletedJobs(_ context.Context) error {
|
||||
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", params.JobStatusCompleted)
|
||||
|
||||
if err := query.Unscoped().Delete(&WorkflowJob{}); err.Error != nil {
|
||||
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
||||
return nil
|
||||
}
|
||||
return err.Error
|
||||
// DeleteInactionableJobs will delete jobs that are not in queued state and have no
|
||||
// runner associated with them. This can happen if we have a pool that matches labels
|
||||
// defined on a job, but the job itself was picked up by a runner we don't manage.
|
||||
// When a job transitions from queued to anything else, GARM only uses them for informational
|
||||
// purposes. So they are safe to delete.
|
||||
// Also deletes completed jobs with GARM runners attached as they are no longer needed.
|
||||
func (s *sqlDatabase) DeleteInactionableJobs(_ context.Context) error {
|
||||
q := s.conn.
|
||||
Unscoped().
|
||||
Where("(status != ? AND instance_id IS NULL) OR (status = ? AND instance_id IS NOT NULL)", params.JobStatusQueued, params.JobStatusCompleted).
|
||||
Delete(&WorkflowJob{})
|
||||
if q.Error != nil {
|
||||
return fmt.Errorf("deleting inactionable jobs: %w", q.Error)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
189
database/sql/jobs_test.go
Normal file
189
database/sql/jobs_test.go
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
// Copyright 2025 Cloudbase Solutions SRL
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
dbCommon "github.com/cloudbase/garm/database/common"
|
||||
"github.com/cloudbase/garm/database/watcher"
|
||||
garmTesting "github.com/cloudbase/garm/internal/testing"
|
||||
"github.com/cloudbase/garm/params"
|
||||
)
|
||||
|
||||
type JobsTestSuite struct {
|
||||
suite.Suite
|
||||
Store dbCommon.Store
|
||||
adminCtx context.Context
|
||||
}
|
||||
|
||||
func (s *JobsTestSuite) SetupTest() {
|
||||
ctx := context.Background()
|
||||
watcher.InitWatcher(ctx)
|
||||
|
||||
// Create testing sqlite database
|
||||
db, err := NewSQLDatabase(ctx, garmTesting.GetTestSqliteDBConfig(s.T()))
|
||||
if err != nil {
|
||||
s.FailNow(fmt.Sprintf("failed to create db connection: %s", err))
|
||||
}
|
||||
s.Store = db
|
||||
|
||||
adminCtx := garmTesting.ImpersonateAdminContext(ctx, db, s.T())
|
||||
s.adminCtx = adminCtx
|
||||
}
|
||||
|
||||
func (s *JobsTestSuite) TearDownTest() {
|
||||
watcher.CloseWatcher()
|
||||
}
|
||||
|
||||
func TestJobsTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(JobsTestSuite))
|
||||
}
|
||||
|
||||
// TestDeleteInactionableJobs verifies the deletion logic for jobs
|
||||
func (s *JobsTestSuite) TestDeleteInactionableJobs() {
|
||||
db := s.Store.(*sqlDatabase)
|
||||
|
||||
// Create mix of jobs to test all conditions:
|
||||
// 1. Queued jobs (should NOT be deleted)
|
||||
queuedJob := params.Job{
|
||||
WorkflowJobID: 12345,
|
||||
RunID: 67890,
|
||||
Action: "test-action",
|
||||
Status: string(params.JobStatusQueued),
|
||||
Name: "queued-job",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedJob)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// 2. In-progress job without instance (should be deleted)
|
||||
inProgressNoInstance := params.Job{
|
||||
WorkflowJobID: 12346,
|
||||
RunID: 67890,
|
||||
Action: "test-action",
|
||||
Status: string(params.JobStatusInProgress),
|
||||
Name: "inprogress-no-instance",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// 3. Completed job without instance (should be deleted)
|
||||
completedNoInstance := params.Job{
|
||||
WorkflowJobID: 12347,
|
||||
RunID: 67890,
|
||||
Action: "test-action",
|
||||
Status: string(params.JobStatusCompleted),
|
||||
Conclusion: "success",
|
||||
Name: "completed-no-instance",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Count total jobs before deletion
|
||||
var countBefore int64
|
||||
err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(int64(3), countBefore, "Should have 3 jobs before deletion")
|
||||
|
||||
// Run deletion
|
||||
err = s.Store.DeleteInactionableJobs(s.adminCtx)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Count remaining jobs - should only have the queued job
|
||||
var countAfter int64
|
||||
err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(int64(1), countAfter, "Should have 1 job remaining (queued)")
|
||||
|
||||
// Verify the remaining job is the queued one
|
||||
var remaining WorkflowJob
|
||||
err = db.conn.Where("workflow_job_id = ?", 12345).First(&remaining).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal("queued", remaining.Status)
|
||||
}
|
||||
|
||||
// TestDeleteInactionableJobs_AllScenarios verifies all deletion rules
|
||||
func (s *JobsTestSuite) TestDeleteInactionableJobs_AllScenarios() {
|
||||
db := s.Store.(*sqlDatabase)
|
||||
|
||||
// Rule 1: Queued jobs are NEVER deleted (regardless of instance_id)
|
||||
queuedNoInstance := params.Job{
|
||||
WorkflowJobID: 20001,
|
||||
RunID: 67890,
|
||||
Status: string(params.JobStatusQueued),
|
||||
Name: "queued-no-instance",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err := s.Store.CreateOrUpdateJob(s.adminCtx, queuedNoInstance)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Rule 2: Non-queued jobs WITHOUT instance_id ARE deleted
|
||||
inProgressNoInstance := params.Job{
|
||||
WorkflowJobID: 20002,
|
||||
RunID: 67890,
|
||||
Status: string(params.JobStatusInProgress),
|
||||
Name: "inprogress-no-instance",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err = s.Store.CreateOrUpdateJob(s.adminCtx, inProgressNoInstance)
|
||||
s.Require().NoError(err)
|
||||
|
||||
completedNoInstance := params.Job{
|
||||
WorkflowJobID: 20003,
|
||||
RunID: 67890,
|
||||
Status: string(params.JobStatusCompleted),
|
||||
Conclusion: "success",
|
||||
Name: "completed-no-instance",
|
||||
RepositoryName: "test-repo",
|
||||
RepositoryOwner: "test-owner",
|
||||
}
|
||||
_, err = s.Store.CreateOrUpdateJob(s.adminCtx, completedNoInstance)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Count jobs before deletion
|
||||
var countBefore int64
|
||||
err = db.conn.Model(&WorkflowJob{}).Count(&countBefore).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(int64(3), countBefore)
|
||||
|
||||
// Run deletion
|
||||
err = s.Store.DeleteInactionableJobs(s.adminCtx)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// After deletion, only queued job should remain
|
||||
var countAfter int64
|
||||
err = db.conn.Model(&WorkflowJob{}).Count(&countAfter).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(int64(1), countAfter, "Only queued job should remain")
|
||||
|
||||
// Verify it's the queued job that remains
|
||||
var jobs []WorkflowJob
|
||||
err = db.conn.Find(&jobs).Error
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(jobs, 1)
|
||||
s.Require().Equal(string(params.JobStatusQueued), jobs[0].Status)
|
||||
}
|
||||
|
|
@ -400,7 +400,7 @@ type WorkflowJob struct {
|
|||
Conclusion string
|
||||
// Status is the phase of the lifecycle that the job is currently in.
|
||||
// "queued", "in_progress" and "completed".
|
||||
Status string
|
||||
Status string `gorm:"index:idx_workflow_jobs_status_instance_id,priority:1"`
|
||||
// Name is the name if the job that was triggered.
|
||||
Name string
|
||||
|
||||
|
|
@ -409,7 +409,7 @@ type WorkflowJob struct {
|
|||
|
||||
GithubRunnerID int64
|
||||
|
||||
InstanceID *uuid.UUID `gorm:"index:idx_instance_job"`
|
||||
InstanceID *uuid.UUID `gorm:"index:idx_instance_job;index:idx_workflow_jobs_status_instance_id,priority:2"`
|
||||
Instance Instance `gorm:"foreignKey:InstanceID"`
|
||||
|
||||
RunnerGroupID int64
|
||||
|
|
|
|||
|
|
@ -361,7 +361,12 @@ This is one of the features in GARM that I really love having. For one thing, it
|
|||
| `garm_runner_status` | Gauge | `name`=<runner name> <br>`pool_owner`=<owner name> <br>`pool_type`=<repository\|organization\|enterprise> <br>`provider`=<provider name> <br>`runner_status`=<running\|stopped\|error\|pending_delete\|deleting\|pending_create\|creating\|unknown> <br>`status`=<idle\|pending\|terminated\|installing\|failed\|active> <br> | This is a gauge value that gives us details about the runners garm spawns |
|
||||
| `garm_runner_operations_total` | Counter | `provider`=<provider name> <br>`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation is performed |
|
||||
| `garm_runner_errors_total` | Counter | `provider`=<provider name> <br>`operation`=<CreateInstance\|DeleteInstance\|GetInstance\|ListInstances\|RemoveAllInstances\|Start\Stop> | This is a counter that increments every time a runner operation errored |
|
||||
| `garm_runner_jobs_status` | Gauge | `job_id`=<job id> <br>`name`=<job name> <br>`status`=<job status> <br>`conclusion`=<job conclusion> <br>`runner_name`=<runner name> <br>`repository`=<repository> <br>`requested_labels`=<requested labels> | List of jobs and their status |
|
||||
|
||||
### Job metrics
|
||||
|
||||
| Metric name | Type | Labels | Description |
|
||||
|---------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------|
|
||||
| `garm_job_jobs_status` | Gauge | `job_id`=<job id> <br>`name`=<job name> <br>`status`=<job status> <br>`conclusion`=<job conclusion> <br>`runner_name`=<runner name> <br>`repository`=<repository> <br>`requested_labels`=<requested labels> | List of jobs and their status |
|
||||
|
||||
### Github metrics
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import (
|
|||
|
||||
var JobStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Subsystem: metricsRunnerSubsystem,
|
||||
Subsystem: metricsJobsSubsystem,
|
||||
Name: "jobs_status",
|
||||
Help: "List of jobs and their status",
|
||||
}, []string{"job_id", "name", "status", "conclusion", "runner_name", "repository", "requested_labels"})
|
||||
|
|
|
|||
|
|
@ -167,149 +167,249 @@ func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.Provide
|
|||
}
|
||||
}
|
||||
|
||||
// isEntityPool checks if a pool belongs to this manager's entity
|
||||
func (r *basePoolManager) isEntityPool(pool params.Pool) bool {
|
||||
switch r.entity.EntityType {
|
||||
case params.ForgeEntityTypeRepository:
|
||||
return pool.RepoID != "" && pool.RepoID == r.entity.ID
|
||||
case params.ForgeEntityTypeOrganization:
|
||||
return pool.OrgID != "" && pool.OrgID == r.entity.ID
|
||||
case params.ForgeEntityTypeEnterprise:
|
||||
return pool.EnterpriseID != "" && pool.EnterpriseID == r.entity.ID
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// shouldRecordJob determines if a job should be recorded in the database
|
||||
func (r *basePoolManager) shouldRecordJob(jobParams params.Job, isNewJob bool) bool {
|
||||
// Always record existing jobs
|
||||
if !isNewJob {
|
||||
return true
|
||||
}
|
||||
|
||||
// For new jobs, only record if queued
|
||||
if jobParams.Status == string(params.JobStatusQueued) {
|
||||
// Check if we have pools that can handle it
|
||||
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, jobParams.Labels)
|
||||
if err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to find pools matching tags",
|
||||
"requested_tags", strings.Join(jobParams.Labels, ", "))
|
||||
return false
|
||||
}
|
||||
if len(potentialPools) == 0 {
|
||||
slog.WarnContext(
|
||||
r.ctx, "no pools matching tags; not recording job",
|
||||
"requested_tags", strings.Join(jobParams.Labels, ", "))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// For new non-queued jobs, only record if runner belongs to us
|
||||
if jobParams.RunnerName != "" {
|
||||
_, err := r.store.GetInstance(r.ctx, jobParams.RunnerName)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get instance",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// New non-queued job with no runner - don't record
|
||||
return false
|
||||
}
|
||||
|
||||
// persistJobToDB saves or updates the job in the database
|
||||
func (r *basePoolManager) persistJobToDB(jobParams params.Job, triggeredBy int64) {
|
||||
if jobParams.WorkflowJobID == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if job exists
|
||||
_, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID)
|
||||
isNewJob := errors.Is(err, runnerErrors.ErrNotFound)
|
||||
if err != nil && !isNewJob {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get job",
|
||||
"job_id", jobParams.WorkflowJobID)
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should record this job
|
||||
if !r.shouldRecordJob(jobParams, isNewJob) {
|
||||
return
|
||||
}
|
||||
|
||||
// Save or update the job
|
||||
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
|
||||
slog.With(slog.Any("error", jobErr)).ErrorContext(
|
||||
r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID)
|
||||
return
|
||||
}
|
||||
|
||||
// Break lock on the queued job that triggered this runner (if different)
|
||||
if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy {
|
||||
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to break lock for job",
|
||||
"job_id", triggeredBy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleCompletedJob processes a completed job webhook
|
||||
func (r *basePoolManager) handleCompletedJob(jobParams params.Job) error {
|
||||
// Ignore jobs without a runner
|
||||
if jobParams.RunnerName == "" {
|
||||
slog.InfoContext(r.ctx, "job never got assigned to a runner, ignoring")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if runner belongs to us
|
||||
instance, err := r.store.GetInstance(r.ctx, jobParams.RunnerName)
|
||||
if err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get instance",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify pool belongs to this entity
|
||||
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
|
||||
if err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get pool",
|
||||
"pool_id", instance.PoolID)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !r.isEntityPool(pool) {
|
||||
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mark runner as terminated
|
||||
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
|
||||
// Mark instance for deletion
|
||||
slog.DebugContext(
|
||||
r.ctx, "marking instance as pending_delete",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleInProgressJob processes an in-progress job webhook
|
||||
func (r *basePoolManager) handleInProgressJob(jobParams params.Job) (triggeredBy int64, err error) {
|
||||
// Mark runner as active (this also validates the instance exists)
|
||||
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
|
||||
if err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
slog.DebugContext(r.ctx, "instance not found", "runner_name", jobParams.RunnerName)
|
||||
return 0, nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return 0, fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
|
||||
// Verify pool belongs to this entity
|
||||
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
|
||||
if err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return 0, nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get pool",
|
||||
"pool_id", instance.PoolID)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if !r.isEntityPool(pool) {
|
||||
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", instance.PoolID)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Extract the job ID that triggered this runner
|
||||
triggeredBy = jobIDFromLabels(instance.AditionalLabels)
|
||||
|
||||
// Ensure minimum idle runners for the pool
|
||||
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "error ensuring idle runners for pool",
|
||||
"pool_id", pool.ID)
|
||||
}
|
||||
|
||||
return triggeredBy, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
||||
// Validate job ownership
|
||||
if err := r.ValidateOwner(job); err != nil {
|
||||
slog.ErrorContext(r.ctx, "failed to validate owner", "error", err)
|
||||
return fmt.Errorf("error validating owner: %w", err)
|
||||
}
|
||||
|
||||
// we see events where the lables seem to be missing. We should ignore these
|
||||
// as we can't know if we should handle them or not.
|
||||
// Jobs without labels cannot be processed
|
||||
if len(job.WorkflowJob.Labels) == 0 {
|
||||
slog.WarnContext(r.ctx, "job has no labels", "workflow_job", job.WorkflowJob.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Convert webhook payload to internal job format
|
||||
jobParams, err := r.paramsWorkflowJobToParamsJob(job)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.ctx, "failed to convert job to params", "error", err)
|
||||
return fmt.Errorf("error converting job to params: %w", err)
|
||||
}
|
||||
|
||||
// Process job based on action type
|
||||
var triggeredBy int64
|
||||
defer func() {
|
||||
if jobParams.WorkflowJobID == 0 {
|
||||
return
|
||||
}
|
||||
// we're updating the job in the database, regardless of whether it was successful or not.
|
||||
// or if it was meant for this pool or not. Github will send the same job data to all hierarchies
|
||||
// that have been configured to work with garm. Updating the job at all levels should yield the same
|
||||
// outcome in the db.
|
||||
_, err := r.store.GetJobByID(r.ctx, jobParams.WorkflowJobID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to get job",
|
||||
"job_id", jobParams.WorkflowJobID)
|
||||
return
|
||||
}
|
||||
// This job is new to us. Check if we have a pool that can handle it.
|
||||
potentialPools := cache.FindPoolsMatchingAllTags(r.entity.ID, jobParams.Labels)
|
||||
if len(potentialPools) == 0 {
|
||||
slog.WarnContext(
|
||||
r.ctx, "no pools matching tags; not recording job",
|
||||
"requested_tags", strings.Join(jobParams.Labels, ", "))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
|
||||
slog.With(slog.Any("error", jobErr)).ErrorContext(
|
||||
r.ctx, "failed to update job", "job_id", jobParams.WorkflowJobID)
|
||||
}
|
||||
|
||||
if triggeredBy != 0 && jobParams.WorkflowJobID != triggeredBy {
|
||||
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
|
||||
// transitioned to in_progress was created as a result of a different queued job. If that job is
|
||||
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
|
||||
// That may take a long time, so we break the lock here and allow it to be scheduled again.
|
||||
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to break lock for job",
|
||||
"job_id", triggeredBy)
|
||||
}
|
||||
}
|
||||
}()
|
||||
var actionErr error
|
||||
|
||||
switch job.Action {
|
||||
case "queued":
|
||||
// Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method
|
||||
// when reconciling.
|
||||
// Queued jobs are just recorded; they'll be picked up by consumeQueuedJobs()
|
||||
case "completed":
|
||||
// If job was not assigned to a runner, we can ignore it.
|
||||
if jobParams.RunnerName == "" {
|
||||
slog.InfoContext(
|
||||
r.ctx, "job never got assigned to a runner, ignoring")
|
||||
return nil
|
||||
}
|
||||
|
||||
fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID); !ok {
|
||||
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// update instance workload state.
|
||||
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
slog.DebugContext(
|
||||
r.ctx, "marking instance as pending_delete",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
actionErr = r.handleCompletedJob(jobParams)
|
||||
case "in_progress":
|
||||
fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
|
||||
if !ok {
|
||||
slog.DebugContext(r.ctx, "instance not found in cache", "runner_name", jobParams.RunnerName)
|
||||
return nil
|
||||
}
|
||||
|
||||
pool, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID)
|
||||
if !ok {
|
||||
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
|
||||
return nil
|
||||
}
|
||||
// update instance workload state.
|
||||
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
|
||||
if err != nil {
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
||||
return fmt.Errorf("error updating runner: %w", err)
|
||||
}
|
||||
// Set triggeredBy here so we break the lock on any potential queued job.
|
||||
triggeredBy = jobIDFromLabels(instance.AditionalLabels)
|
||||
|
||||
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
|
||||
// a minimum number of idle runners configured.
|
||||
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "error ensuring idle runners for pool",
|
||||
"pool_id", pool.ID)
|
||||
}
|
||||
triggeredBy, actionErr = r.handleInProgressJob(jobParams)
|
||||
}
|
||||
return nil
|
||||
|
||||
// Always persist job to database (success or failure)
|
||||
r.persistJobToDB(jobParams, triggeredBy)
|
||||
|
||||
return actionErr
|
||||
}
|
||||
|
||||
func jobIDFromLabels(labels []string) int64 {
|
||||
|
|
@ -1754,6 +1854,14 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa
|
|||
// so those will trigger the creation of a runner. The jobs we don't know about will be dealt with by the idle runners.
|
||||
// Once jobs are consumed, you can set min-idle-runners to 0 again.
|
||||
func (r *basePoolManager) consumeQueuedJobs() error {
|
||||
defer func() {
|
||||
// Always try to clean inactionable jobs. Otherwise, if any condition
|
||||
// makes this function exit, we never clean up jobs.
|
||||
if err := r.store.DeleteInactionableJobs(r.ctx); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to delete completed jobs")
|
||||
}
|
||||
}()
|
||||
queued := r.getQueuedJobs()
|
||||
|
||||
poolsCache := poolsForTags{
|
||||
|
|
@ -1872,11 +1980,6 @@ func (r *basePoolManager) consumeQueuedJobs() error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.store.DeleteCompletedJobs(r.ctx); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to delete completed jobs")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -400,7 +400,7 @@ func (g *githubClient) getOrganizationRunnerGroupIDByName(ctx context.Context, e
|
|||
}
|
||||
return 0, fmt.Errorf("error fetching runners: %w", err)
|
||||
}
|
||||
if err == nil && ghResp != nil {
|
||||
if ghResp != nil {
|
||||
g.recordLimits(ghResp.Rate)
|
||||
}
|
||||
|
||||
|
|
@ -440,7 +440,7 @@ func (g *githubClient) getEnterpriseRunnerGroupIDByName(ctx context.Context, ent
|
|||
}
|
||||
return 0, fmt.Errorf("error fetching runners: %w", err)
|
||||
}
|
||||
if err == nil && ghResp != nil {
|
||||
if ghResp != nil {
|
||||
g.recordLimits(ghResp.Rate)
|
||||
}
|
||||
for _, runnerGroup := range runnerGroups.RunnerGroups {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue