We'll use GithubEntityType throughout the codebase to determine the type of operation that is about to take place, so this won't belimited to determining only pool type. We'll also use this to dedupe the label scope as well. Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
359 lines
9.8 KiB
Go
359 lines
9.8 KiB
Go
package sql
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log/slog"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pkg/errors"
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
|
|
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
|
|
"github.com/cloudbase/garm/database/common"
|
|
"github.com/cloudbase/garm/params"
|
|
)
|
|
|
|
var _ common.JobsStore = &sqlDatabase{}
|
|
|
|
func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
|
|
labels := []string{}
|
|
if job.Labels != nil {
|
|
if err := json.Unmarshal(job.Labels, &labels); err != nil {
|
|
return params.Job{}, errors.Wrap(err, "unmarshaling labels")
|
|
}
|
|
}
|
|
|
|
jobParam := params.Job{
|
|
ID: job.ID,
|
|
RunID: job.RunID,
|
|
Action: job.Action,
|
|
Status: job.Status,
|
|
Name: job.Name,
|
|
Conclusion: job.Conclusion,
|
|
StartedAt: job.StartedAt,
|
|
CompletedAt: job.CompletedAt,
|
|
GithubRunnerID: job.GithubRunnerID,
|
|
RunnerGroupID: job.RunnerGroupID,
|
|
RunnerGroupName: job.RunnerGroupName,
|
|
RepositoryName: job.RepositoryName,
|
|
RepositoryOwner: job.RepositoryOwner,
|
|
RepoID: job.RepoID,
|
|
OrgID: job.OrgID,
|
|
EnterpriseID: job.EnterpriseID,
|
|
Labels: labels,
|
|
CreatedAt: job.CreatedAt,
|
|
UpdatedAt: job.UpdatedAt,
|
|
LockedBy: job.LockedBy,
|
|
}
|
|
|
|
if job.InstanceID != nil {
|
|
jobParam.RunnerName = job.Instance.Name
|
|
}
|
|
return jobParam, nil
|
|
}
|
|
|
|
func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job) (WorkflowJob, error) {
|
|
asJSON, err := json.Marshal(job.Labels)
|
|
if err != nil {
|
|
return WorkflowJob{}, errors.Wrap(err, "marshaling labels")
|
|
}
|
|
|
|
workflofJob := WorkflowJob{
|
|
ID: job.ID,
|
|
RunID: job.RunID,
|
|
Action: job.Action,
|
|
Status: job.Status,
|
|
Name: job.Name,
|
|
Conclusion: job.Conclusion,
|
|
StartedAt: job.StartedAt,
|
|
CompletedAt: job.CompletedAt,
|
|
GithubRunnerID: job.GithubRunnerID,
|
|
RunnerGroupID: job.RunnerGroupID,
|
|
RunnerGroupName: job.RunnerGroupName,
|
|
RepositoryName: job.RepositoryName,
|
|
RepositoryOwner: job.RepositoryOwner,
|
|
RepoID: job.RepoID,
|
|
OrgID: job.OrgID,
|
|
EnterpriseID: job.EnterpriseID,
|
|
Labels: asJSON,
|
|
LockedBy: job.LockedBy,
|
|
}
|
|
|
|
if job.RunnerName != "" {
|
|
instance, err := s.getInstanceByName(s.ctx, job.RunnerName)
|
|
if err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to get instance by name")
|
|
} else {
|
|
workflofJob.InstanceID = &instance.ID
|
|
}
|
|
}
|
|
|
|
return workflofJob, nil
|
|
}
|
|
|
|
func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) error {
|
|
q := s.conn.Delete(&WorkflowJob{}, jobID)
|
|
if q.Error != nil {
|
|
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
|
return nil
|
|
}
|
|
return errors.Wrap(q.Error, "deleting job")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) error {
|
|
entityUUID, err := uuid.Parse(entityID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "parsing entity id")
|
|
}
|
|
var workflowJob WorkflowJob
|
|
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", jobID).First(&workflowJob)
|
|
|
|
if q.Error != nil {
|
|
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
|
return runnerErrors.ErrNotFound
|
|
}
|
|
return errors.Wrap(q.Error, "fetching job")
|
|
}
|
|
|
|
if workflowJob.LockedBy.String() == entityID {
|
|
// Already locked by us.
|
|
return nil
|
|
}
|
|
|
|
if workflowJob.LockedBy != uuid.Nil {
|
|
return runnerErrors.NewConflictError("job is locked by another entity %s", workflowJob.LockedBy.String())
|
|
}
|
|
|
|
workflowJob.LockedBy = entityUUID
|
|
|
|
if err := s.conn.Save(&workflowJob).Error; err != nil {
|
|
return errors.Wrap(err, "saving job")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error {
|
|
var workflowJob WorkflowJob
|
|
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
|
|
|
|
if q.Error != nil {
|
|
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
|
return nil
|
|
}
|
|
return errors.Wrap(q.Error, "fetching job")
|
|
}
|
|
|
|
if workflowJob.LockedBy == uuid.Nil {
|
|
// Job is already unlocked.
|
|
return nil
|
|
}
|
|
|
|
workflowJob.LockedBy = uuid.Nil
|
|
if err := s.conn.Save(&workflowJob).Error; err != nil {
|
|
return errors.Wrap(err, "saving job")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string) error {
|
|
var workflowJob WorkflowJob
|
|
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
|
|
|
|
if q.Error != nil {
|
|
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
|
return runnerErrors.ErrNotFound
|
|
}
|
|
return errors.Wrap(q.Error, "fetching job")
|
|
}
|
|
|
|
if workflowJob.LockedBy == uuid.Nil {
|
|
// Job is already unlocked.
|
|
return nil
|
|
}
|
|
|
|
if workflowJob.LockedBy != uuid.Nil && workflowJob.LockedBy.String() != entityID {
|
|
return runnerErrors.NewConflictError("job is locked by another entity %s", workflowJob.LockedBy.String())
|
|
}
|
|
|
|
workflowJob.LockedBy = uuid.Nil
|
|
if err := s.conn.Save(&workflowJob).Error; err != nil {
|
|
return errors.Wrap(err, "saving job")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) {
|
|
var workflowJob WorkflowJob
|
|
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ?", job.ID).First(&workflowJob)
|
|
|
|
if q.Error != nil {
|
|
if !errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
|
return params.Job{}, errors.Wrap(q.Error, "fetching job")
|
|
}
|
|
}
|
|
|
|
if workflowJob.ID != 0 {
|
|
// Update workflowJob with values from job.
|
|
workflowJob.Status = job.Status
|
|
workflowJob.Action = job.Action
|
|
workflowJob.Conclusion = job.Conclusion
|
|
workflowJob.StartedAt = job.StartedAt
|
|
workflowJob.CompletedAt = job.CompletedAt
|
|
workflowJob.GithubRunnerID = job.GithubRunnerID
|
|
workflowJob.RunnerGroupID = job.RunnerGroupID
|
|
workflowJob.RunnerGroupName = job.RunnerGroupName
|
|
|
|
if job.LockedBy != uuid.Nil {
|
|
workflowJob.LockedBy = job.LockedBy
|
|
}
|
|
|
|
if job.RunnerName != "" {
|
|
instance, err := s.getInstanceByName(ctx, job.RunnerName)
|
|
if err == nil {
|
|
workflowJob.InstanceID = &instance.ID
|
|
} else {
|
|
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to get instance by name")
|
|
}
|
|
}
|
|
|
|
if job.RepoID != nil {
|
|
workflowJob.RepoID = job.RepoID
|
|
}
|
|
|
|
if job.OrgID != nil {
|
|
workflowJob.OrgID = job.OrgID
|
|
}
|
|
|
|
if job.EnterpriseID != nil {
|
|
workflowJob.EnterpriseID = job.EnterpriseID
|
|
}
|
|
if err := s.conn.Save(&workflowJob).Error; err != nil {
|
|
return params.Job{}, errors.Wrap(err, "saving job")
|
|
}
|
|
} else {
|
|
workflowJob, err := s.paramsJobToWorkflowJob(ctx, job)
|
|
if err != nil {
|
|
return params.Job{}, errors.Wrap(err, "converting job")
|
|
}
|
|
if err := s.conn.Create(&workflowJob).Error; err != nil {
|
|
return params.Job{}, errors.Wrap(err, "creating job")
|
|
}
|
|
}
|
|
|
|
return sqlWorkflowJobToParamsJob(workflowJob)
|
|
}
|
|
|
|
// ListJobsByStatus lists all jobs for a given status.
|
|
func (s *sqlDatabase) ListJobsByStatus(_ context.Context, status params.JobStatus) ([]params.Job, error) {
|
|
var jobs []WorkflowJob
|
|
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
|
|
|
|
if err := query.Find(&jobs); err.Error != nil {
|
|
return nil, err.Error
|
|
}
|
|
|
|
ret := make([]params.Job, len(jobs))
|
|
for idx, job := range jobs {
|
|
jobParam, err := sqlWorkflowJobToParamsJob(job)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "converting job")
|
|
}
|
|
ret[idx] = jobParam
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
// ListEntityJobsByStatus lists all jobs for a given entity type and id.
|
|
func (s *sqlDatabase) ListEntityJobsByStatus(_ context.Context, entityType params.GithubEntityType, entityID string, status params.JobStatus) ([]params.Job, error) {
|
|
u, err := uuid.Parse(entityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var jobs []WorkflowJob
|
|
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("status = ?", status)
|
|
|
|
switch entityType {
|
|
case params.GithubEntityTypeOrganization:
|
|
query = query.Where("org_id = ?", u)
|
|
case params.GithubEntityTypeRepository:
|
|
query = query.Where("repo_id = ?", u)
|
|
case params.GithubEntityTypeEnterprise:
|
|
query = query.Where("enterprise_id = ?", u)
|
|
}
|
|
|
|
if err := query.Find(&jobs); err.Error != nil {
|
|
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
|
return []params.Job{}, nil
|
|
}
|
|
return nil, err.Error
|
|
}
|
|
|
|
ret := make([]params.Job, len(jobs))
|
|
for idx, job := range jobs {
|
|
jobParam, err := sqlWorkflowJobToParamsJob(job)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "converting job")
|
|
}
|
|
ret[idx] = jobParam
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
func (s *sqlDatabase) ListAllJobs(_ context.Context) ([]params.Job, error) {
|
|
var jobs []WorkflowJob
|
|
query := s.conn.Model(&WorkflowJob{})
|
|
|
|
if err := query.Preload("Instance").Find(&jobs); err.Error != nil {
|
|
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
|
return []params.Job{}, nil
|
|
}
|
|
return nil, err.Error
|
|
}
|
|
|
|
ret := make([]params.Job, len(jobs))
|
|
for idx, job := range jobs {
|
|
jobParam, err := sqlWorkflowJobToParamsJob(job)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "converting job")
|
|
}
|
|
ret[idx] = jobParam
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
// GetJobByID gets a job by id.
|
|
func (s *sqlDatabase) GetJobByID(_ context.Context, jobID int64) (params.Job, error) {
|
|
var job WorkflowJob
|
|
query := s.conn.Model(&WorkflowJob{}).Preload("Instance").Where("id = ?", jobID)
|
|
|
|
if err := query.First(&job); err.Error != nil {
|
|
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
|
return params.Job{}, runnerErrors.ErrNotFound
|
|
}
|
|
return params.Job{}, err.Error
|
|
}
|
|
|
|
return sqlWorkflowJobToParamsJob(job)
|
|
}
|
|
|
|
// DeleteCompletedJobs deletes all completed jobs.
|
|
func (s *sqlDatabase) DeleteCompletedJobs(_ context.Context) error {
|
|
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", params.JobStatusCompleted)
|
|
|
|
if err := query.Unscoped().Delete(&WorkflowJob{}); err.Error != nil {
|
|
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
|
|
return nil
|
|
}
|
|
return err.Error
|
|
}
|
|
|
|
return nil
|
|
}
|