2025-05-20 09:40:15 +00:00
// Copyright 2025 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
2023-04-10 00:03:49 +00:00
package sql
import (
"context"
"encoding/json"
2025-08-16 19:31:58 +00:00
"errors"
"fmt"
2024-01-05 23:32:16 +00:00
"log/slog"
2023-04-10 00:03:49 +00:00
"github.com/google/uuid"
"gorm.io/gorm"
2023-06-28 10:08:05 +00:00
"gorm.io/gorm/clause"
2024-02-22 16:54:38 +01:00
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
2023-04-10 00:03:49 +00:00
)
// Compile-time check that *sqlDatabase implements common.JobsStore.
var _ common.JobsStore = (*sqlDatabase)(nil)
func sqlWorkflowJobToParamsJob ( job WorkflowJob ) ( params . Job , error ) {
labels := [ ] string { }
if job . Labels != nil {
if err := json . Unmarshal ( job . Labels , & labels ) ; err != nil {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error unmarshaling labels: %w" , err )
2023-04-10 00:03:49 +00:00
}
}
2023-06-28 14:50:59 +00:00
2023-08-26 19:43:57 +00:00
jobParam := params . Job {
2023-04-10 00:03:49 +00:00
ID : job . ID ,
2025-07-18 07:51:50 +00:00
WorkflowJobID : job . WorkflowJobID ,
ScaleSetJobID : job . ScaleSetJobID ,
2023-04-10 00:03:49 +00:00
RunID : job . RunID ,
Action : job . Action ,
Status : job . Status ,
Name : job . Name ,
Conclusion : job . Conclusion ,
StartedAt : job . StartedAt ,
CompletedAt : job . CompletedAt ,
GithubRunnerID : job . GithubRunnerID ,
RunnerGroupID : job . RunnerGroupID ,
RunnerGroupName : job . RunnerGroupName ,
RepositoryName : job . RepositoryName ,
RepositoryOwner : job . RepositoryOwner ,
RepoID : job . RepoID ,
OrgID : job . OrgID ,
EnterpriseID : job . EnterpriseID ,
Labels : labels ,
CreatedAt : job . CreatedAt ,
UpdatedAt : job . UpdatedAt ,
LockedBy : job . LockedBy ,
2023-08-26 19:43:57 +00:00
}
if job . InstanceID != nil {
jobParam . RunnerName = job . Instance . Name
}
return jobParam , nil
2023-04-10 00:03:49 +00:00
}
2024-01-05 23:32:16 +00:00
func ( s * sqlDatabase ) paramsJobToWorkflowJob ( ctx context . Context , job params . Job ) ( WorkflowJob , error ) {
2024-02-22 16:54:38 +01:00
asJSON , err := json . Marshal ( job . Labels )
2023-04-10 00:03:49 +00:00
if err != nil {
2025-08-16 19:31:58 +00:00
return WorkflowJob { } , fmt . Errorf ( "error marshaling labels: %w" , err )
2023-04-10 00:03:49 +00:00
}
2023-08-26 19:43:57 +00:00
workflofJob := WorkflowJob {
2025-07-18 07:51:50 +00:00
ScaleSetJobID : job . ScaleSetJobID ,
WorkflowJobID : job . ID ,
2023-04-10 00:03:49 +00:00
RunID : job . RunID ,
Action : job . Action ,
Status : job . Status ,
Name : job . Name ,
Conclusion : job . Conclusion ,
StartedAt : job . StartedAt ,
CompletedAt : job . CompletedAt ,
GithubRunnerID : job . GithubRunnerID ,
RunnerGroupID : job . RunnerGroupID ,
RunnerGroupName : job . RunnerGroupName ,
RepositoryName : job . RepositoryName ,
RepositoryOwner : job . RepositoryOwner ,
RepoID : job . RepoID ,
OrgID : job . OrgID ,
EnterpriseID : job . EnterpriseID ,
2024-02-22 16:54:38 +01:00
Labels : asJSON ,
2023-04-10 00:03:49 +00:00
LockedBy : job . LockedBy ,
2023-08-26 19:43:57 +00:00
}
if job . RunnerName != "" {
2025-08-16 23:00:55 +00:00
instance , err := s . getInstance ( s . ctx , job . RunnerName )
2023-08-26 19:43:57 +00:00
if err != nil {
2024-11-22 10:32:13 +01:00
// This usually is very normal as not all jobs run on our runners.
2024-11-22 16:40:23 +01:00
slog . DebugContext ( ctx , "failed to get instance by name" , "instance_name" , job . RunnerName )
2023-08-26 19:43:57 +00:00
} else {
workflofJob . InstanceID = & instance . ID
}
}
return workflofJob , nil
2023-04-10 00:03:49 +00:00
}
2024-06-19 13:44:24 +00:00
func ( s * sqlDatabase ) DeleteJob ( _ context . Context , jobID int64 ) ( err error ) {
2025-07-18 07:51:50 +00:00
var workflowJob WorkflowJob
q := s . conn . Where ( "workflow_job_id = ?" , jobID ) . Preload ( "Instance" ) . First ( & workflowJob )
if q . Error != nil {
if errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
return nil
}
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error fetching job: %w" , q . Error )
2025-07-18 07:51:50 +00:00
}
removedJob , err := sqlWorkflowJobToParamsJob ( workflowJob )
if err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error converting job: %w" , err )
2025-07-18 07:51:50 +00:00
}
2024-06-19 13:44:24 +00:00
defer func ( ) {
if err == nil {
2025-07-18 07:51:50 +00:00
if notifyErr := s . sendNotify ( common . JobEntityType , common . DeleteOperation , removedJob ) ; notifyErr != nil {
2024-06-19 13:44:24 +00:00
slog . With ( slog . Any ( "error" , notifyErr ) ) . Error ( "failed to send notify" )
}
}
} ( )
2025-07-18 07:51:50 +00:00
q = s . conn . Delete ( & workflowJob )
2023-04-10 00:03:49 +00:00
if q . Error != nil {
if errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
return nil
}
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error deleting job: %w" , q . Error )
2023-04-10 00:03:49 +00:00
}
return nil
}
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) LockJob ( _ context . Context , jobID int64 , entityID string ) error {
2023-04-10 00:03:49 +00:00
entityUUID , err := uuid . Parse ( entityID )
if err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error parsing entity id: %w" , err )
2023-04-10 00:03:49 +00:00
}
var workflowJob WorkflowJob
2025-07-18 07:51:50 +00:00
q := s . conn . Preload ( "Instance" ) . Where ( "id = ?" , jobID ) . First ( & workflowJob )
2023-04-10 00:03:49 +00:00
if q . Error != nil {
if errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
return runnerErrors . ErrNotFound
}
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error fetching job: %w" , q . Error )
2023-04-10 00:03:49 +00:00
}
2023-06-23 15:03:55 +00:00
if workflowJob . LockedBy . String ( ) == entityID {
// Already locked by us.
return nil
}
2023-04-10 00:03:49 +00:00
if workflowJob . LockedBy != uuid . Nil {
return runnerErrors . NewConflictError ( "job is locked by another entity %s" , workflowJob . LockedBy . String ( ) )
}
workflowJob . LockedBy = entityUUID
if err := s . conn . Save ( & workflowJob ) . Error ; err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error saving job: %w" , err )
2023-04-10 00:03:49 +00:00
}
2024-06-19 13:44:24 +00:00
asParams , err := sqlWorkflowJobToParamsJob ( workflowJob )
2024-07-04 14:33:32 +00:00
if err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error converting job: %w" , err )
2024-06-19 13:44:24 +00:00
}
2024-07-04 14:33:32 +00:00
s . sendNotify ( common . JobEntityType , common . UpdateOperation , asParams )
2024-06-19 13:44:24 +00:00
2023-04-10 00:03:49 +00:00
return nil
}
2024-06-19 13:44:24 +00:00
func ( s * sqlDatabase ) BreakLockJobIsQueued ( _ context . Context , jobID int64 ) ( err error ) {
2023-06-24 00:22:51 +00:00
var workflowJob WorkflowJob
2025-07-18 07:51:50 +00:00
q := s . conn . Clauses ( clause . Locking { Strength : "UPDATE" } ) . Preload ( "Instance" ) . Where ( "workflow_job_id = ? and status = ?" , jobID , params . JobStatusQueued ) . First ( & workflowJob )
2023-06-24 00:22:51 +00:00
if q . Error != nil {
if errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
return nil
}
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error fetching job: %w" , q . Error )
2023-06-24 00:22:51 +00:00
}
if workflowJob . LockedBy == uuid . Nil {
// Job is already unlocked.
return nil
}
workflowJob . LockedBy = uuid . Nil
if err := s . conn . Save ( & workflowJob ) . Error ; err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error saving job: %w" , err )
2023-06-24 00:22:51 +00:00
}
2024-06-19 13:44:24 +00:00
asParams , err := sqlWorkflowJobToParamsJob ( workflowJob )
2024-07-04 14:33:32 +00:00
if err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error converting job: %w" , err )
2024-06-19 13:44:24 +00:00
}
2024-07-04 14:33:32 +00:00
s . sendNotify ( common . JobEntityType , common . UpdateOperation , asParams )
2023-06-24 00:22:51 +00:00
return nil
}
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) UnlockJob ( _ context . Context , jobID int64 , entityID string ) error {
2023-04-10 00:03:49 +00:00
var workflowJob WorkflowJob
2025-07-18 07:51:50 +00:00
q := s . conn . Clauses ( clause . Locking { Strength : "UPDATE" } ) . Where ( "workflow_job_id = ?" , jobID ) . First ( & workflowJob )
2023-04-10 00:03:49 +00:00
if q . Error != nil {
if errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
return runnerErrors . ErrNotFound
}
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error fetching job: %w" , q . Error )
2023-04-10 00:03:49 +00:00
}
if workflowJob . LockedBy == uuid . Nil {
// Job is already unlocked.
return nil
}
if workflowJob . LockedBy != uuid . Nil && workflowJob . LockedBy . String ( ) != entityID {
return runnerErrors . NewConflictError ( "job is locked by another entity %s" , workflowJob . LockedBy . String ( ) )
}
workflowJob . LockedBy = uuid . Nil
if err := s . conn . Save ( & workflowJob ) . Error ; err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error saving job: %w" , err )
2023-04-10 00:03:49 +00:00
}
2024-06-19 13:44:24 +00:00
asParams , err := sqlWorkflowJobToParamsJob ( workflowJob )
2024-07-04 14:33:32 +00:00
if err != nil {
2025-08-16 19:31:58 +00:00
return fmt . Errorf ( "error converting job: %w" , err )
2024-06-19 13:44:24 +00:00
}
2024-07-04 14:33:32 +00:00
s . sendNotify ( common . JobEntityType , common . UpdateOperation , asParams )
2023-04-10 00:03:49 +00:00
return nil
}
func ( s * sqlDatabase ) CreateOrUpdateJob ( ctx context . Context , job params . Job ) ( params . Job , error ) {
var workflowJob WorkflowJob
2024-07-04 14:28:57 +00:00
var err error
2025-07-18 07:51:50 +00:00
searchField := "workflow_job_id = ?"
var searchVal any = job . ID
if job . ScaleSetJobID != "" {
searchField = "scale_set_job_id = ?"
searchVal = job . ScaleSetJobID
}
q := s . conn . Preload ( "Instance" ) . Where ( searchField , searchVal ) . First ( & workflowJob )
2023-04-10 00:03:49 +00:00
if q . Error != nil {
if ! errors . Is ( q . Error , gorm . ErrRecordNotFound ) {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error fetching job: %w" , q . Error )
2023-04-10 00:03:49 +00:00
}
}
2024-06-19 13:44:24 +00:00
var operation common . OperationType
2023-04-10 00:03:49 +00:00
if workflowJob . ID != 0 {
// Update workflowJob with values from job.
2024-06-19 13:44:24 +00:00
operation = common . UpdateOperation
2023-04-10 00:03:49 +00:00
workflowJob . Status = job . Status
workflowJob . Action = job . Action
workflowJob . Conclusion = job . Conclusion
workflowJob . StartedAt = job . StartedAt
workflowJob . CompletedAt = job . CompletedAt
workflowJob . GithubRunnerID = job . GithubRunnerID
workflowJob . RunnerGroupID = job . RunnerGroupID
workflowJob . RunnerGroupName = job . RunnerGroupName
2025-07-18 07:51:50 +00:00
if job . RunID != 0 && workflowJob . RunID == 0 {
workflowJob . RunID = job . RunID
}
2023-04-10 00:03:49 +00:00
if job . LockedBy != uuid . Nil {
workflowJob . LockedBy = job . LockedBy
}
if job . RunnerName != "" {
2025-08-16 23:00:55 +00:00
instance , err := s . getInstance ( ctx , job . RunnerName )
2023-08-26 19:43:57 +00:00
if err == nil {
workflowJob . InstanceID = & instance . ID
} else {
2024-11-22 10:32:13 +01:00
// This usually is very normal as not all jobs run on our runners.
2024-11-22 16:46:39 +01:00
slog . DebugContext ( ctx , "failed to get instance by name" , "instance_name" , job . RunnerName )
2023-08-26 19:43:57 +00:00
}
2023-04-10 00:03:49 +00:00
}
2023-06-28 14:50:59 +00:00
if job . RepoID != nil {
2023-04-10 00:03:49 +00:00
workflowJob . RepoID = job . RepoID
}
2023-06-28 14:50:59 +00:00
if job . OrgID != nil {
2023-04-10 00:03:49 +00:00
workflowJob . OrgID = job . OrgID
}
2023-06-28 14:50:59 +00:00
if job . EnterpriseID != nil {
2023-04-10 00:03:49 +00:00
workflowJob . EnterpriseID = job . EnterpriseID
}
if err := s . conn . Save ( & workflowJob ) . Error ; err != nil {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error saving job: %w" , err )
2023-04-10 00:03:49 +00:00
}
} else {
2024-06-19 13:44:24 +00:00
operation = common . CreateOperation
2024-07-04 14:28:57 +00:00
workflowJob , err = s . paramsJobToWorkflowJob ( ctx , job )
2023-04-10 00:03:49 +00:00
if err != nil {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error converting job: %w" , err )
2023-04-10 00:03:49 +00:00
}
if err := s . conn . Create ( & workflowJob ) . Error ; err != nil {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error creating job: %w" , err )
2023-04-10 00:03:49 +00:00
}
}
2024-06-19 13:44:24 +00:00
asParams , err := sqlWorkflowJobToParamsJob ( workflowJob )
if err != nil {
2025-08-16 19:31:58 +00:00
return params . Job { } , fmt . Errorf ( "error converting job: %w" , err )
2024-06-19 13:44:24 +00:00
}
s . sendNotify ( common . JobEntityType , operation , asParams )
return asParams , nil
2023-04-10 00:03:49 +00:00
}
// ListJobsByStatus lists all jobs for a given status.
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) ListJobsByStatus ( _ context . Context , status params . JobStatus ) ( [ ] params . Job , error ) {
2023-04-10 00:03:49 +00:00
var jobs [ ] WorkflowJob
2023-08-26 19:43:57 +00:00
query := s . conn . Model ( & WorkflowJob { } ) . Preload ( "Instance" ) . Where ( "status = ?" , status )
2023-04-10 00:03:49 +00:00
if err := query . Find ( & jobs ) ; err . Error != nil {
return nil , err . Error
}
ret := make ( [ ] params . Job , len ( jobs ) )
for idx , job := range jobs {
jobParam , err := sqlWorkflowJobToParamsJob ( job )
if err != nil {
2025-08-16 19:31:58 +00:00
return nil , fmt . Errorf ( "error converting job: %w" , err )
2023-04-10 00:03:49 +00:00
}
ret [ idx ] = jobParam
}
return ret , nil
}
// ListEntityJobsByStatus lists all jobs for a given entity type and id.
2025-05-12 21:47:13 +00:00
func ( s * sqlDatabase ) ListEntityJobsByStatus ( _ context . Context , entityType params . ForgeEntityType , entityID string , status params . JobStatus ) ( [ ] params . Job , error ) {
2023-04-10 00:03:49 +00:00
u , err := uuid . Parse ( entityID )
if err != nil {
return nil , err
}
var jobs [ ] WorkflowJob
2025-07-18 07:51:50 +00:00
query := s . conn .
Model ( & WorkflowJob { } ) .
Preload ( "Instance" ) .
Where ( "status = ?" , status ) .
Where ( "workflow_job_id > 0" )
2023-04-10 00:03:49 +00:00
switch entityType {
2025-05-12 21:47:13 +00:00
case params . ForgeEntityTypeOrganization :
2023-04-10 00:03:49 +00:00
query = query . Where ( "org_id = ?" , u )
2025-05-12 21:47:13 +00:00
case params . ForgeEntityTypeRepository :
2023-04-10 00:03:49 +00:00
query = query . Where ( "repo_id = ?" , u )
2025-05-12 21:47:13 +00:00
case params . ForgeEntityTypeEnterprise :
2023-04-10 00:03:49 +00:00
query = query . Where ( "enterprise_id = ?" , u )
}
if err := query . Find ( & jobs ) ; err . Error != nil {
if errors . Is ( err . Error , gorm . ErrRecordNotFound ) {
return [ ] params . Job { } , nil
}
return nil , err . Error
}
ret := make ( [ ] params . Job , len ( jobs ) )
2023-06-23 21:15:46 +00:00
for idx , job := range jobs {
jobParam , err := sqlWorkflowJobToParamsJob ( job )
if err != nil {
2025-08-16 19:31:58 +00:00
return nil , fmt . Errorf ( "error converting job: %w" , err )
2023-06-23 21:15:46 +00:00
}
ret [ idx ] = jobParam
}
return ret , nil
}
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) ListAllJobs ( _ context . Context ) ( [ ] params . Job , error ) {
2023-06-23 21:15:46 +00:00
var jobs [ ] WorkflowJob
query := s . conn . Model ( & WorkflowJob { } )
2023-08-26 19:43:57 +00:00
if err := query . Preload ( "Instance" ) . Find ( & jobs ) ; err . Error != nil {
2023-06-23 21:15:46 +00:00
if errors . Is ( err . Error , gorm . ErrRecordNotFound ) {
return [ ] params . Job { } , nil
}
return nil , err . Error
}
ret := make ( [ ] params . Job , len ( jobs ) )
2023-04-10 00:03:49 +00:00
for idx , job := range jobs {
jobParam , err := sqlWorkflowJobToParamsJob ( job )
if err != nil {
2025-08-16 19:31:58 +00:00
return nil , fmt . Errorf ( "error converting job: %w" , err )
2023-04-10 00:03:49 +00:00
}
ret [ idx ] = jobParam
}
return ret , nil
}
// GetJobByID gets a job by id.
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) GetJobByID ( _ context . Context , jobID int64 ) ( params . Job , error ) {
2023-04-10 00:03:49 +00:00
var job WorkflowJob
2025-07-18 07:51:50 +00:00
query := s . conn . Model ( & WorkflowJob { } ) . Preload ( "Instance" ) . Where ( "workflow_job_id = ?" , jobID )
2023-04-10 00:03:49 +00:00
if err := query . First ( & job ) ; err . Error != nil {
if errors . Is ( err . Error , gorm . ErrRecordNotFound ) {
return params . Job { } , runnerErrors . ErrNotFound
}
return params . Job { } , err . Error
}
return sqlWorkflowJobToParamsJob ( job )
}
// DeleteCompletedJobs deletes all completed jobs.
2024-02-22 16:54:38 +01:00
func ( s * sqlDatabase ) DeleteCompletedJobs ( _ context . Context ) error {
2023-04-10 00:03:49 +00:00
query := s . conn . Model ( & WorkflowJob { } ) . Where ( "status = ?" , params . JobStatusCompleted )
if err := query . Unscoped ( ) . Delete ( & WorkflowJob { } ) ; err . Error != nil {
if errors . Is ( err . Error , gorm . ErrRecordNotFound ) {
return nil
}
return err . Error
}
return nil
}