From 59e6fb28c2d14b4e75069deb44ef4a3c7728c090 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 26 Aug 2023 19:43:57 +0000 Subject: [PATCH 1/3] Create relation between WorkflowJobs and Instances Ensure that there is a foreign key constraint between runners and jobs. Once a runner is associated with a job, we want the job to be removed along with the runner. Signed-off-by: Gabriel Adrian Samfira --- database/sql/jobs.go | 53 +++++++++++++++++++++++++++++------------- database/sql/models.go | 9 +++++-- database/sql/sql.go | 13 +++++++++++ 3 files changed, 57 insertions(+), 18 deletions(-) diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 091dfd7c..1d050f01 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -3,6 +3,7 @@ package sql import ( "context" "encoding/json" + "log" runnerErrors "github.com/cloudbase/garm-provider-common/errors" "github.com/cloudbase/garm/database/common" @@ -23,7 +24,7 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) { } } - return params.Job{ + jobParam := params.Job{ ID: job.ID, RunID: job.RunID, Action: job.Action, @@ -33,7 +34,6 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) { StartedAt: job.StartedAt, CompletedAt: job.CompletedAt, GithubRunnerID: job.GithubRunnerID, - RunnerName: job.RunnerName, RunnerGroupID: job.RunnerGroupID, RunnerGroupName: job.RunnerGroupName, RepositoryName: job.RepositoryName, @@ -45,15 +45,21 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) { CreatedAt: job.CreatedAt, UpdatedAt: job.UpdatedAt, LockedBy: job.LockedBy, - }, nil + } + + if job.InstanceID != nil { + jobParam.RunnerName = job.Instance.Name + } + return jobParam, nil } -func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) { +func (s *sqlDatabase) paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) { asJson, err := json.Marshal(job.Labels) if err != nil { return WorkflowJob{}, errors.Wrap(err, "marshaling labels") } - return WorkflowJob{ + + workflofJob := WorkflowJob{ ID: job.ID, RunID: job.RunID, Action: job.Action, @@ -63,7 +69,6 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) { StartedAt: job.StartedAt, CompletedAt: job.CompletedAt, GithubRunnerID: job.GithubRunnerID, - RunnerName: job.RunnerName, RunnerGroupID: job.RunnerGroupID, RunnerGroupName: job.RunnerGroupName, RepositoryName: job.RepositoryName, @@ -73,7 +78,18 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) { EnterpriseID: job.EnterpriseID, Labels: asJson, LockedBy: job.LockedBy, - }, nil + } + + if job.RunnerName != "" { + instance, err := s.getInstanceByName(s.ctx, job.RunnerName) + if err != nil { + log.Printf("failed to get instance by name: %v", err) + } else { + workflofJob.InstanceID = &instance.ID + } + } + + return workflofJob, nil } func (s *sqlDatabase) DeleteJob(ctx context.Context, jobID int64) error { @@ -93,7 +109,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) return errors.Wrap(err, "parsing entity id") } var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -122,7 +138,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error { var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -174,7 +190,7 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) { var workflowJob WorkflowJob - q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", job.ID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob) if q.Error != nil { if !errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -198,7 +214,12 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa } if job.RunnerName != "" { - workflowJob.RunnerName = job.RunnerName + instance, err := s.getInstanceByName(ctx, job.RunnerName) + if err == nil { + workflowJob.InstanceID = &instance.ID + } else { + log.Printf("failed to get instance by name: %v", err) + } } if job.RepoID != nil { @@ -216,7 +237,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa return params.Job{}, errors.Wrap(err, "saving job") } } else { - workflowJob, err := paramsJobToWorkflowJob(job) + workflowJob, err := s.paramsJobToWorkflowJob(job) if err != nil { return params.Job{}, errors.Wrap(err, "converting job") } @@ -231,7 +252,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa // ListJobsByStatus lists all jobs for a given status. func (s *sqlDatabase) ListJobsByStatus(ctx context.Context, status params.JobStatus) ([]params.Job, error) { var jobs []WorkflowJob - query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status) + query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status) if err := query.Find(&jobs); err.Error != nil { return nil, err.Error @@ -256,7 +277,7 @@ func (s *sqlDatabase) ListEntityJobsByStatus(ctx context.Context, entityType par } var jobs []WorkflowJob - query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status) + query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status) switch entityType { case params.OrganizationPool: @@ -289,7 +310,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) { var jobs []WorkflowJob query := s.conn.Model(&WorkflowJob{}) - if err := query.Find(&jobs); err.Error != nil { + if err := query.Preload("Instance").Find(&jobs); err.Error != nil { if errors.Is(err.Error, gorm.ErrRecordNotFound) { return []params.Job{}, nil } @@ -310,7 +331,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) { // GetJobByID gets a job by id. func (s *sqlDatabase) GetJobByID(ctx context.Context, jobID int64) (params.Job, error) { var job WorkflowJob - query := s.conn.Model(&WorkflowJob{}).Where("id = ?", jobID) + query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID) if err := query.First(&job); err.Error != nil { if errors.Is(err.Error, gorm.ErrRecordNotFound) { diff --git a/database/sql/models.go b/database/sql/models.go index 86a343cc..ac41f031 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -162,6 +162,8 @@ type Instance struct { Pool Pool `gorm:"foreignKey:PoolID"` StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"` + + Job *WorkflowJob `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"` } type User struct { @@ -201,8 +203,11 @@ type WorkflowJob struct { StartedAt time.Time CompletedAt time.Time - GithubRunnerID int64 - RunnerName string + GithubRunnerID int64 + + InstanceID *uuid.UUID `gorm:"index:idx_instance_job"` + Instance Instance `gorm:"foreignKey:InstanceID"` + RunnerGroupID int64 RunnerGroupName string diff --git a/database/sql/sql.go b/database/sql/sql.go index ac3149e9..47c6ef02 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -221,6 +221,19 @@ func (s *sqlDatabase) migrateDB() error { } } + if s.conn.Migrator().HasTable(&WorkflowJob{}) { + if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") { + // Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition + // to something else, we don't really consume them anyway. + if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil { + return errors.Wrap(err, "updating workflow_jobs") + } + if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil { + return errors.Wrap(err, "updating workflow_jobs") + } + } + } + if err := s.conn.AutoMigrate( &Tag{}, &Pool{}, From 891b6d3105f25d24fdf044ef989a198f46489e22 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 26 Aug 2023 19:47:17 +0000 Subject: [PATCH 2/3] Run go generate Signed-off-by: Gabriel Adrian Samfira --- runner/common/mocks/GithubClient.go | 52 ++++++++++++++++++++++++ runner/common/mocks/OrganizationHooks.go | 26 ++++++++++++ runner/common/mocks/RepositoryHooks.go | 26 ++++++++++++ 3 files changed, 104 insertions(+) diff --git a/runner/common/mocks/GithubClient.go b/runner/common/mocks/GithubClient.go index ebd442ba..bfcada29 100644 --- a/runner/common/mocks/GithubClient.go +++ b/runner/common/mocks/GithubClient.go @@ -521,6 +521,58 @@ func (_m *GithubClient) ListRunners(ctx context.Context, owner string, repo stri return r0, r1, r2 } +// PingOrgHook provides a mock function with given fields: ctx, org, id +func (_m *GithubClient) PingOrgHook(ctx context.Context, org string, id int64) (*github.Response, error) { + ret := _m.Called(ctx, org, id) + + var r0 *github.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) (*github.Response, error)); ok { + return rf(ctx, org, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string, int64) *github.Response); ok { + r0 = rf(ctx, org, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*github.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok { + r1 = rf(ctx, org, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PingRepoHook provides a mock function with given fields: ctx, owner, repo, id +func (_m *GithubClient) PingRepoHook(ctx context.Context, owner string, repo string, id int64) (*github.Response, error) { + ret := _m.Called(ctx, owner, repo, id) + + var r0 *github.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*github.Response, error)); ok { + return rf(ctx, owner, repo, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *github.Response); ok { + r0 = rf(ctx, owner, repo, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*github.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok { + r1 = rf(ctx, owner, repo, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // RemoveOrganizationRunner provides a mock function with given fields: ctx, owner, runnerID func (_m *GithubClient) RemoveOrganizationRunner(ctx context.Context, owner string, runnerID int64) (*github.Response, error) { ret := _m.Called(ctx, owner, runnerID) diff --git a/runner/common/mocks/OrganizationHooks.go b/runner/common/mocks/OrganizationHooks.go index 4d891f0b..c19a7b68 100644 --- a/runner/common/mocks/OrganizationHooks.go +++ b/runner/common/mocks/OrganizationHooks.go @@ -145,6 +145,32 @@ func (_m *OrganizationHooks) ListOrgHooks(ctx context.Context, org string, opts return r0, r1, r2 } +// PingOrgHook provides a mock function with given fields: ctx, org, id +func (_m *OrganizationHooks) PingOrgHook(ctx context.Context, org string, id int64) (*github.Response, error) { + ret := _m.Called(ctx, org, id) + + var r0 *github.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) (*github.Response, error)); ok { + return rf(ctx, org, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string, int64) *github.Response); ok { + r0 = rf(ctx, org, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*github.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok { + r1 = rf(ctx, org, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewOrganizationHooks creates a new instance of OrganizationHooks. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewOrganizationHooks(t interface { diff --git a/runner/common/mocks/RepositoryHooks.go b/runner/common/mocks/RepositoryHooks.go index ab9f1496..800a48c5 100644 --- a/runner/common/mocks/RepositoryHooks.go +++ b/runner/common/mocks/RepositoryHooks.go @@ -145,6 +145,32 @@ func (_m *RepositoryHooks) ListRepoHooks(ctx context.Context, owner string, repo return r0, r1, r2 } +// PingRepoHook provides a mock function with given fields: ctx, owner, repo, id +func (_m *RepositoryHooks) PingRepoHook(ctx context.Context, owner string, repo string, id int64) (*github.Response, error) { + ret := _m.Called(ctx, owner, repo, id) + + var r0 *github.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*github.Response, error)); ok { + return rf(ctx, owner, repo, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *github.Response); ok { + r0 = rf(ctx, owner, repo, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*github.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok { + r1 = rf(ctx, owner, repo, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewRepositoryHooks creates a new instance of RepositoryHooks. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewRepositoryHooks(t interface { From f2100f7c91d27f2be90cfd35fc844f30c207eee0 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 26 Aug 2023 20:13:48 +0000 Subject: [PATCH 3/3] Fix tests Signed-off-by: Gabriel Adrian Samfira --- database/sql/instances_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/database/sql/instances_test.go b/database/sql/instances_test.go index d47b265e..18c58ab9 100644 --- a/database/sql/instances_test.go +++ b/database/sql/instances_test.go @@ -294,6 +294,10 @@ func (s *InstancesTestSuite) TestDeleteInstanceDBRecordNotFoundErr() { ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")). WithArgs(instance.ID). WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID)) + s.Fixtures.SQLMock. + ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")). + WithArgs(instance.ID). + WillReturnRows(sqlmock.NewRows([]string{})) s.Fixtures.SQLMock. ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")). WithArgs(instance.ID). @@ -327,6 +331,10 @@ func (s *InstancesTestSuite) TestDeleteInstanceDBDeleteErr() { ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")). WithArgs(instance.ID). WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("12.10.12.13", "public", instance.ID)) + s.Fixtures.SQLMock. + ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")). + WithArgs(instance.ID). + WillReturnRows(sqlmock.NewRows([]string{})) s.Fixtures.SQLMock. ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")). WithArgs(instance.ID). @@ -378,6 +386,10 @@ func (s *InstancesTestSuite) TestAddInstanceEventDBUpdateErr() { ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")). WithArgs(instance.ID). WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID)) + s.Fixtures.SQLMock. + ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")). + WithArgs(instance.ID). + WillReturnRows(sqlmock.NewRows([]string{})) s.Fixtures.SQLMock. ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")). WithArgs(instance.ID). @@ -429,6 +441,10 @@ func (s *InstancesTestSuite) TestUpdateInstanceDBUpdateInstanceErr() { ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")). WithArgs(instance.ID). WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID)) + s.Fixtures.SQLMock. + ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")). + WithArgs(instance.ID). + WillReturnRows(sqlmock.NewRows([]string{})) s.Fixtures.SQLMock. ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")). WithArgs(instance.ID). @@ -457,6 +473,10 @@ func (s *InstancesTestSuite) TestUpdateInstanceDBUpdateAddressErr() { ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")). WithArgs(instance.ID). WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID)) + s.Fixtures.SQLMock. + ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")). + WithArgs(instance.ID). + WillReturnRows(sqlmock.NewRows([]string{})) s.Fixtures.SQLMock. ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")). WithArgs(instance.ID).