Create relation between WorkflowJobs and Instances

Ensure that there is a foreign key constraint between runners and jobs.
Once a runner is associated with a job, we want the job to be removed along
with the runner.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2023-08-26 19:43:57 +00:00
parent d3479790d7
commit 59e6fb28c2
3 changed files with 57 additions and 18 deletions

View file

@ -3,6 +3,7 @@ package sql
import (
"context"
"encoding/json"
"log"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
@ -23,7 +24,7 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
}
}
return params.Job{
jobParam := params.Job{
ID: job.ID,
RunID: job.RunID,
Action: job.Action,
@ -33,7 +34,6 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
StartedAt: job.StartedAt,
CompletedAt: job.CompletedAt,
GithubRunnerID: job.GithubRunnerID,
RunnerName: job.RunnerName,
RunnerGroupID: job.RunnerGroupID,
RunnerGroupName: job.RunnerGroupName,
RepositoryName: job.RepositoryName,
@ -45,15 +45,21 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
CreatedAt: job.CreatedAt,
UpdatedAt: job.UpdatedAt,
LockedBy: job.LockedBy,
}, nil
}
if job.InstanceID != nil {
jobParam.RunnerName = job.Instance.Name
}
return jobParam, nil
}
func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
func (s *sqlDatabase) paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
asJson, err := json.Marshal(job.Labels)
if err != nil {
return WorkflowJob{}, errors.Wrap(err, "marshaling labels")
}
return WorkflowJob{
workflofJob := WorkflowJob{
ID: job.ID,
RunID: job.RunID,
Action: job.Action,
@ -63,7 +69,6 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
StartedAt: job.StartedAt,
CompletedAt: job.CompletedAt,
GithubRunnerID: job.GithubRunnerID,
RunnerName: job.RunnerName,
RunnerGroupID: job.RunnerGroupID,
RunnerGroupName: job.RunnerGroupName,
RepositoryName: job.RepositoryName,
@ -73,7 +78,18 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
EnterpriseID: job.EnterpriseID,
Labels: asJson,
LockedBy: job.LockedBy,
}, nil
}
if job.RunnerName != "" {
instance, err := s.getInstanceByName(s.ctx, job.RunnerName)
if err != nil {
log.Printf("failed to get instance by name: %v", err)
} else {
workflofJob.InstanceID = &instance.ID
}
}
return workflofJob, nil
}
func (s *sqlDatabase) DeleteJob(ctx context.Context, jobID int64) error {
@ -93,7 +109,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string)
return errors.Wrap(err, "parsing entity id")
}
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -122,7 +138,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string)
func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -174,7 +190,7 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin
func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", job.ID).First(&workflowJob)
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob)
if q.Error != nil {
if !errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -198,7 +214,12 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
}
if job.RunnerName != "" {
workflowJob.RunnerName = job.RunnerName
instance, err := s.getInstanceByName(ctx, job.RunnerName)
if err == nil {
workflowJob.InstanceID = &instance.ID
} else {
log.Printf("failed to get instance by name: %v", err)
}
}
if job.RepoID != nil {
@ -216,7 +237,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
return params.Job{}, errors.Wrap(err, "saving job")
}
} else {
workflowJob, err := paramsJobToWorkflowJob(job)
workflowJob, err := s.paramsJobToWorkflowJob(job)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
}
@ -231,7 +252,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
// ListJobsByStatus lists all jobs for a given status.
func (s *sqlDatabase) ListJobsByStatus(ctx context.Context, status params.JobStatus) ([]params.Job, error) {
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
if err := query.Find(&jobs); err.Error != nil {
return nil, err.Error
@ -256,7 +277,7 @@ func (s *sqlDatabase) ListEntityJobsByStatus(ctx context.Context, entityType par
}
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
switch entityType {
case params.OrganizationPool:
@ -289,7 +310,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) {
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{})
if err := query.Find(&jobs); err.Error != nil {
if err := query.Preload("Instance").Find(&jobs); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
return []params.Job{}, nil
}
@ -310,7 +331,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) {
// GetJobByID gets a job by id.
func (s *sqlDatabase) GetJobByID(ctx context.Context, jobID int64) (params.Job, error) {
var job WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("id = ?", jobID)
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID)
if err := query.First(&job); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {

View file

@ -162,6 +162,8 @@ type Instance struct {
Pool Pool `gorm:"foreignKey:PoolID"`
StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
Job *WorkflowJob `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type User struct {
@ -201,8 +203,11 @@ type WorkflowJob struct {
StartedAt time.Time
CompletedAt time.Time
GithubRunnerID int64
RunnerName string
GithubRunnerID int64
InstanceID *uuid.UUID `gorm:"index:idx_instance_job"`
Instance Instance `gorm:"foreignKey:InstanceID"`
RunnerGroupID int64
RunnerGroupName string

View file

@ -221,6 +221,19 @@ func (s *sqlDatabase) migrateDB() error {
}
}
if s.conn.Migrator().HasTable(&WorkflowJob{}) {
if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") {
// Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition
// to something else, we don't really consume them anyway.
if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil {
return errors.Wrap(err, "updating workflow_jobs")
}
if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil {
return errors.Wrap(err, "updating workflow_jobs")
}
}
}
if err := s.conn.AutoMigrate(
&Tag{},
&Pool{},