diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 00000000..2fa52456 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,65 @@ +package cache + +import ( + "sync" + "time" + + commonParams "github.com/cloudbase/garm-provider-common/params" + + "github.com/cloudbase/garm/params" +) + +var githubToolsCache *GithubToolsCache + +func init() { + ghToolsCache := &GithubToolsCache{ + entities: make(map[string]GithubEntityTools), + } + githubToolsCache = ghToolsCache +} + +type GithubEntityTools struct { + updatedAt time.Time + entity params.GithubEntity + tools []commonParams.RunnerApplicationDownload +} + +type GithubToolsCache struct { + mux sync.Mutex + // entity IDs are UUID4s. It is highly unlikely they will collide (🤞). + entities map[string]GithubEntityTools +} + +func (g *GithubToolsCache) Get(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) { + g.mux.Lock() + defer g.mux.Unlock() + + if cache, ok := g.entities[entity.String()]; ok { + if time.Since(cache.updatedAt) > 1*time.Hour { + // Stale cache, remove it. 
+ delete(g.entities, entity.String()) + return nil, false + } + return cache.tools, true + } + return nil, false +} + +func (g *GithubToolsCache) Set(entity params.GithubEntity, tools []commonParams.RunnerApplicationDownload) { + g.mux.Lock() + defer g.mux.Unlock() + + g.entities[entity.String()] = GithubEntityTools{ + updatedAt: time.Now(), + entity: entity, + tools: tools, + } +} + +func SetGithubToolsCache(entity params.GithubEntity, tools []commonParams.RunnerApplicationDownload) { + githubToolsCache.Set(entity, tools) +} + +func GetGithubToolsCache(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) { + return githubToolsCache.Get(entity) +} diff --git a/database/common/store.go b/database/common/store.go index 860ed8ac..c732400a 100644 --- a/database/common/store.go +++ b/database/common/store.go @@ -163,4 +163,5 @@ type Store interface { ControllerInfo() (params.ControllerInfo, error) InitController() (params.ControllerInfo, error) GetGithubEntity(_ context.Context, entityType params.GithubEntityType, entityID string) (params.GithubEntity, error) + AddEntityEvent(ctx context.Context, entity params.GithubEntity, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error } diff --git a/database/sql/models.go b/database/sql/models.go index c44baceb..e443e75a 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -136,6 +136,17 @@ type ScaleSet struct { Instances []Instance `gorm:"foreignKey:ScaleSetFkID"` } +type RepositoryEvent struct { + gorm.Model + + EventType params.EventType + EventLevel params.EventLevel + Message string `gorm:"type:text"` + + RepoID uuid.UUID `gorm:"index:idx_repo_event"` + Repo Repository `gorm:"foreignKey:RepoID"` +} + type Repository struct { Base @@ -154,8 +165,20 @@ type Repository struct { EndpointName *string `gorm:"index:idx_owner_nocase,unique,collate:nocase"` Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"` + + 
Events []RepositoryEvent `gorm:"foreignKey:RepoID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"` } +type OrganizationEvent struct { + gorm.Model + + EventType params.EventType + EventLevel params.EventLevel + Message string `gorm:"type:text"` + + OrgID uuid.UUID `gorm:"index:idx_org_event"` + Org Organization `gorm:"foreignKey:OrgID"` +} type Organization struct { Base @@ -173,6 +196,19 @@ type Organization struct { EndpointName *string `gorm:"index:idx_org_name_nocase,collate:nocase"` Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"` + + Events []OrganizationEvent `gorm:"foreignKey:OrgID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"` +} + +type EnterpriseEvent struct { + gorm.Model + + EventType params.EventType + EventLevel params.EventLevel + Message string `gorm:"type:text"` + + EnterpriseID uuid.UUID `gorm:"index:idx_enterprise_event"` + Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"` } type Enterprise struct { @@ -192,6 +228,8 @@ type Enterprise struct { EndpointName *string `gorm:"index:idx_ent_name_nocase,collate:nocase"` Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"` + + Events []EnterpriseEvent `gorm:"foreignKey:EnterpriseID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"` } type Address struct { diff --git a/database/sql/sql.go b/database/sql/sql.go index 4d23d253..a704d9c3 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -423,6 +423,9 @@ func (s *sqlDatabase) migrateDB() error { &Repository{}, &Organization{}, &Enterprise{}, + &EnterpriseEvent{}, + &OrganizationEvent{}, + &RepositoryEvent{}, &Address{}, &InstanceStatusUpdate{}, &Instance{}, diff --git a/database/sql/util.go b/database/sql/util.go index fb627814..c1a44cb8 100644 --- a/database/sql/util.go +++ b/database/sql/util.go @@ -631,3 +631,136 @@ func (s *sqlDatabase) GetGithubEntity(_ context.Context, entityType params.Githu } return entity, nil } + +func (s *sqlDatabase) addRepositoryEvent(ctx 
context.Context, repoID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
+	repo, err := s.GetRepositoryByID(ctx, repoID)
+	if err != nil {
+		return errors.Wrap(err, "fetching repository")
+	}
+
+	msg := RepositoryEvent{
+		Message:    statusMessage,
+		EventType:  event,
+		EventLevel: eventLevel,
+	}
+
+	if err := s.conn.Model(&repo).Association("Events").Append(&msg); err != nil {
+		return errors.Wrap(err, "adding status message")
+	}
+
+	if maxEvents > 0 {
+		repoID, err := uuid.Parse(repo.ID)
+		if err != nil {
+			return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
+		}
+		var latestEvents []RepositoryEvent
+		q := s.conn.Model(&RepositoryEvent{}).
+			Limit(maxEvents).Order("id desc").
+			Where("repo_id = ?", repoID).Find(&latestEvents)
+		if q.Error != nil {
+			return errors.Wrap(q.Error, "fetching latest events")
+		}
+		if len(latestEvents) == maxEvents {
+			lastInList := latestEvents[len(latestEvents)-1]
+			if err := s.conn.Where("repo_id = ? and id < ?", repoID, lastInList.ID).Unscoped().Delete(&RepositoryEvent{}).Error; err != nil {
+				return errors.Wrap(err, "deleting old events")
+			}
+		}
+	}
+	return nil
+}
+
+func (s *sqlDatabase) addOrgEvent(ctx context.Context, orgID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
+	org, err := s.GetOrganizationByID(ctx, orgID)
+	if err != nil {
+		return errors.Wrap(err, "fetching organization")
+	}
+
+	msg := OrganizationEvent{
+		Message:    statusMessage,
+		EventType:  event,
+		EventLevel: eventLevel,
+	}
+
+	if err := s.conn.Model(&org).Association("Events").Append(&msg); err != nil {
+		return errors.Wrap(err, "adding status message")
+	}
+
+	if maxEvents > 0 {
+		orgID, err := uuid.Parse(org.ID)
+		if err != nil {
+			return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
+		}
+		var latestEvents []OrganizationEvent
+		q := s.conn.Model(&OrganizationEvent{}).
+			Limit(maxEvents).Order("id desc").
+			Where("org_id = ?", orgID).Find(&latestEvents)
+		if q.Error != nil {
+			return errors.Wrap(q.Error, "fetching latest events")
+		}
+		if len(latestEvents) == maxEvents {
+			lastInList := latestEvents[len(latestEvents)-1]
+			if err := s.conn.Where("org_id = ? and id < ?", orgID, lastInList.ID).Unscoped().Delete(&OrganizationEvent{}).Error; err != nil {
+				return errors.Wrap(err, "deleting old events")
+			}
+		}
+	}
+	return nil
+}
+
+func (s *sqlDatabase) addEnterpriseEvent(ctx context.Context, entID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
+	ent, err := s.GetEnterpriseByID(ctx, entID)
+	if err != nil {
+		return errors.Wrap(err, "fetching enterprise")
+	}
+
+	msg := EnterpriseEvent{
+		Message:    statusMessage,
+		EventType:  event,
+		EventLevel: eventLevel,
+	}
+
+	if err := s.conn.Model(&ent).Association("Events").Append(&msg); err != nil {
+		return errors.Wrap(err, "adding status message")
+	}
+
+	if maxEvents > 0 {
+		entID, err := uuid.Parse(ent.ID)
+		if err != nil {
+			return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
+		}
+		var latestEvents []EnterpriseEvent
+		q := s.conn.Model(&EnterpriseEvent{}).
+			Limit(maxEvents).Order("id desc").
+			Where("enterprise_id = ?", entID).Find(&latestEvents)
+		if q.Error != nil {
+			return errors.Wrap(q.Error, "fetching latest events")
+		}
+		if len(latestEvents) == maxEvents {
+			lastInList := latestEvents[len(latestEvents)-1]
+			if err := s.conn.Where("enterprise_id = ? 
and id < ?", entID, lastInList.ID).Unscoped().Delete(&EnterpriseEvent{}).Error; err != nil { + return errors.Wrap(err, "deleting old events") + } + } + } + + return nil +} + +func (s *sqlDatabase) AddEntityEvent(ctx context.Context, entity params.GithubEntity, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error { + if maxEvents == 0 { + return errors.Wrap(runnerErrors.ErrBadRequest, "max events cannot be 0") + } + // TODO(gabriel-samfira): Should we send watcher notifications for events? + // Not sure it's of any value. + switch entity.EntityType { + case params.GithubEntityTypeRepository: + return s.addRepositoryEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents) + case params.GithubEntityTypeOrganization: + return s.addOrgEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents) + case params.GithubEntityTypeEnterprise: + return s.addEnterpriseEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents) + default: + return errors.Wrap(runnerErrors.ErrBadRequest, "invalid entity type") + } +} diff --git a/params/github.go b/params/github.go index 2d132d50..9eec6e8c 100644 --- a/params/github.go +++ b/params/github.go @@ -402,6 +402,18 @@ type RunnerScaleSetMessage struct { Statistics *RunnerScaleSetStatistic `json:"statistics"` } +func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) { + var body []ScaleSetJobMessage + if r.Body == "" { + return nil, fmt.Errorf("no body specified") + } + + if err := json.Unmarshal([]byte(r.Body), &body); err != nil { + return nil, fmt.Errorf("failed to unmarshal body: %w", err) + } + return body, nil +} + type RunnerReference struct { ID int `json:"id"` Name string `json:"name"` @@ -469,3 +481,23 @@ type RunnerGroupList struct { Count int `json:"count"` RunnerGroups []RunnerGroup `json:"value"` } + +type ScaleSetJobMessage struct { + MessageType string `json:"messageType,omitempty"` + RunnerRequestId int64 
`json:"runnerRequestId,omitempty"` + RepositoryName string `json:"repositoryName,omitempty"` + OwnerName string `json:"ownerName,omitempty"` + JobWorkflowRef string `json:"jobWorkflowRef,omitempty"` + JobDisplayName string `json:"jobDisplayName,omitempty"` + WorkflowRunId int64 `json:"workflowRunId,omitempty"` + EventName string `json:"eventName,omitempty"` + RequestLabels []string `json:"requestLabels,omitempty"` + QueueTime time.Time `json:"queueTime,omitempty"` + ScaleSetAssignTime time.Time `json:"scaleSetAssignTime,omitempty"` + RunnerAssignTime time.Time `json:"runnerAssignTime,omitempty"` + FinishTime time.Time `json:"finishTime,omitempty"` + Result string `json:"result,omitempty"` + RunnerId int `json:"runnerId,omitempty"` + RunnerName string `json:"runnerName,omitempty"` + AcquireJobUrl string `json:"acquireJobUrl,omitempty"` +} diff --git a/runner/scalesets.go b/runner/scalesets.go index 34ba699a..ef45a783 100644 --- a/runner/scalesets.go +++ b/runner/scalesets.go @@ -94,6 +94,7 @@ func (r *Runner) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) error return errors.Wrap(err, "getting scaleset client") } + slog.DebugContext(ctx, "deleting scale set", "scale_set_id", scaleSet.ScaleSetID) if err := scalesetCli.DeleteRunnerScaleSet(ctx, scaleSet.ScaleSetID); err != nil { if !errors.Is(err, runnerErrors.ErrNotFound) { slog.InfoContext(ctx, "scale set not found", "scale_set_id", scaleSet.ScaleSetID) diff --git a/workers/entity/controller.go b/workers/entity/controller.go index c1547302..1e0035c0 100644 --- a/workers/entity/controller.go +++ b/workers/entity/controller.go @@ -74,7 +74,6 @@ func (c *Controller) loadAllRepositories() error { if err != nil { return fmt.Errorf("creating worker: %w", err) } - slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType) if err := worker.Start(); err != nil { return fmt.Errorf("starting worker: %w", err) } @@ -99,7 +98,6 @@ func (c *Controller) 
loadAllOrganizations() error { if err != nil { return fmt.Errorf("creating worker: %w", err) } - slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType) if err := worker.Start(); err != nil { return fmt.Errorf("starting worker: %w", err) } @@ -124,7 +122,6 @@ func (c *Controller) loadAllEnterprises() error { if err != nil { return fmt.Errorf("creating worker: %w", err) } - slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType) if err := worker.Start(); err != nil { return fmt.Errorf("starting worker: %w", err) } @@ -172,6 +169,7 @@ func (c *Controller) Start() error { } func (c *Controller) Stop() error { + slog.DebugContext(c.ctx, "stopping entity controller", "entity", c.consumerID) c.mux.Lock() defer c.mux.Unlock() if !c.running { diff --git a/workers/entity/worker.go b/workers/entity/worker.go index 2a8591cb..49fb75cb 100644 --- a/workers/entity/worker.go +++ b/workers/entity/worker.go @@ -50,6 +50,7 @@ type Worker struct { } func (w *Worker) Stop() error { + slog.DebugContext(w.ctx, "stopping entity worker", "entity", w.consumerID) w.mux.Lock() defer w.mux.Unlock() @@ -69,7 +70,8 @@ func (w *Worker) Stop() error { return nil } -func (w *Worker) Start() error { +func (w *Worker) Start() (err error) { + slog.DebugContext(w.ctx, "starting entity worker", "entity", w.consumerID) w.mux.Lock() defer w.mux.Unlock() @@ -77,8 +79,19 @@ func (w *Worker) Start() error { if err != nil { return fmt.Errorf("creating scale set controller: %w", err) } + + if err := scaleSetController.Start(); err != nil { + return fmt.Errorf("starting scale set controller: %w", err) + } w.scaleSetController = scaleSetController + defer func() { + if err != nil { + w.scaleSetController.Stop() + w.scaleSetController = nil + } + }() + consumer, err := watcher.RegisterConsumer( w.ctx, w.consumerID, composeWorkerWatcherFilters(w.Entity), @@ -90,6 +103,7 @@ func (w *Worker) Start() error { 
w.running = true w.quit = make(chan struct{}) + go w.loop() return nil } diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index de426621..809a2cba 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -2,20 +2,21 @@ package scaleset import ( "context" + "errors" "fmt" "log/slog" "sync" "time" - commonParams "github.com/cloudbase/garm-provider-common/params" + runnerErrors "github.com/cloudbase/garm-provider-common/errors" + "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/runner/common" garmUtil "github.com/cloudbase/garm/util" "github.com/cloudbase/garm/util/github" - "github.com/cloudbase/garm/util/github/scalesets" ) func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) { @@ -76,9 +77,7 @@ type Controller struct { store dbCommon.Store providers map[string]common.Provider - tools []commonParams.RunnerApplicationDownload ghCli common.GithubClient - scaleSetCli *scalesets.ScaleSetClient forgeCredsAreValid bool statusUpdates chan scaleSetStatus @@ -88,14 +87,15 @@ type Controller struct { quit chan struct{} } -func (c *Controller) loadAllScaleSets() error { +func (c *Controller) loadAllScaleSets(cli common.GithubClient) error { scaleSets, err := c.store.ListEntityScaleSets(c.ctx, c.Entity) if err != nil { return fmt.Errorf("listing scale sets: %w", err) } for _, sSet := range scaleSets { - if err := c.handleScaleSetCreateOperation(sSet); err != nil { + slog.DebugContext(c.ctx, "loading scale set", "scale_set", sSet.ID) + if err := c.handleScaleSetCreateOperation(sSet, cli); err != nil { slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation") continue } @@ -104,6 +104,7 @@ func (c *Controller) loadAllScaleSets() error { } func (c 
*Controller) Start() (err error) {
+	slog.DebugContext(c.ctx, "starting scale set controller", "scale_set", c.consumerID)
 	c.mux.Lock()
 	if c.running {
 		c.mux.Unlock()
@@ -111,15 +112,16 @@ func (c *Controller) Start() (err error) {
 	}
 	c.mux.Unlock()
 
-	if err := c.loadAllScaleSets(); err != nil {
-		return fmt.Errorf("loading all scale sets: %w", err)
-	}
-
 	ghCli, err := github.Client(c.ctx, c.Entity)
 	if err != nil {
 		return fmt.Errorf("creating github client: %w", err)
 	}
 
+	slog.DebugContext(c.ctx, "loading scale sets", "entity", c.Entity.String())
+	if err := c.loadAllScaleSets(ghCli); err != nil {
+		return fmt.Errorf("loading all scale sets: %w", err)
+	}
+
 	consumer, err := watcher.RegisterConsumer(
 		c.ctx, c.consumerID, composeControllerWatcherFilters(c.Entity),
@@ -140,6 +142,7 @@ func (c *Controller) Start() (err error) {
 }
 
 func (c *Controller) Stop() error {
+	slog.DebugContext(c.ctx, "stopping scale set controller", "scale_set", c.consumerID)
 	c.mux.Lock()
 	defer c.mux.Unlock()
@@ -170,15 +173,21 @@ func (c *Controller) updateTools() error {
 	c.mux.Lock()
 	defer c.mux.Unlock()
 
+	slog.DebugContext(c.ctx, "updating tools for entity", "entity", c.Entity.String())
+
 	tools, err := garmUtil.FetchTools(c.ctx, c.ghCli)
 	if err != nil {
 		slog.With(slog.Any("error", err)).ErrorContext(
 			c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
-		c.forgeCredsAreValid = false
+		if errors.Is(err, runnerErrors.ErrUnauthorized) {
+			// TODO: block all scale sets
+			c.forgeCredsAreValid = false
+		}
 		return fmt.Errorf("failed to update tools for entity %s: %w", c.Entity.String(), err)
 	}
+	slog.DebugContext(c.ctx, "tools successfully updated for entity", "entity", c.Entity.String())
 	c.forgeCredsAreValid = true
-	c.tools = tools
+	cache.SetGithubToolsCache(c.Entity, tools)
 	return nil
 }
 
@@ -202,7 +211,7 @@ func (c *Controller) loop() {
 	updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
 	initialToolUpdate := make(chan struct{}, 1)
 	go func() {
-		slog.Info("running 
initial tool update") + slog.InfoContext(c.ctx, "running initial tool update") if err := c.updateTools(); err != nil { slog.With(slog.Any("error", err)).Error("failed to update tools") } @@ -211,7 +220,11 @@ func (c *Controller) loop() { for { select { - case payload := <-c.consumer.Watch(): + case payload, ok := <-c.consumer.Watch(): + if !ok { + slog.InfoContext(c.ctx, "consumer channel closed") + return + } slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload)) go c.handleWatcherEvent(payload) case <-c.ctx.Done(): diff --git a/workers/scaleset/controller_watcher.go b/workers/scaleset/controller_watcher.go index 4b347f59..591e768e 100644 --- a/workers/scaleset/controller_watcher.go +++ b/workers/scaleset/controller_watcher.go @@ -6,8 +6,8 @@ import ( dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" + "github.com/cloudbase/garm/runner/common" "github.com/cloudbase/garm/util/github" - scaleSetCli "github.com/cloudbase/garm/util/github/scalesets" ) func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) { @@ -38,7 +38,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) { switch event.Operation { case dbCommon.CreateOperation: slog.DebugContext(c.ctx, "got create operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name) - if err := c.handleScaleSetCreateOperation(scaleSet); err != nil { + if err := c.handleScaleSetCreateOperation(scaleSet, c.ghCli); err != nil { slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation") } case dbCommon.UpdateOperation: @@ -57,7 +57,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) { } } -func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error { +func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli common.GithubClient) error { c.mux.Lock() defer c.mux.Unlock() @@ -74,7 +74,7 @@ func (c *Controller) 
handleScaleSetCreateOperation(sSet params.ScaleSet) error { return fmt.Errorf("provider %s not found for scale set %s", sSet.ProviderName, sSet.Name) } - worker, err := NewWorker(c.ctx, c.store, sSet, provider) + worker, err := NewWorker(c.ctx, c.store, sSet, provider, ghCli) if err != nil { return fmt.Errorf("creating scale set worker: %w", err) } @@ -120,7 +120,7 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error { // Some error may have occured when the scale set was first created, so we // attempt to create it after the user updated the scale set, hopefully // fixing the reason for the failure. - return c.handleScaleSetCreateOperation(sSet) + return c.handleScaleSetCreateOperation(sSet, c.ghCli) } // We let the watcher in the scale set worker handle the update operation. return nil @@ -140,8 +140,7 @@ func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) { defer c.mux.Unlock() if c.Entity.Credentials.ID != credentials.ID { - // credentials were swapped on the entity. We need to recompose the watcher - // filters. + // stale update event. 
return } c.Entity.Credentials = credentials @@ -190,15 +189,10 @@ func (c *Controller) updateAndBroadcastCredentials(entity params.GithubEntity) e return fmt.Errorf("creating github client: %w", err) } - setCli, err := scaleSetCli.NewClient(ghCli) - if err != nil { - return fmt.Errorf("creating scaleset client: %w", err) - } c.ghCli = ghCli - c.scaleSetCli = setCli for _, scaleSet := range c.ScaleSets { - if err := scaleSet.worker.SetGithubClient(ghCli, setCli); err != nil { + if err := scaleSet.worker.SetGithubClient(ghCli); err != nil { slog.ErrorContext(c.ctx, "setting github client on worker", "error", err) continue } diff --git a/workers/scaleset/interfaces.go b/workers/scaleset/interfaces.go new file mode 100644 index 00000000..365ac0be --- /dev/null +++ b/workers/scaleset/interfaces.go @@ -0,0 +1,12 @@ +package scaleset + +import ( + "github.com/cloudbase/garm/params" + "github.com/cloudbase/garm/util/github/scalesets" +) + +type scaleSetHelper interface { + ScaleSetCLI() *scalesets.ScaleSetClient + GetScaleSet() params.ScaleSet + Owner() string +} diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index 0592a8cf..c5e31b5d 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -2,34 +2,54 @@ package scaleset import ( "context" + "fmt" + "log/slog" "sync" + "time" - commonParams "github.com/cloudbase/garm-provider-common/params" dbCommon "github.com/cloudbase/garm/database/common" + "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/runner/common" "github.com/cloudbase/garm/util/github/scalesets" ) -func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) { +func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider, ghCli common.GithubClient) (*Worker, error) { + consumerID := fmt.Sprintf("scaleset-worker-%s-%d", scaleSet.Name, 
scaleSet.ID) + controllerInfo, err := store.ControllerInfo() + if err != nil { + return nil, fmt.Errorf("getting controller info: %w", err) + } + scaleSetCli, err := scalesets.NewClient(ghCli) + if err != nil { + return nil, fmt.Errorf("creating scale set client: %w", err) + } return &Worker{ - ctx: ctx, - store: store, - provider: provider, - Entity: scaleSet, + ctx: ctx, + controllerInfo: controllerInfo, + consumerID: consumerID, + store: store, + provider: provider, + Entity: scaleSet, + ghCli: ghCli, + scaleSetCli: scaleSetCli, }, nil } type Worker struct { - ctx context.Context + ctx context.Context + consumerID string + controllerInfo params.ControllerInfo provider common.Provider store dbCommon.Store Entity params.ScaleSet - tools []commonParams.RunnerApplicationDownload ghCli common.GithubClient scaleSetCli *scalesets.ScaleSetClient + consumer dbCommon.Consumer + + listener *scaleSetListener mux sync.Mutex running bool @@ -37,40 +57,173 @@ type Worker struct { } func (w *Worker) Stop() error { - return nil -} - -func (w *Worker) Start() error { + slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID) w.mux.Lock() defer w.mux.Unlock() + if !w.running { + return nil + } + + w.consumer.Close() + w.running = false + if w.quit != nil { + close(w.quit) + w.quit = nil + } + w.listener.Stop() + w.listener = nil + return nil +} + +func (w *Worker) Start() (err error) { + slog.DebugContext(w.ctx, "starting scale set worker", "scale_set", w.consumerID) + w.mux.Lock() + defer w.mux.Unlock() + + if w.running { + return nil + } + + consumer, err := watcher.RegisterConsumer( + w.ctx, w.consumerID, + watcher.WithAll( + watcher.WithScaleSetFilter(w.Entity), + watcher.WithOperationTypeFilter(dbCommon.UpdateOperation), + ), + ) + if err != nil { + return fmt.Errorf("error registering consumer: %w", err) + } + defer func() { + if err != nil { + consumer.Close() + w.consumer = nil + } + }() + + slog.DebugContext(w.ctx, "creating scale set listener") + 
listener := newListener(w.ctx, w) + + slog.DebugContext(w.ctx, "starting scale set listener") + if err := listener.Start(); err != nil { + return fmt.Errorf("error starting listener: %w", err) + } + + w.listener = listener + w.consumer = consumer + w.running = true + w.quit = make(chan struct{}) + + slog.DebugContext(w.ctx, "starting scale set worker loops", "scale_set", w.consumerID) go w.loop() + go w.keepListenerAlive() return nil } -func (w *Worker) SetTools(tools []commonParams.RunnerApplicationDownload) { +func (w *Worker) SetGithubClient(client common.GithubClient) error { w.mux.Lock() defer w.mux.Unlock() - w.tools = tools -} - -func (w *Worker) SetGithubClient(client common.GithubClient, scaleSetCli *scalesets.ScaleSetClient) error { - w.mux.Lock() - defer w.mux.Unlock() - - // TODO: - // * stop current listener if any + if err := w.listener.Stop(); err != nil { + slog.ErrorContext(w.ctx, "error stopping listener", "error", err) + } w.ghCli = client + scaleSetCli, err := scalesets.NewClient(client) + if err != nil { + return fmt.Errorf("error creating scale set client: %w", err) + } w.scaleSetCli = scaleSetCli - - // TODO: - // * start new listener - return nil } +func (w *Worker) handleEvent(event dbCommon.ChangePayload) { + scaleSet, ok := event.Payload.(params.ScaleSet) + if !ok { + slog.ErrorContext(w.ctx, "invalid payload for scale set type", "scale_set_type", event.EntityType, "payload", event.Payload) + return + } + switch event.Operation { + case dbCommon.UpdateOperation: + slog.DebugContext(w.ctx, "got update operation") + w.mux.Lock() + w.Entity = scaleSet + w.mux.Unlock() + default: + slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation) + } +} + func (w *Worker) loop() { + defer w.Stop() + for { + select { + case <-w.quit: + return + case event, ok := <-w.consumer.Watch(): + if !ok { + slog.InfoContext(w.ctx, "consumer channel closed") + return + } + go w.handleEvent(event) + case <-w.ctx.Done(): + 
slog.DebugContext(w.ctx, "context done") + return + } + } +} + +func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) { + ticker := time.NewTicker(sleepTime) + defer ticker.Stop() + + select { + case <-ticker.C: + return false + case <-w.quit: + return true + case <-w.ctx.Done(): + return true + } +} + +func (w *Worker) keepListenerAlive() { + var backoff time.Duration + for { + select { + case <-w.quit: + return + case <-w.ctx.Done(): + return + case <-w.listener.Wait(): + slog.DebugContext(w.ctx, "listener is stopped; attempting to restart") + for { + w.mux.Lock() + w.listener.Stop() //cleanup + slog.DebugContext(w.ctx, "attempting to restart") + if err := w.listener.Start(); err != nil { + w.mux.Unlock() + slog.ErrorContext(w.ctx, "error restarting listener", "error", err) + if backoff > 60*time.Second { + backoff = 60 * time.Second + } else if backoff == 0 { + backoff = 5 * time.Second + slog.InfoContext(w.ctx, "backing off restart attempt", "backoff", backoff) + } else { + backoff *= 2 + } + slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff) + if canceled := w.sleepWithCancel(backoff); canceled { + slog.DebugContext(w.ctx, "listener restart canceled") + return + } + continue + } + w.mux.Unlock() + break + } + } + } } diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go new file mode 100644 index 00000000..8cfa9264 --- /dev/null +++ b/workers/scaleset/scaleset_helper.go @@ -0,0 +1,20 @@ +package scaleset + +import ( + "fmt" + + "github.com/cloudbase/garm/params" + "github.com/cloudbase/garm/util/github/scalesets" +) + +func (w *Worker) ScaleSetCLI() *scalesets.ScaleSetClient { + return w.scaleSetCli +} + +func (w *Worker) GetScaleSet() params.ScaleSet { + return w.Entity +} + +func (w *Worker) Owner() string { + return fmt.Sprintf("garm-%s", w.controllerInfo.ControllerID) +} diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go new 
file mode 100644 index 00000000..f92eaff1 --- /dev/null +++ b/workers/scaleset/scaleset_listener.go @@ -0,0 +1,134 @@ +package scaleset + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/cloudbase/garm/params" + "github.com/cloudbase/garm/util/github/scalesets" +) + +func newListener(ctx context.Context, scaleSetHelper scaleSetHelper) *scaleSetListener { + return &scaleSetListener{ + ctx: ctx, + scaleSetHelper: scaleSetHelper, + } +} + +type scaleSetListener struct { + // ctx is the global context for the worker + ctx context.Context + // listenerCtx is the context for the listener. We pass this + // context to GetMessages() which blocks until a message is + // available. We need to be able to cancel that longpoll request + // independent of the worker context, in case we need to restart + // the listener without restarting the worker. + listenerCtx context.Context + cancelFunc context.CancelFunc + lastMessageID int64 + + scaleSetHelper scaleSetHelper + messageSession *scalesets.MessageSession + + mux sync.Mutex + running bool + quit chan struct{} +} + +func (l *scaleSetListener) Start() error { + slog.DebugContext(l.ctx, "starting scale set listener", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID) + l.mux.Lock() + defer l.mux.Unlock() + + l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background()) + scaleSet := l.scaleSetHelper.GetScaleSet() + slog.DebugContext(l.ctx, "creating new message session", "scale_set", scaleSet.ScaleSetID) + session, err := l.scaleSetHelper.ScaleSetCLI().CreateMessageSession( + l.listenerCtx, scaleSet.ScaleSetID, + l.scaleSetHelper.Owner(), + ) + if err != nil { + return fmt.Errorf("creating message session: %w", err) + } + l.messageSession = session + l.quit = make(chan struct{}) + l.running = true + go l.loop() + + return nil +} + +func (l *scaleSetListener) Stop() error { + l.mux.Lock() + defer l.mux.Unlock() + + if !l.running { + return nil + } + + if l.messageSession != nil { + 
slog.DebugContext(l.ctx, "closing message session", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
+		if err := l.messageSession.Close(); err != nil {
+			slog.ErrorContext(l.ctx, "closing message session", "error", err)
+		}
+		if err := l.scaleSetHelper.ScaleSetCLI().DeleteMessageSession(context.Background(), l.messageSession); err != nil {
+			slog.ErrorContext(l.ctx, "error deleting message session", "error", err)
+		}
+	}
+	l.cancelFunc()
+	l.running = false
+	close(l.quit)
+	return nil
+}
+
+func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage) {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+	body, err := msg.GetJobsFromBody()
+	if err != nil {
+		slog.ErrorContext(l.ctx, "getting jobs from body", "error", err)
+		return
+	}
+	slog.InfoContext(l.ctx, "handling message", "message", msg, "body", body)
+	l.lastMessageID = msg.MessageID
+}
+
+func (l *scaleSetListener) loop() {
+	defer l.Stop()
+
+	slog.DebugContext(l.ctx, "starting scale set listener loop", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
+	for {
+		select {
+		case <-l.quit:
+			return
+		case <-l.listenerCtx.Done():
+			slog.DebugContext(l.ctx, "stopping scale set listener")
+			return
+		case <-l.ctx.Done():
+			slog.DebugContext(l.ctx, "scaleset worker has stopped")
+			return
+		default:
+			slog.DebugContext(l.ctx, "getting message")
+			msg, err := l.messageSession.GetMessage(
+				l.listenerCtx, l.lastMessageID, l.scaleSetHelper.GetScaleSet().MaxRunners)
+			if err != nil {
+				if !errors.Is(err, context.Canceled) {
+					slog.ErrorContext(l.ctx, "getting message", "error", err)
+				}
+				return
+			}
+			l.handleSessionMessage(msg)
+		}
+	}
+}
+
+func (l *scaleSetListener) Wait() <-chan struct{} {
+	l.mux.Lock()
+	defer l.mux.Unlock()
+	if !l.running {
+		return nil
+	}
+	return l.listenerCtx.Done()
+}