From 436fd7746fa1bcd7c58b87bbafcfa4627c3a97bc Mon Sep 17 00:00:00 2001
From: Gabriel Adrian Samfira
Date: Wed, 23 Apr 2025 08:59:33 +0000
Subject: [PATCH] WiP

Signed-off-by: Gabriel Adrian Samfira
---
 apiserver/controllers/instances.go |   2 +-
 apiserver/controllers/scalesets.go |   6 +-
 database/sql/models.go             |  17 +++-
 database/sql/sql.go                |   1 +
 database/sql/util.go               |  34 ++++++++
 workers/provider/provider.go       | 123 ++++++++++++++++++++++++++++
 workers/scaleset/scaleset.go       | 125 ++++++++++++++++++++++++++++-
 7 files changed, 301 insertions(+), 7 deletions(-)

diff --git a/apiserver/controllers/instances.go b/apiserver/controllers/instances.go
index fd6d2c45..3209a5c2 100644
--- a/apiserver/controllers/instances.go
+++ b/apiserver/controllers/instances.go
@@ -97,7 +97,7 @@ func (a *APIController) ListScaleSetInstancesHandler(w http.ResponseWriter, r *h
 		}
 		return
 	}
-	id, err := strconv.ParseUint(scalesetID, 10, 64)
+	id, err := strconv.ParseUint(scalesetID, 10, 32)
 	if err != nil {
 		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
 		handleError(ctx, w, gErrors.ErrBadRequest)
diff --git a/apiserver/controllers/scalesets.go b/apiserver/controllers/scalesets.go
index d12928f0..1d26221b 100644
--- a/apiserver/controllers/scalesets.go
+++ b/apiserver/controllers/scalesets.go
@@ -79,7 +79,7 @@ func (a *APIController) GetScaleSetByIDHandler(w http.ResponseWriter, r *http.Re
 		}
 		return
 	}
-	id, err := strconv.ParseUint(scaleSetID, 10, 64)
+	id, err := strconv.ParseUint(scaleSetID, 10, 32)
 	if err != nil {
 		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
 		handleError(ctx, w, gErrors.ErrBadRequest)
@@ -130,7 +130,7 @@ func (a *APIController) DeleteScaleSetByIDHandler(w http.ResponseWriter, r *http
 		return
 	}
 
-	id, err := strconv.ParseUint(scalesetID, 10, 64)
+	id, err := strconv.ParseUint(scalesetID, 10, 32)
 	if err != nil {
 		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
 		handleError(ctx, w, gErrors.ErrBadRequest)
@@ -183,7 +183,7 @@ func (a *APIController) UpdateScaleSetByIDHandler(w http.ResponseWriter, r *http
 		return
 	}
 
-	id, err := strconv.ParseUint(scalesetID, 10, 64)
+	id, err := strconv.ParseUint(scalesetID, 10, 32)
 	if err != nil {
 		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
 		handleError(ctx, w, gErrors.ErrBadRequest)
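A note on the ParseUint bit-size change in the two controllers above: with bitSize 32 the function still returns a uint64, but it now rejects anything that does not fit in 32 bits, so the later conversion to the scale set ID type cannot silently truncate. A minimal, standalone illustration of that standard library behaviour (not part of the patch):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// bitSize 32: the return type is still uint64, but values that do not
	// fit in 32 bits are rejected up front instead of being narrowed later.
	id, err := strconv.ParseUint("4294967295", 10, 32) // max uint32
	fmt.Println(id, err)                               // 4294967295 <nil>

	_, err = strconv.ParseUint("4294967296", 10, 32) // one past max uint32
	fmt.Println(err) // strconv.ParseUint: parsing "4294967296": value out of range
}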
diff --git a/database/sql/models.go b/database/sql/models.go
index c1b6462d..3b1dcc9b 100644
--- a/database/sql/models.go
+++ b/database/sql/models.go
@@ -86,6 +86,17 @@ type Pool struct {
 	Priority uint `gorm:"index:idx_pool_priority"`
 }
 
+type ScaleSetEvent struct {
+	gorm.Model
+
+	EventType  params.EventType
+	EventLevel params.EventLevel
+	Message    string `gorm:"type:text"`
+
+	ScaleSetID uint     `gorm:"index:idx_scale_set_event"`
+	ScaleSet   ScaleSet `gorm:"foreignKey:ScaleSetID"`
+}
+
 // ScaleSet represents a github scale set. Scale sets are almost identical to pools with a few
 // notable exceptions:
 // - Labels are no longer relevant
@@ -135,7 +146,11 @@ type ScaleSet struct {
 	EnterpriseID *uuid.UUID `gorm:"index"`
 	Enterprise   Enterprise `gorm:"foreignKey:EnterpriseID"`
 
-	Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
+	Status       string
+	StatusReason string `gorm:"type:text"`
+
+	Instances []Instance      `gorm:"foreignKey:ScaleSetFkID"`
+	Events    []ScaleSetEvent `gorm:"foreignKey:ScaleSetID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
 }
 
 type RepositoryEvent struct {
diff --git a/database/sql/sql.go b/database/sql/sql.go
index a704d9c3..878224c6 100644
--- a/database/sql/sql.go
+++ b/database/sql/sql.go
@@ -432,6 +432,7 @@ func (s *sqlDatabase) migrateDB() error {
 		&ControllerInfo{},
 		&WorkflowJob{},
 		&ScaleSet{},
+		&ScaleSetEvent{},
 	); err != nil {
 		return errors.Wrap(err, "running auto migrate")
 	}
diff --git a/database/sql/util.go b/database/sql/util.go
index 112d0a76..5bd8de01 100644
--- a/database/sql/util.go
+++ b/database/sql/util.go
@@ -634,6 +634,40 @@ func (s *sqlDatabase) GetGithubEntity(_ context.Context, entityType params.Githu
 	return entity, nil
 }
 
+func (s *sqlDatabase) AddScaleSetEvent(ctx context.Context, scaleSetID uint, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
+	scaleSet, err := s.GetScaleSetByID(ctx, scaleSetID)
+	if err != nil {
+		return errors.Wrap(err, "fetching scale set")
+	}
+
+	msg := ScaleSetEvent{
+		Message:    statusMessage,
+		EventType:  event,
+		EventLevel: eventLevel,
+	}
+
+	if err := s.conn.Model(&scaleSet).Association("Events").Append(&msg); err != nil {
+		return errors.Wrap(err, "adding status message")
+	}
+
+	if maxEvents > 0 {
+		var latestEvents []ScaleSetEvent
+		q := s.conn.Model(&ScaleSetEvent{}).
+			Limit(maxEvents).Order("id desc").
+			Where("scale_set_id = ?", scaleSetID).Find(&latestEvents)
+		if q.Error != nil {
+			return errors.Wrap(q.Error, "fetching latest events")
+		}
+		if len(latestEvents) == maxEvents {
+			lastInList := latestEvents[len(latestEvents)-1]
+			if err := s.conn.Where("scale_set_id = ? and id < ?", scaleSetID, lastInList.ID).Unscoped().Delete(&ScaleSetEvent{}).Error; err != nil {
+				return errors.Wrap(err, "deleting old events")
+			}
+		}
+	}
+	return nil
+}
+
 func (s *sqlDatabase) addRepositoryEvent(ctx context.Context, repoID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
 	repo, err := s.GetRepositoryByID(ctx, repoID)
 	if err != nil {
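AddScaleSetEvent above appends one event to the scale set and, when maxEvents is greater than zero, hard-deletes everything older than the newest maxEvents rows for that scale set. A hedged usage sketch (not part of the patch); the interface below only mirrors the signature added here, and the constants params.StatusEvent and params.EventInfo are assumptions modeled on the existing pool/repository event helpers:

package scaleset

import (
	"context"

	"github.com/cloudbase/garm/params"
)

// scaleSetEventStore captures just the method added in database/sql/util.go;
// in garm the sql database store would satisfy it.
type scaleSetEventStore interface {
	AddScaleSetEvent(ctx context.Context, scaleSetID uint, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error
}

// recordScaleSetStatus is a hypothetical helper showing the intended call
// pattern: each call appends one event, and maxEvents=30 prunes anything
// older than the newest 30 events for the scale set.
func recordScaleSetStatus(ctx context.Context, store scaleSetEventStore, scaleSetID uint, msg string) error {
	return store.AddScaleSetEvent(ctx, scaleSetID, params.StatusEvent, params.EventInfo, msg, 30)
}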
diff --git a/workers/provider/provider.go b/workers/provider/provider.go
index 7f0784e9..969a373d 100644
--- a/workers/provider/provider.go
+++ b/workers/provider/provider.go
@@ -3,10 +3,12 @@ package provider
 import (
 	"context"
 	"fmt"
+	"log/slog"
 	"sync"
 
 	dbCommon "github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/params"
 	"github.com/cloudbase/garm/runner/common"
 )
 
@@ -32,12 +34,51 @@ type provider struct {
 	store dbCommon.Store
 
 	providers map[string]common.Provider
+	// A cache of all scale sets kept updated by the watcher.
+	// This helps us avoid a bunch of queries to the database.
+	scaleSets map[uint]params.ScaleSet
+	runners   map[string]params.Instance
 
 	mux     sync.Mutex
 	running bool
 	quit    chan struct{}
 }
 
+func (p *provider) loadAllScaleSets() error {
+	p.mux.Lock()
+	defer p.mux.Unlock()
+
+	scaleSets, err := p.store.ListAllScaleSets(p.ctx)
+	if err != nil {
+		return fmt.Errorf("fetching scale sets: %w", err)
+	}
+
+	for _, scaleSet := range scaleSets {
+		p.scaleSets[scaleSet.ID] = scaleSet
+	}
+
+	return nil
+}
+
+// loadAllRunners loads all runners from the database. At this stage we only
+// care about runners created by scale sets, but in the future, we will migrate
+// the pool manager to the same model.
+func (p *provider) loadAllRunners() error {
+	p.mux.Lock()
+	defer p.mux.Unlock()
+
+	runners, err := p.store.ListAllInstances(p.ctx)
+	if err != nil {
+		return fmt.Errorf("fetching runners: %w", err)
+	}
+
+	for _, runner := range runners {
+		p.runners[runner.Name] = runner
+	}
+
+	return nil
+}
+
 func (p *provider) Start() error {
 	p.mux.Lock()
 	defer p.mux.Unlock()
@@ -46,6 +87,14 @@ func (p *provider) Start() error {
 		return nil
 	}
 
+	if err := p.loadAllScaleSets(); err != nil {
+		return fmt.Errorf("loading all scale sets: %w", err)
+	}
+
+	if err := p.loadAllRunners(); err != nil {
+		return fmt.Errorf("loading all runners: %w", err)
+	}
+
 	consumer, err := watcher.RegisterConsumer(
 		p.ctx, p.consumerID, composeProviderWatcher())
 	if err != nil {
@@ -55,6 +104,8 @@ func (p *provider) Start() error {
 	p.quit = make(chan struct{})
 	p.running = true
 
+	go p.loop()
+
 	return nil
 }
 
@@ -71,3 +122,75 @@ func (p *provider) Stop() error {
 	p.running = false
 	return nil
 }
+
+func (p *provider) loop() {
+	defer p.Stop()
+	for {
+		select {
+		case payload := <-p.consumer.Watch():
+			slog.InfoContext(p.ctx, "received payload", slog.Any("payload", payload))
+			go p.handleWatcherEvent(payload)
+		case <-p.ctx.Done():
+			return
+		case <-p.quit:
+			return
+		}
+	}
+}
+
+func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
+	switch payload.EntityType {
+	case dbCommon.ScaleSetEntityType:
+		p.handleScaleSetEvent(payload)
+	case dbCommon.InstanceEntityType:
+		p.handleInstanceEvent(payload)
+	default:
+		slog.ErrorContext(p.ctx, "invalid entity type", "entity_type", payload.EntityType)
+	}
+}
+
+func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
+	p.mux.Lock()
+	defer p.mux.Unlock()
+
+	scaleSet, ok := event.Payload.(params.ScaleSet)
+	if !ok {
+		slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.CreateOperation, dbCommon.UpdateOperation:
+		slog.DebugContext(p.ctx, "got create/update operation")
+		p.scaleSets[scaleSet.ID] = scaleSet
+	case dbCommon.DeleteOperation:
+		slog.DebugContext(p.ctx, "got delete operation")
+		delete(p.scaleSets, scaleSet.ID)
+	default:
+		slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
+
+func (p *provider) handleInstanceEvent(event dbCommon.ChangePayload) {
+	p.mux.Lock()
+	defer p.mux.Unlock()
+
+	instance, ok := event.Payload.(params.Instance)
+	if !ok {
+		slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.CreateOperation, dbCommon.UpdateOperation:
+		slog.DebugContext(p.ctx, "got create/update operation")
+		p.runners[instance.Name] = instance
+	case dbCommon.DeleteOperation:
+		slog.DebugContext(p.ctx, "got delete operation")
+		delete(p.runners, instance.Name)
+	default:
+		slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
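Start() above registers a watcher consumer via composeProviderWatcher(), which is referenced but not included in this diff. Since the loop only dispatches scale set and instance payloads, a plausible sketch of that helper follows; it assumes the filter helpers in database/watcher (WithAny, WithEntityTypeFilter) and the PayloadFilterFunc type in database/common, so verify against the actual packages before relying on it:

package provider

import (
	dbCommon "github.com/cloudbase/garm/database/common"
	"github.com/cloudbase/garm/database/watcher"
)

// composeProviderWatcher is a sketch only: it selects change payloads for the
// two entity types the provider worker handles in handleWatcherEvent.
func composeProviderWatcher() dbCommon.PayloadFilterFunc {
	return watcher.WithAny(
		watcher.WithEntityTypeFilter(dbCommon.ScaleSetEntityType),
		watcher.WithEntityTypeFilter(dbCommon.InstanceEntityType),
	)
}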
diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go
index 24df1cbb..012a41d1 100644
--- a/workers/scaleset/scaleset.go
+++ b/workers/scaleset/scaleset.go
@@ -99,7 +99,96 @@ func (w *Worker) Start() (err error) {
 	}
 
 	for _, instance := range instances {
+		if instance.Status == commonParams.InstanceCreating {
+			// We're just starting up. We found an instance stuck in creating.
+			// When a provider creates an instance, it sets the db instance to
+			// creating and then issues an API call to the IaaS to create the
+			// instance using some userdata it needs to come up. But the instance
+			// will still need to call back home to fetch additional metadata and
+			// complete its setup. We should remove the instance, as it is not
+			// possible to reliably determine its state (it may be mid-boot,
+			// before it reached the phase where it fetches the metadata, or it
+			// may have already failed).
+			instanceState := commonParams.InstancePendingDelete
+			locking.Lock(instance.Name)
+			if instance.AgentID != 0 {
+				if err := w.scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
+					// Scale sets use JIT runners. This means that we create the runner in github
+					// before we create the actual instance that will use the credentials. We need
+					// to remove the runner from github if it exists.
+					if !errors.Is(err, runnerErrors.ErrNotFound) {
+						if errors.Is(err, runnerErrors.ErrUnauthorized) {
+							// We don't have access to remove the runner. This implies that our
+							// credentials may have expired.
+							//
+							// TODO: we need to set the scale set as inactive and stop the listener (if any).
+							slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err)
+							w.runners[instance.ID] = instance
+							locking.Unlock(instance.Name, false)
+							continue
+						}
+						// The runner may have come up, registered and could currently be running a
+						// job, in which case github will not allow us to remove it.
+						runnerInstance, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
+						if err != nil {
+							if !errors.Is(err, runnerErrors.ErrNotFound) {
+								// We could not get info about the runner and the error was not a not-found error.
+								slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
+								w.runners[instance.ID] = instance
+								locking.Unlock(instance.Name, false)
+								continue
+							}
+						}
+						if runnerInstance.Status == string(params.RunnerIdle) ||
+							runnerInstance.Status == string(params.RunnerActive) {
+							// This is a highly unlikely scenario, but let's account for it anyway.
+							//
+							// The runner is running a job or is idle. Mark it as running, as
+							// it appears that it finished booting and is now running.
+							//
+							// NOTE: if the instance was in creating and it managed to boot, there
+							// is a high chance that we do not have a provider ID for the runner
+							// inside our database. When removing the runner, the provider will attempt
+							// to use the instance name instead of the provider ID, the same as when
+							// creation of the instance fails and we try to clean up any lingering resources
+							// in the provider.
+							slog.DebugContext(w.ctx, "runner is running a job or is idle; not removing", "runner_name", instance.Name)
+							instanceState = commonParams.InstanceRunning
+						}
+					}
+				}
+			}
+			runnerUpdateParams := params.UpdateInstanceParams{
+				Status: instanceState,
+			}
+			instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
+			if err != nil {
+				if !errors.Is(err, runnerErrors.ErrNotFound) {
+					locking.Unlock(instance.Name, false)
+					return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+				}
+			}
+			locking.Unlock(instance.Name, false)
+		} else if instance.Status == commonParams.InstanceDeleting {
+			// The instance was found in deleting. It is assumed that the runner was already
+			// removed from github, either by github or by garm. The deleting status indicates
+			// that it was already being handled by the provider. There should be no entry on
+			// github for the runner if that was the case.
+			// Setting it to pending_delete will cause the provider to try again, an operation
+			// which is idempotent (if it's already deleted, the provider reports success).
+			runnerUpdateParams := params.UpdateInstanceParams{
+				Status: commonParams.InstancePendingDelete,
+			}
+			instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
+			if err != nil {
+				if !errors.Is(err, runnerErrors.ErrNotFound) {
+					locking.Unlock(instance.Name, false)
+					return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+				}
+			}
+		}
 		w.runners[instance.ID] = instance
+		locking.Unlock(instance.Name, false)
 	}
 
 	consumer, err := watcher.RegisterConsumer(
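The hunk above boils down to a small set of startup recovery rules for instances the worker finds in a transient state. A hypothetical summary, not part of the patch, using only the status constants referenced in the hunk:

package scaleset

import (
	commonParams "github.com/cloudbase/garm-provider-common/params"
)

// recoveredInstanceStatus summarizes the recovery rules described in the
// comments above: given the status an instance was left in and whether its
// JIT runner is already online on github, return the status the worker
// resets it to on startup.
func recoveredInstanceStatus(current commonParams.InstanceStatus, runnerOnline bool) commonParams.InstanceStatus {
	switch current {
	case commonParams.InstanceCreating:
		if runnerOnline {
			// The runner finished booting while garm was down; keep it.
			return commonParams.InstanceRunning
		}
		// No reliable way to tell how far boot got; schedule it for removal.
		return commonParams.InstancePendingDelete
	case commonParams.InstanceDeleting:
		// Deletion was in flight; pending_delete makes the provider retry,
		// which is idempotent.
		return commonParams.InstancePendingDelete
	default:
		return current
	}
}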
@@ -212,11 +301,43 @@ func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
 		return
 	}
 	switch event.Operation {
-	case dbCommon.UpdateOperation, dbCommon.CreateOperation:
-		slog.DebugContext(w.ctx, "got update operation")
+	case dbCommon.CreateOperation:
+		slog.DebugContext(w.ctx, "got create operation")
 		w.mux.Lock()
 		w.runners[instance.ID] = instance
 		w.mux.Unlock()
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(w.ctx, "got update operation")
+		w.mux.Lock()
+		oldInstance, ok := w.runners[instance.ID]
+		w.runners[instance.ID] = instance
+
+		if !ok {
+			slog.DebugContext(w.ctx, "instance not found in local cache; ignoring", "instance_id", instance.ID)
+			w.mux.Unlock()
+			return
+		}
+		if oldInstance.RunnerStatus != instance.RunnerStatus && instance.RunnerStatus == params.RunnerIdle {
+			serviceRunner, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
+			if err != nil {
+				slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
+				w.mux.Unlock()
+				return
+			}
+			status, ok := serviceRunner.Status.(string)
+			if !ok {
+				slog.ErrorContext(w.ctx, "error getting runner status", "runner_id", instance.AgentID)
+				w.mux.Unlock()
+				return
+			}
+			if status != string(params.RunnerIdle) && status != string(params.RunnerActive) {
+				// TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as
+				// pending_delete if the runner never comes online.
+				w.mux.Unlock()
+				return
+			}
+		}
+		w.mux.Unlock()
 	case dbCommon.DeleteOperation:
 		slog.DebugContext(w.ctx, "got delete operation")
 		w.mux.Lock()
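One possible shape for the TODO in the hunk above, sketched here rather than implemented in the patch: poll a runner status lookup (a thin wrapper over something like w.scaleSetCli.GetRunner) for a bounded amount of time and report whether the runner ever reached an online state; the caller could then mark the instance pending_delete when it never does. The helper name, polling interval and wiring are assumptions.

package scaleset

import (
	"context"
	"time"

	"github.com/cloudbase/garm/params"
)

// waitForRunnerOnline polls getStatus until the runner reports idle or
// active, the timeout expires, or the context is cancelled.
func waitForRunnerOnline(ctx context.Context, getStatus func(context.Context) (string, error), timeout time.Duration) (bool, error) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	deadline := time.After(timeout)

	for {
		status, err := getStatus(ctx)
		if err != nil {
			return false, err
		}
		if status == string(params.RunnerIdle) || status == string(params.RunnerActive) {
			return true, nil
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-deadline:
			return false, nil
		case <-ticker.C:
		}
	}
}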