Merge pull request #172 from gabriel-samfira/add-relation-to-jobs
Add relation to jobs
This commit is contained in:
commit
4348999cb1
7 changed files with 181 additions and 18 deletions
|
|
@ -294,6 +294,10 @@ func (s *InstancesTestSuite) TestDeleteInstanceDBRecordNotFoundErr() {
|
|||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
|
|
@ -327,6 +331,10 @@ func (s *InstancesTestSuite) TestDeleteInstanceDBDeleteErr() {
|
|||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("12.10.12.13", "public", instance.ID))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
|
|
@ -378,6 +386,10 @@ func (s *InstancesTestSuite) TestAddInstanceEventDBUpdateErr() {
|
|||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
|
|
@ -429,6 +441,10 @@ func (s *InstancesTestSuite) TestUpdateInstanceDBUpdateInstanceErr() {
|
|||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
|
|
@ -457,6 +473,10 @@ func (s *InstancesTestSuite) TestUpdateInstanceDBUpdateAddressErr() {
|
|||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `addresses` WHERE `addresses`.`instance_id` = ? AND `addresses`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"address", "type", "instance_id"}).AddRow("10.10.1.10", "private", instance.ID))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `workflow_jobs` WHERE `workflow_jobs`.`instance_id` = ? AND `workflow_jobs`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
s.Fixtures.SQLMock.
|
||||
ExpectQuery(regexp.QuoteMeta("SELECT * FROM `instance_status_updates` WHERE `instance_status_updates`.`instance_id` = ? AND `instance_status_updates`.`deleted_at` IS NULL")).
|
||||
WithArgs(instance.ID).
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package sql
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
|
||||
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
|
||||
"github.com/cloudbase/garm/database/common"
|
||||
|
|
@ -23,7 +24,7 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return params.Job{
|
||||
jobParam := params.Job{
|
||||
ID: job.ID,
|
||||
RunID: job.RunID,
|
||||
Action: job.Action,
|
||||
|
|
@ -33,7 +34,6 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
|
|||
StartedAt: job.StartedAt,
|
||||
CompletedAt: job.CompletedAt,
|
||||
GithubRunnerID: job.GithubRunnerID,
|
||||
RunnerName: job.RunnerName,
|
||||
RunnerGroupID: job.RunnerGroupID,
|
||||
RunnerGroupName: job.RunnerGroupName,
|
||||
RepositoryName: job.RepositoryName,
|
||||
|
|
@ -45,15 +45,21 @@ func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
|
|||
CreatedAt: job.CreatedAt,
|
||||
UpdatedAt: job.UpdatedAt,
|
||||
LockedBy: job.LockedBy,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if job.InstanceID != nil {
|
||||
jobParam.RunnerName = job.Instance.Name
|
||||
}
|
||||
return jobParam, nil
|
||||
}
|
||||
|
||||
func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
|
||||
func (s *sqlDatabase) paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
|
||||
asJson, err := json.Marshal(job.Labels)
|
||||
if err != nil {
|
||||
return WorkflowJob{}, errors.Wrap(err, "marshaling labels")
|
||||
}
|
||||
return WorkflowJob{
|
||||
|
||||
workflofJob := WorkflowJob{
|
||||
ID: job.ID,
|
||||
RunID: job.RunID,
|
||||
Action: job.Action,
|
||||
|
|
@ -63,7 +69,6 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
|
|||
StartedAt: job.StartedAt,
|
||||
CompletedAt: job.CompletedAt,
|
||||
GithubRunnerID: job.GithubRunnerID,
|
||||
RunnerName: job.RunnerName,
|
||||
RunnerGroupID: job.RunnerGroupID,
|
||||
RunnerGroupName: job.RunnerGroupName,
|
||||
RepositoryName: job.RepositoryName,
|
||||
|
|
@ -73,7 +78,18 @@ func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
|
|||
EnterpriseID: job.EnterpriseID,
|
||||
Labels: asJson,
|
||||
LockedBy: job.LockedBy,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if job.RunnerName != "" {
|
||||
instance, err := s.getInstanceByName(s.ctx, job.RunnerName)
|
||||
if err != nil {
|
||||
log.Printf("failed to get instance by name: %v", err)
|
||||
} else {
|
||||
workflofJob.InstanceID = &instance.ID
|
||||
}
|
||||
}
|
||||
|
||||
return workflofJob, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) DeleteJob(ctx context.Context, jobID int64) error {
|
||||
|
|
@ -93,7 +109,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string)
|
|||
return errors.Wrap(err, "parsing entity id")
|
||||
}
|
||||
var workflowJob WorkflowJob
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob)
|
||||
|
||||
if q.Error != nil {
|
||||
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
||||
|
|
@ -122,7 +138,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string)
|
|||
|
||||
func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error {
|
||||
var workflowJob WorkflowJob
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
|
||||
|
||||
if q.Error != nil {
|
||||
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
||||
|
|
@ -174,7 +190,7 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin
|
|||
|
||||
func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) {
|
||||
var workflowJob WorkflowJob
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", job.ID).First(&workflowJob)
|
||||
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob)
|
||||
|
||||
if q.Error != nil {
|
||||
if !errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
||||
|
|
@ -198,7 +214,12 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
|
|||
}
|
||||
|
||||
if job.RunnerName != "" {
|
||||
workflowJob.RunnerName = job.RunnerName
|
||||
instance, err := s.getInstanceByName(ctx, job.RunnerName)
|
||||
if err == nil {
|
||||
workflowJob.InstanceID = &instance.ID
|
||||
} else {
|
||||
log.Printf("failed to get instance by name: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if job.RepoID != nil {
|
||||
|
|
@ -216,7 +237,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
|
|||
return params.Job{}, errors.Wrap(err, "saving job")
|
||||
}
|
||||
} else {
|
||||
workflowJob, err := paramsJobToWorkflowJob(job)
|
||||
workflowJob, err := s.paramsJobToWorkflowJob(job)
|
||||
if err != nil {
|
||||
return params.Job{}, errors.Wrap(err, "converting job")
|
||||
}
|
||||
|
|
@ -231,7 +252,7 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
|
|||
// ListJobsByStatus lists all jobs for a given status.
|
||||
func (s *sqlDatabase) ListJobsByStatus(ctx context.Context, status params.JobStatus) ([]params.Job, error) {
|
||||
var jobs []WorkflowJob
|
||||
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
|
||||
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
|
||||
|
||||
if err := query.Find(&jobs); err.Error != nil {
|
||||
return nil, err.Error
|
||||
|
|
@ -256,7 +277,7 @@ func (s *sqlDatabase) ListEntityJobsByStatus(ctx context.Context, entityType par
|
|||
}
|
||||
|
||||
var jobs []WorkflowJob
|
||||
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
|
||||
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
|
||||
|
||||
switch entityType {
|
||||
case params.OrganizationPool:
|
||||
|
|
@ -289,7 +310,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) {
|
|||
var jobs []WorkflowJob
|
||||
query := s.conn.Model(&WorkflowJob{})
|
||||
|
||||
if err := query.Find(&jobs); err.Error != nil {
|
||||
if err := query.Preload("Instance").Find(&jobs); err.Error != nil {
|
||||
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
||||
return []params.Job{}, nil
|
||||
}
|
||||
|
|
@ -310,7 +331,7 @@ func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) {
|
|||
// GetJobByID gets a job by id.
|
||||
func (s *sqlDatabase) GetJobByID(ctx context.Context, jobID int64) (params.Job, error) {
|
||||
var job WorkflowJob
|
||||
query := s.conn.Model(&WorkflowJob{}).Where("id = ?", jobID)
|
||||
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID)
|
||||
|
||||
if err := query.First(&job); err.Error != nil {
|
||||
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
||||
|
|
|
|||
|
|
@ -162,6 +162,8 @@ type Instance struct {
|
|||
Pool Pool `gorm:"foreignKey:PoolID"`
|
||||
|
||||
StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
|
||||
|
||||
Job *WorkflowJob `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
|
|
@ -201,8 +203,11 @@ type WorkflowJob struct {
|
|||
StartedAt time.Time
|
||||
CompletedAt time.Time
|
||||
|
||||
GithubRunnerID int64
|
||||
RunnerName string
|
||||
GithubRunnerID int64
|
||||
|
||||
InstanceID *uuid.UUID `gorm:"index:idx_instance_job"`
|
||||
Instance Instance `gorm:"foreignKey:InstanceID"`
|
||||
|
||||
RunnerGroupID int64
|
||||
RunnerGroupName string
|
||||
|
||||
|
|
|
|||
|
|
@ -221,6 +221,19 @@ func (s *sqlDatabase) migrateDB() error {
|
|||
}
|
||||
}
|
||||
|
||||
if s.conn.Migrator().HasTable(&WorkflowJob{}) {
|
||||
if s.conn.Migrator().HasColumn(&WorkflowJob{}, "runner_name") {
|
||||
// Remove jobs that are not in "queued" status. We really only care about queued jobs. Once they transition
|
||||
// to something else, we don't really consume them anyway.
|
||||
if err := s.conn.Exec("delete from workflow_jobs where status is not 'queued'").Error; err != nil {
|
||||
return errors.Wrap(err, "updating workflow_jobs")
|
||||
}
|
||||
if err := s.conn.Migrator().DropColumn(&WorkflowJob{}, "runner_name"); err != nil {
|
||||
return errors.Wrap(err, "updating workflow_jobs")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.conn.AutoMigrate(
|
||||
&Tag{},
|
||||
&Pool{},
|
||||
|
|
|
|||
|
|
@ -521,6 +521,58 @@ func (_m *GithubClient) ListRunners(ctx context.Context, owner string, repo stri
|
|||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// PingOrgHook provides a mock function with given fields: ctx, org, id
|
||||
func (_m *GithubClient) PingOrgHook(ctx context.Context, org string, id int64) (*github.Response, error) {
|
||||
ret := _m.Called(ctx, org, id)
|
||||
|
||||
var r0 *github.Response
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64) (*github.Response, error)); ok {
|
||||
return rf(ctx, org, id)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64) *github.Response); ok {
|
||||
r0 = rf(ctx, org, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*github.Response)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok {
|
||||
r1 = rf(ctx, org, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// PingRepoHook provides a mock function with given fields: ctx, owner, repo, id
|
||||
func (_m *GithubClient) PingRepoHook(ctx context.Context, owner string, repo string, id int64) (*github.Response, error) {
|
||||
ret := _m.Called(ctx, owner, repo, id)
|
||||
|
||||
var r0 *github.Response
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*github.Response, error)); ok {
|
||||
return rf(ctx, owner, repo, id)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *github.Response); ok {
|
||||
r0 = rf(ctx, owner, repo, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*github.Response)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok {
|
||||
r1 = rf(ctx, owner, repo, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// RemoveOrganizationRunner provides a mock function with given fields: ctx, owner, runnerID
|
||||
func (_m *GithubClient) RemoveOrganizationRunner(ctx context.Context, owner string, runnerID int64) (*github.Response, error) {
|
||||
ret := _m.Called(ctx, owner, runnerID)
|
||||
|
|
|
|||
|
|
@ -145,6 +145,32 @@ func (_m *OrganizationHooks) ListOrgHooks(ctx context.Context, org string, opts
|
|||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// PingOrgHook provides a mock function with given fields: ctx, org, id
|
||||
func (_m *OrganizationHooks) PingOrgHook(ctx context.Context, org string, id int64) (*github.Response, error) {
|
||||
ret := _m.Called(ctx, org, id)
|
||||
|
||||
var r0 *github.Response
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64) (*github.Response, error)); ok {
|
||||
return rf(ctx, org, id)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64) *github.Response); ok {
|
||||
r0 = rf(ctx, org, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*github.Response)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok {
|
||||
r1 = rf(ctx, org, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// NewOrganizationHooks creates a new instance of OrganizationHooks. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewOrganizationHooks(t interface {
|
||||
|
|
|
|||
|
|
@ -145,6 +145,32 @@ func (_m *RepositoryHooks) ListRepoHooks(ctx context.Context, owner string, repo
|
|||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// PingRepoHook provides a mock function with given fields: ctx, owner, repo, id
|
||||
func (_m *RepositoryHooks) PingRepoHook(ctx context.Context, owner string, repo string, id int64) (*github.Response, error) {
|
||||
ret := _m.Called(ctx, owner, repo, id)
|
||||
|
||||
var r0 *github.Response
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*github.Response, error)); ok {
|
||||
return rf(ctx, owner, repo, id)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *github.Response); ok {
|
||||
r0 = rf(ctx, owner, repo, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*github.Response)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string, int64) error); ok {
|
||||
r1 = rf(ctx, owner, repo, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// NewRepositoryHooks creates a new instance of RepositoryHooks. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewRepositoryHooks(t interface {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue