Handle JobStarted and JobCompleted

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-17 16:53:54 +00:00
parent 8d10dd4716
commit 94f264d444
7 changed files with 58 additions and 4 deletions

View file

@ -143,7 +143,7 @@ type ScaleSetsStore interface {
GetScaleSetByID(ctx context.Context, scaleSet uint) (params.ScaleSet, error)
DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error)
SetScaleSetLastMessageID(ctx context.Context, scaleSetID uint, lastMessageID int64) error
SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int64) error
SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) error
}
type ScaleSetInstanceStore interface {

View file

@ -119,7 +119,7 @@ type ScaleSet struct {
OSArch commonParams.OSArch
Enabled bool
LastMessageID int64
DesiredRunnerCount int64
DesiredRunnerCount int
// ExtraSpecs is an opaque json that gets sent to the provider
// as part of the bootstrap params for instances. It can contain
// any kind of data needed by providers.

View file

@ -392,7 +392,7 @@ func (s *sqlDatabase) SetScaleSetLastMessageID(ctx context.Context, scaleSetID u
return nil
}
func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int64) error {
func (s *sqlDatabase) SetScaleSetDesiredRunnerCount(ctx context.Context, scaleSetID uint, desiredRunnerCount int) error {
if err := s.conn.Transaction(func(tx *gorm.DB) error {
if q := tx.Model(&ScaleSet{}).Where("id = ?", scaleSetID).Update("desired_runner_count", desiredRunnerCount); q.Error != nil {
return errors.Wrap(q.Error, "saving database entry")

View file

@ -462,7 +462,7 @@ type ScaleSet struct {
OSArch commonParams.OSArch `json:"os_arch,omitempty"`
Enabled bool `json:"enabled,omitempty"`
Instances []Instance `json:"instances,omitempty"`
DesiredRunnerCount int64 `json:"desired_runner_count,omitempty"`
DesiredRunnerCount int `json:"desired_runner_count,omitempty"`
RunnerBootstrapTimeout uint `json:"runner_bootstrap_timeout,omitempty"`
// ExtraSpecs is an opaque raw json that gets sent to the provider

View file

@ -9,6 +9,7 @@ type scaleSetHelper interface {
ScaleSetCLI() *scalesets.ScaleSetClient
GetScaleSet() params.ScaleSet
SetLastMessageID(id int64) error
SetDesiredRunnerCount(count int) error
Owner() string
HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error
HandleJobsStarted(jobs []params.ScaleSetJobMessage) error

View file

@ -1,7 +1,12 @@
package scaleset
import (
"errors"
"fmt"
"log/slog"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github/scalesets"
@ -30,11 +35,54 @@ func (w *Worker) SetLastMessageID(id int64) error {
// assigned and was not canceled before it had a chance to run, then we mark
// that runner as pending_delete.
func (w *Worker) HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error {
	// The same update is applied to every runner referenced by a completed
	// job: the instance is queued for deletion and the runner is marked as
	// terminated.
	terminated := params.UpdateInstanceParams{
		Status:       commonParams.InstancePendingDelete,
		RunnerStatus: params.RunnerTerminated,
	}

	for _, msg := range jobs {
		name := msg.RunnerName
		if name == "" {
			// The message carries no runner name, so there is no instance
			// to update for this job.
			continue
		}

		_, err := w.store.UpdateInstance(w.ctx, name, terminated)
		if err == nil || errors.Is(err, runnerErrors.ErrNotFound) {
			// Either the update went through, or the store has no record
			// of this runner; a missing runner is deliberately ignored.
			continue
		}
		return fmt.Errorf("updating runner %s: %w", name, err)
	}
	return nil
}
// HandleJobsStarted marks the runner named in each started job message as
// active in the store. Messages without a runner name are skipped. A runner
// that the store does not know about is logged and skipped; any other update
// failure aborts processing and is returned wrapped with the runner name.
func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) error {
	for _, msg := range jobs {
		name := msg.RunnerName
		if name == "" {
			// Not expected for a started job, but guard against it anyway.
			continue
		}

		_, err := w.store.UpdateInstance(w.ctx, name, params.UpdateInstanceParams{
			RunnerStatus: params.RunnerActive,
		})
		switch {
		case err == nil:
			// Runner updated; move on to the next message.
		case errors.Is(err, runnerErrors.ErrNotFound):
			slog.InfoContext(w.ctx, "runner not found; handled by some other controller?", "runner_name", name)
		default:
			return fmt.Errorf("updating runner %s: %w", name, err)
		}
	}
	return nil
}
// SetDesiredRunnerCount persists count as the desired runner count of the
// scale set this worker manages. Any store error is returned wrapped with
// additional context.
func (w *Worker) SetDesiredRunnerCount(count int) error {
	err := w.store.SetScaleSetDesiredRunnerCount(w.ctx, w.scaleSet.ID, count)
	if err == nil {
		return nil
	}
	return fmt.Errorf("setting desired runner count: %w", err)
}

View file

@ -167,6 +167,11 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
} else {
l.lastMessageID = msg.MessageID
}
if err := l.scaleSetHelper.SetDesiredRunnerCount(msg.Statistics.TotalAssignedJobs); err != nil {
slog.ErrorContext(l.ctx, "setting desired runner count", "error", err)
}
if err := l.messageSession.DeleteMessage(l.listenerCtx, msg.MessageID); err != nil {
slog.ErrorContext(l.ctx, "deleting message", "error", err)
}