diff --git a/database/common/store.go b/database/common/store.go index b7222d1c..82c5e4c0 100644 --- a/database/common/store.go +++ b/database/common/store.go @@ -143,7 +143,7 @@ type ScaleSetsStore interface { GetScaleSetByID(ctx context.Context, scaleSet uint) (params.ScaleSet, error) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error) SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) error - SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int64) error + SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) error } type ScaleSetInstanceStore interface { diff --git a/database/sql/models.go b/database/sql/models.go index ce156ccd..45e329f6 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -119,7 +119,7 @@ type ScaleSet struct { OSArch commonParams.OSArch Enabled bool LastMessageID int64 - DesiredRunnerCount int64 + DesiredRunnerCount int // ExtraSpecs is an opaque json that gets sent to the provider // as part of the bootstrap params for instances. It can contain // any kind of data needed by providers. diff --git a/database/sql/scalesets.go b/database/sql/scalesets.go index 03c34800..3adc423c 100644 --- a/database/sql/scalesets.go +++ b/database/sql/scalesets.go @@ -392,7 +392,7 @@ func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID u return nil } -func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int64) error { +func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) error { if err := s.conn.Transaction(func(tx *gorm.DB) error { if q := tx.Model(&ScaleSet{}).Where("id = ?", scaleSetID).Update("desired_runner_count", desiredRunnerCount); q.Error != nil { return errors.Wrap(q.Error, "saving database entry") diff --git a/params/params.go b/params/params.go index 8f63fecb..64a53984 100644 --- a/params/params.go +++ b/params/params.go @@ -462,7 +462,7 @@ type ScaleSet struct { OSArch commonParams.OSArch `json:"os_arch,omitempty"` Enabled bool `json:"enabled,omitempty"` Instances []Instance `json:"instances,omitempty"` - DesiredRunnerCount int64 `json:"desired_runner_count,omitempty"` + DesiredRunnerCount int `json:"desired_runner_count,omitempty"` RunnerBootstrapTimeout uint `json:"runner_bootstrap_timeout,omitempty"` // ExtraSpecs is an opaque raw json that gets sent to the provider diff --git a/workers/scaleset/interfaces.go b/workers/scaleset/interfaces.go index 077a35e5..51d1d54b 100644 --- a/workers/scaleset/interfaces.go +++ b/workers/scaleset/interfaces.go @@ -9,6 +9,7 @@ type scaleSetHelper interface { ScaleSetCLI() *scalesets.ScaleSetClient GetScaleSet() params.ScaleSet SetLastMessageID(id int64) error + SetDesiredRunnerCount(count int) error Owner() string HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error HandleJobsStarted(jobs []params.ScaleSetJobMessage) error diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go index 4d84a76b..ca673c4d 100644 --- a/workers/scaleset/scaleset_helper.go +++ b/workers/scaleset/scaleset_helper.go @@ -1,7 +1,12 @@ package scaleset import ( + "errors" "fmt" + "log/slog" + + runnerErrors "github.com/cloudbase/garm-provider-common/errors" + commonParams "github.com/cloudbase/garm-provider-common/params" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/util/github/scalesets" @@ -30,11 +35,54 @@ func (w *Worker) SetLastMessageID(id int64) error { // assigned and was not canceled before it had a chance to run, then we mark // that runner as pending_delete. func (w *Worker) HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error { + for _, job := range jobs { + if job.RunnerName == "" { + // This job was not assigned to a runner, so we can skip it. + continue + } + // Set the runner to pending_delete. + runnerUpdateParams := params.UpdateInstanceParams{ + Status: commonParams.InstancePendingDelete, + RunnerStatus: params.RunnerTerminated, + } + _, err := w.store.UpdateInstance(w.ctx, job.RunnerName, runnerUpdateParams) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return fmt.Errorf("updating runner %s: %w", job.RunnerName, err) + } + } + } return nil } // HandleJobStarted updates the runners from idle to active in the DB and // assigns the job to them. func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) error { + for _, job := range jobs { + if job.RunnerName == "" { + // This should not happen, but just in case. + continue + } + + updateParams := params.UpdateInstanceParams{ + RunnerStatus: params.RunnerActive, + } + + _, err := w.store.UpdateInstance(w.ctx, job.RunnerName, updateParams) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + slog.InfoContext(w.ctx, "runner not found; handled by some other controller?", "runner_name", job.RunnerName) + continue + } + return fmt.Errorf("updating runner %s: %w", job.RunnerName, err) + } + } + return nil +} + +func (w *Worker) SetDesiredRunnerCount(count int) error { + if err := w.store.SetScaleSetDesiredRunnerCount(w.ctx, w.scaleSet.ID, count); err != nil { + return fmt.Errorf("setting desired runner count: %w", err) + } return nil } diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 80ba67c3..58d99bf3 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -167,6 +167,11 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage } else { l.lastMessageID = msg.MessageID } + + if err := l.scaleSetHelper.SetDesiredRunnerCount(msg.Statistics.TotalAssignedJobs); err != nil { + slog.ErrorContext(l.ctx, "setting desired runner count", "error", err) + } + if err := l.messageSession.DeleteMessage(l.listenerCtx, msg.MessageID); err != nil { slog.ErrorContext(l.ctx, "deleting message", "error", err) }