From bf90eb323ad03c3b30b6d78080e7a02bbf534913 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Wed, 28 Jun 2023 10:08:05 +0000 Subject: [PATCH] Add back update locks Signed-off-by: Gabriel Adrian Samfira --- database/sql/jobs.go | 9 +++++---- runner/pool/pool.go | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/database/sql/jobs.go b/database/sql/jobs.go index 821ccda1..96ab75a4 100644 --- a/database/sql/jobs.go +++ b/database/sql/jobs.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "gorm.io/gorm" + "gorm.io/gorm/clause" ) var _ common.JobsStore = &sqlDatabase{} @@ -91,7 +92,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) return errors.Wrap(err, "parsing entity id") } var workflowJob WorkflowJob - q := s.conn.Where("id = ?", jobID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -120,7 +121,7 @@ func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error { var workflowJob WorkflowJob - q := s.conn.Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -144,7 +145,7 @@ func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) err func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID string) error { var workflowJob WorkflowJob - q := s.conn.Where("id = ?", jobID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -172,7 +173,7 @@ func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID strin func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) { var workflowJob WorkflowJob - q := s.conn.Where("id = ?", job.ID).First(&workflowJob) + q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", job.ID).First(&workflowJob) if q.Error != nil { if !errors.Is(q.Error, gorm.ErrRecordNotFound) { diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 1fad2e5a..043698c4 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -124,11 +124,11 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), jobParams.Labels) if err != nil { - log.Printf("failed to find pools matching tags: %s; not recording job", err) + log.Printf("failed to find pools matching tags %s: %s; not recording job", strings.Join(jobParams.Labels, ", "), err) return } if len(potentialPools) == 0 { - log.Printf("no pools matching tags: %s; not recording job", err) + log.Printf("no pools matching tags %s; not recording job", strings.Join(jobParams.Labels, ", ")) return }