Add notifications for jobs

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2024-06-19 13:44:24 +00:00
parent 5f07bc2d7c
commit b7d138d2ac

View file

@ -93,7 +93,14 @@ func (s *sqlDatabase) paramsJobToWorkflowJob(ctx context.Context, job params.Job
return workflofJob, nil
}
func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) error {
func (s *sqlDatabase) DeleteJob(_ context.Context, jobID int64) (err error) {
defer func() {
if err == nil {
if notifyErr := s.sendNotify(common.JobEntityType, common.DeleteOperation, params.Job{ID: jobID}); notifyErr != nil {
slog.With(slog.Any("error", notifyErr)).Error("failed to send notify")
}
}
}()
q := s.conn.Delete(&WorkflowJob{}, jobID)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
@ -134,10 +141,17 @@ func (s *sqlDatabase) LockJob(_ context.Context, jobID int64, entityID string) e
return errors.Wrap(err, "saving job")
}
asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}
return nil
}
func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error {
func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) (err error) {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Preload("Instance").Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
@ -157,7 +171,12 @@ func (s *sqlDatabase) BreakLockJobIsQueued(_ context.Context, jobID int64) error
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}
return nil
}
@ -186,6 +205,12 @@ func (s *sqlDatabase) UnlockJob(_ context.Context, jobID int64, entityID string)
return errors.Wrap(err, "saving job")
}
asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err == nil {
s.sendNotify(common.JobEntityType, common.UpdateOperation, asParams)
} else {
slog.With(slog.Any("error", err)).Error("failed to convert job to params")
}
return nil
}
@ -198,9 +223,11 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
return params.Job{}, errors.Wrap(q.Error, "fetching job")
}
}
var operation common.OperationType
if workflowJob.ID != 0 {
// Update workflowJob with values from job.
operation = common.UpdateOperation
workflowJob.Status = job.Status
workflowJob.Action = job.Action
workflowJob.Conclusion = job.Conclusion
@ -238,6 +265,8 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
return params.Job{}, errors.Wrap(err, "saving job")
}
} else {
operation = common.CreateOperation
workflowJob, err := s.paramsJobToWorkflowJob(ctx, job)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
@ -247,7 +276,13 @@ func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (pa
}
}
return sqlWorkflowJobToParamsJob(workflowJob)
asParams, err := sqlWorkflowJobToParamsJob(workflowJob)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
}
s.sendNotify(common.JobEntityType, operation, asParams)
return asParams, nil
}
// ListJobsByStatus lists all jobs for a given status.