diff --git a/cmd/garm/main.go b/cmd/garm/main.go
index ebb30d55..3ffcdc1f 100644
--- a/cmd/garm/main.go
+++ b/cmd/garm/main.go
@@ -49,6 +49,7 @@ import (
 	garmUtil "github.com/cloudbase/garm/util"
 	"github.com/cloudbase/garm/util/appdefaults"
 	"github.com/cloudbase/garm/websocket"
+	"github.com/cloudbase/garm/workers/entity"
 )
 
 var (
@@ -230,6 +231,14 @@ func main() {
 		log.Fatal(err)
 	}
 
+	entityController, err := entity.NewController(ctx, db, *cfg)
+	if err != nil {
+		log.Fatalf("failed to create entity controller: %+v", err)
+	}
+	if err := entityController.Start(); err != nil {
+		log.Fatalf("failed to start entity controller: %+v", err)
+	}
+
 	runner, err := runner.NewRunner(ctx, *cfg, db)
 	if err != nil {
 		log.Fatalf("failed to create controller: %+v", err)
@@ -326,6 +335,11 @@ func main() {
 
 	<-ctx.Done()
 
+	slog.InfoContext(ctx, "shutting down entity controller")
+	if err := entityController.Stop(); err != nil {
+		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to stop entity controller")
+	}
+
 	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer shutdownCancel()
 	if err := srv.Shutdown(shutdownCtx); err != nil {
diff --git a/database/sql/models.go b/database/sql/models.go
index d040760c..c44baceb 100644
--- a/database/sql/models.go
+++ b/database/sql/models.go
@@ -96,6 +96,8 @@ type ScaleSet struct {
 
 	// ScaleSetID is the github ID of the scale set. This field may not be set if
 	// the scale set was ceated in GARM but has not yet been created in GitHub.
+	// The scale set ID is also not globally unique. It is only unique within the context
+	// of an entity.
 	ScaleSetID    int    `gorm:"index:idx_scale_set"`
 	Name          string `gorm:"index:idx_name"`
 	DisableUpdate bool
diff --git a/params/github.go b/params/github.go
index 81540683..2d132d50 100644
--- a/params/github.go
+++ b/params/github.go
@@ -15,6 +15,8 @@
 package params
 
 import (
+	"encoding/base64"
+	"encoding/json"
 	"fmt"
 	"net/url"
 	"time"
@@ -420,6 +422,23 @@ type RunnerScaleSetJitRunnerConfig struct {
 	EncodedJITConfig string `json:"encodedJITConfig"`
 }
 
+// DecodedJITConfig base64-decodes EncodedJITConfig and unmarshals the resulting
+// JSON into a map. It returns an error if the field is empty or malformed.
+func (r RunnerScaleSetJitRunnerConfig) DecodedJITConfig() (map[string]string, error) {
+	if r.EncodedJITConfig == "" {
+		return nil, fmt.Errorf("no encoded JIT config specified")
+	}
+	decoded, err := base64.StdEncoding.DecodeString(r.EncodedJITConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode JIT config: %w", err)
+	}
+	jitConfig := make(map[string]string)
+	if err := json.Unmarshal(decoded, &jitConfig); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal JIT config: %w", err)
+	}
+	return jitConfig, nil
+}
+
 type RunnerReferenceList struct {
 	Count            int               `json:"count"`
 	RunnerReferences []RunnerReference `json:"value"`
diff --git a/util/github/client.go b/util/github/client.go
index f8c04d28..ae0b6485 100644
--- a/util/github/client.go
+++ b/util/github/client.go
@@ -452,8 +452,8 @@ func Client(ctx context.Context, entity params.GithubEntity) (common.GithubClien
 		return nil, errors.Wrap(err, "fetching http client")
 	}
 
-	slog.InfoContext(
-		ctx, "creating client with",
+	slog.DebugContext(
+		ctx, "creating client for entity",
 		"entity", entity.String(),
 		"base_url", entity.Credentials.APIBaseURL,
 		"upload_url", entity.Credentials.UploadBaseURL)
diff --git a/workers/common/interfaces.go b/workers/common/interfaces.go
new file mode 100644
index 00000000..4791a500
--- /dev/null
+++ b/workers/common/interfaces.go
@@ -0,0 +1,10 @@
+package common
+
+import (
+	commonParams "github.com/cloudbase/garm-provider-common/params"
+)
+
+// ToolsGetter is implemented by types that can return a list of runner application downloads.
+type ToolsGetter interface {
+	GetTools() ([]commonParams.RunnerApplicationDownload, error)
+}
diff --git a/workers/entity/controller.go b/workers/entity/controller.go
new file mode 100644
index 00000000..c1547302
--- /dev/null
+++ b/workers/entity/controller.go
@@ -0,0 +1,224 @@
+package entity
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+
+	"github.com/cloudbase/garm/auth"
+	"github.com/cloudbase/garm/config"
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/runner/common"
+	"github.com/cloudbase/garm/runner/providers"
+	garmUtil "github.com/cloudbase/garm/util"
+)
+
+// NewController creates the entity controller. It reads the controller ID from
+// the store and loads the providers from cfg. The returned Controller is not
+// running until Start is called.
+func NewController(ctx context.Context, store dbCommon.Store, cfg config.Config) (*Controller, error) {
+	consumerID := "entity-controller"
+	ctrlID, err := store.ControllerInfo()
+	if err != nil {
+		return nil, fmt.Errorf("getting controller info: %w", err)
+	}
+
+	ctx = garmUtil.WithSlogContext(
+		ctx,
+		slog.Any("worker", consumerID))
+	ctx = auth.GetAdminContext(ctx)
+
+	providers, err := providers.LoadProvidersFromConfig(ctx, cfg, ctrlID.ControllerID.String())
+	if err != nil {
+		return nil, fmt.Errorf("loading providers: %w", err)
+	}
+
+	return &Controller{
+		consumerID: consumerID,
+		ctx:        ctx,
+		store:      store,
+		providers:  providers,
+		Entities:   make(map[string]*Worker),
+	}, nil
+}
+
+// Controller keeps one Worker per repository, organization and enterprise found
+// in the store, and reacts to create and delete events for those entities.
+type Controller struct {
+	consumerID string
+	ctx        context.Context
+
+	consumer dbCommon.Consumer
+	store    dbCommon.Store
+
+	providers map[string]common.Provider
+	Entities  map[string]*Worker
+
+	running bool
+	quit    chan struct{}
+
+	mux sync.Mutex
+}
+
+func (c *Controller) loadAllRepositories() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	repos, err := c.store.ListRepositories(c.ctx)
+	if err != nil {
+		return fmt.Errorf("fetching repositories: %w", err)
+	}
+
+	for _, repo := range repos {
+		entity, err := repo.GetEntity()
+		if err != nil {
+			return fmt.Errorf("getting entity: %w", err)
+		}
+		worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
+		if err != nil {
+			return fmt.Errorf("creating worker: %w", err)
+		}
+		slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
+		if err := worker.Start(); err != nil {
+			return fmt.Errorf("starting worker: %w", err)
+		}
+		c.Entities[entity.ID] = worker
+	}
+	return nil
+}
+
+func (c *Controller) loadAllOrganizations() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	orgs, err := c.store.ListOrganizations(c.ctx)
+	if err != nil {
+		return fmt.Errorf("fetching organizations: %w", err)
+	}
+	for _, org := range orgs {
+		entity, err := org.GetEntity()
+		if err != nil {
+			return fmt.Errorf("getting entity: %w", err)
+		}
+		worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
+		if err != nil {
+			return fmt.Errorf("creating worker: %w", err)
+		}
+		slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
+		if err := worker.Start(); err != nil {
+			return fmt.Errorf("starting worker: %w", err)
+		}
+		c.Entities[entity.ID] = worker
+	}
+	return nil
+}
+
+func (c *Controller) loadAllEnterprises() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	enterprises, err := c.store.ListEnterprises(c.ctx)
+	if err != nil {
+		return fmt.Errorf("fetching enterprises: %w", err)
+	}
+	for _, enterprise := range enterprises {
+		entity, err := enterprise.GetEntity()
+		if err != nil {
+			return fmt.Errorf("getting entity: %w", err)
+		}
+		worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
+		if err != nil {
+			return fmt.Errorf("creating worker: %w", err)
+		}
+		slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
+		if err := worker.Start(); err != nil {
+			return fmt.Errorf("starting worker: %w", err)
+		}
+		c.Entities[entity.ID] = worker
+	}
+	return nil
+}
+
+// Start creates and starts a worker for every known entity, registers the
+// watcher consumer and launches the event loop. It is a no-op if already running.
+func (c *Controller) Start() error {
+	c.mux.Lock()
+	if c.running {
+		c.mux.Unlock()
+		return nil
+	}
+	c.mux.Unlock()
+
+	if err := c.loadAllRepositories(); err != nil {
+		return fmt.Errorf("loading repositories: %w", err)
+	}
+	if err := c.loadAllOrganizations(); err != nil {
+		return fmt.Errorf("loading organizations: %w", err)
+	}
+	if err := c.loadAllEnterprises(); err != nil {
+		return fmt.Errorf("loading enterprises: %w", err)
+	}
+
+	consumer, err := watcher.RegisterConsumer(
+		c.ctx, c.consumerID,
+		composeControllerWatcherFilters(),
+	)
+
+	if err != nil {
+		return fmt.Errorf("failed to create consumer for entity controller: %w", err)
+	}
+
+	c.mux.Lock()
+	c.consumer = consumer
+	c.running = true
+	c.quit = make(chan struct{})
+	c.mux.Unlock()
+
+	go c.loop()
+
+	return nil
+}
+
+// Stop stops all entity workers, signals the event loop to exit and closes the
+// watcher consumer. It is a no-op if the controller is not running.
+func (c *Controller) Stop() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	if !c.running {
+		return nil
+	}
+	slog.DebugContext(c.ctx, "stopping entity controller")
+
+	for entityID, worker := range c.Entities {
+		if err := worker.Stop(); err != nil {
+			slog.ErrorContext(c.ctx, "stopping worker for entity", "entity_id", entityID, "error", err)
+		}
+	}
+
+	c.running = false
+	// The quit channel is closed but deliberately not set to nil: loop() reads
+	// c.quit without holding the lock. Start() allocates a new channel.
+	close(c.quit)
+	c.consumer.Close()
+	return nil
+}
+
+// loop dispatches watcher events until the context is cancelled, the controller
+// is stopped or the consumer channel is closed.
+func (c *Controller) loop() {
+	defer c.Stop()
+	for {
+		select {
+		case payload, ok := <-c.consumer.Watch():
+			if !ok {
+				// A closed channel would otherwise yield zero-value payloads forever.
+				return
+			}
+			slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload))
+			go c.handleWatcherEvent(payload)
+		case <-c.ctx.Done():
+			return
+		case <-c.quit:
+			return
+		}
+	}
+}
diff --git a/workers/entity/controller_watcher.go b/workers/entity/controller_watcher.go
new file mode 100644
index 00000000..ace63702
--- /dev/null
+++ b/workers/entity/controller_watcher.go
@@ -0,0 +1,105 @@
+package entity
+
+import (
+	"log/slog"
+
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/params"
+)
+
+// handleWatcherEvent validates the payload of a repository, organization or
+// enterprise event and dispatches create and delete operations.
+func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
+	var entityGetter params.EntityGetter
+	switch event.EntityType {
+	case dbCommon.RepositoryEntityType:
+		slog.DebugContext(c.ctx, "got repository payload event")
+		repo, ok := event.Payload.(params.Repository)
+		if !ok {
+			slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+			return
+		}
+		entityGetter = repo
+	case dbCommon.OrganizationEntityType:
+		slog.DebugContext(c.ctx, "got organization payload event")
+		org, ok := event.Payload.(params.Organization)
+		if !ok {
+			slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+			return
+		}
+		entityGetter = org
+	case dbCommon.EnterpriseEntityType:
+		slog.DebugContext(c.ctx, "got enterprise payload event")
+		ent, ok := event.Payload.(params.Enterprise)
+		if !ok {
+			slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+			return
+		}
+		entityGetter = ent
+	default:
+		slog.ErrorContext(c.ctx, "invalid entity type", "entity_type", event.EntityType)
+		return
+	}
+
+	if entityGetter == nil {
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.CreateOperation:
+		slog.DebugContext(c.ctx, "got create operation")
+		c.handleWatcherCreateOperation(entityGetter, event)
+	case dbCommon.DeleteOperation:
+		slog.DebugContext(c.ctx, "got delete operation")
+		c.handleWatcherDeleteOperation(entityGetter, event)
+	default:
+		slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
+
+// handleWatcherCreateOperation creates, starts and registers a worker for the new entity.
+func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	entity, err := entityGetter.GetEntity()
+	if err != nil {
+		slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+		return
+	}
+	worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
+	if err != nil {
+		slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+		return
+	}
+
+	slog.InfoContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
+	if err := worker.Start(); err != nil {
+		slog.ErrorContext(c.ctx, "starting worker", "entity_id", entity.ID, "error", err)
+		return
+	}
+
+	c.Entities[entity.ID] = worker
+}
+
+// handleWatcherDeleteOperation stops the worker of the deleted entity and removes it from the list.
+func (c *Controller) handleWatcherDeleteOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	entity, err := entityGetter.GetEntity()
+	if err != nil {
+		slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+		return
+	}
+	worker, ok := c.Entities[entity.ID]
+	if !ok {
+		slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID)
+		return
+	}
+	slog.InfoContext(c.ctx, "stopping entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
+	if err := worker.Stop(); err != nil {
+		slog.ErrorContext(c.ctx, "stopping worker", "entity_id", entity.ID, "error", err)
+		return
+	}
+	delete(c.Entities, entity.ID)
+}
diff --git a/workers/entity/util.go b/workers/entity/util.go
new file mode 100644
index 00000000..28b9f955
--- /dev/null
+++ b/workers/entity/util.go
@@ -0,0 +1,39 @@
+package entity
+
+import (
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/params"
+)
+
+// composeControllerWatcherFilters matches create and delete events for
+// repositories, organizations and enterprises.
+func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {
+	return watcher.WithAll(
+		watcher.WithAny(
+			watcher.WithEntityTypeFilter(dbCommon.RepositoryEntityType),
+			watcher.WithEntityTypeFilter(dbCommon.OrganizationEntityType),
+			watcher.WithEntityTypeFilter(dbCommon.EnterpriseEntityType),
+		),
+		watcher.WithAny(
+			watcher.WithOperationTypeFilter(dbCommon.CreateOperation),
+			watcher.WithOperationTypeFilter(dbCommon.DeleteOperation),
+		),
+	)
+}
+
+// composeWorkerWatcherFilters matches update events for the given entity and
+// update events for the credentials currently attached to it.
+func composeWorkerWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFunc {
+	return watcher.WithAny(
+		watcher.WithAll(
+			watcher.WithEntityFilter(entity),
+			watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
+		),
+		// Watch for credentials updates.
+		watcher.WithAll(
+			watcher.WithGithubCredentialsFilter(entity.Credentials),
+			watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
+		),
+	)
+}
diff --git a/workers/entity/worker.go b/workers/entity/worker.go
new file mode 100644
index 00000000..2a8591cb
--- /dev/null
+++ b/workers/entity/worker.go
@@ -0,0 +1,131 @@
+package entity
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/params"
+	"github.com/cloudbase/garm/runner/common"
+	garmUtil "github.com/cloudbase/garm/util"
+	"github.com/cloudbase/garm/workers/scaleset"
+)
+
+// NewWorker returns a worker for one entity. The worker does nothing until
+// Start is called.
+func NewWorker(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Worker, error) {
+	consumerID := fmt.Sprintf("entity-worker-%s", entity.String())
+
+	ctx = garmUtil.WithSlogContext(
+		ctx,
+		slog.Any("worker", consumerID))
+
+	return &Worker{
+		ctx:        ctx,
+		consumerID: consumerID,
+		store:      store,
+		Entity:     entity,
+		providers:  providers,
+	}, nil
+}
+
+// Worker tracks a single entity (repository, organization or enterprise) and
+// keeps its local copy of the entity and its credentials up to date.
+type Worker struct {
+	ctx        context.Context
+	consumerID string
+
+	consumer dbCommon.Consumer
+	store    dbCommon.Store
+
+	Entity             params.GithubEntity
+	providers          map[string]common.Provider
+	scaleSetController *scaleset.Controller
+	// TODO(gabriel-samfira): replace current pool manager with something similar
+	// to the scale set controller.
+	// poolManager *pool.Controller
+
+	mux     sync.Mutex
+	running bool
+	quit    chan struct{}
+}
+
+// Stop stops the scale set controller, signals the event loop to exit and
+// closes the watcher consumer. It is a no-op if the worker is not running.
+func (w *Worker) Stop() error {
+	w.mux.Lock()
+	defer w.mux.Unlock()
+
+	if !w.running {
+		return nil
+	}
+	slog.DebugContext(w.ctx, "stopping entity worker")
+
+	if err := w.scaleSetController.Stop(); err != nil {
+		return fmt.Errorf("stopping scale set controller: %w", err)
+	}
+	w.scaleSetController = nil
+
+	w.running = false
+	close(w.quit)
+	w.consumer.Close()
+	return nil
+}
+
+// Start creates the scale set controller, registers the watcher consumer and
+// launches the event loop. It is a no-op if the worker is already running.
+func (w *Worker) Start() error {
+	w.mux.Lock()
+	defer w.mux.Unlock()
+
+	if w.running {
+		// Do not create a second controller, consumer and loop goroutine.
+		return nil
+	}
+
+	// NOTE(review): the scale set controller is created here, but its Start
+	// method is not called anywhere in this function. Confirm this is intended.
+	scaleSetController, err := scaleset.NewController(w.ctx, w.store, w.Entity, w.providers)
+	if err != nil {
+		return fmt.Errorf("creating scale set controller: %w", err)
+	}
+	w.scaleSetController = scaleSetController
+
+	consumer, err := watcher.RegisterConsumer(
+		w.ctx, w.consumerID,
+		composeWorkerWatcherFilters(w.Entity),
+	)
+	if err != nil {
+		return fmt.Errorf("registering consumer: %w", err)
+	}
+	w.consumer = consumer
+
+	w.running = true
+	w.quit = make(chan struct{})
+	go w.loop()
+	return nil
+}
+
+// loop dispatches watcher events until the context is cancelled, the worker is
+// stopped or the consumer channel is closed.
+func (w *Worker) loop() {
+	defer w.Stop()
+	for {
+		select {
+		case payload, ok := <-w.consumer.Watch():
+			if !ok {
+				// A closed channel would otherwise yield zero-value payloads forever.
+				return
+			}
+			slog.InfoContext(w.ctx, "received payload", slog.Any("payload", payload))
+			go w.handleWorkerWatcherEvent(payload)
+		case <-w.ctx.Done():
+			return
+		case <-w.quit:
+			return
+		}
+	}
+}
diff --git a/workers/entity/worker_watcher.go b/workers/entity/worker_watcher.go
new file mode 100644
index 00000000..7e00112e
--- /dev/null
+++ b/workers/entity/worker_watcher.go
@@ -0,0 +1,81 @@
+package entity
+
+import (
+	"log/slog"
+
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/params"
+)
+
+// handleWorkerWatcherEvent routes entity and credentials events to their handlers.
+func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
+	// This worker may be for a repo, org or enterprise. React only to the entity type
+	// that this worker is for.
+	entityType := dbCommon.DatabaseEntityType(w.Entity.EntityType)
+	switch event.EntityType {
+	case entityType:
+		entityGetter, ok := event.Payload.(params.EntityGetter)
+		if !ok {
+			slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+			return
+		}
+		entity, err := entityGetter.GetEntity()
+		if err != nil {
+			slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+			return
+		}
+		w.handleEntityEventPayload(entity, event)
+		return
+	case dbCommon.GithubCredentialsEntityType:
+		slog.DebugContext(w.ctx, "got github credentials payload event")
+		credentials, ok := event.Payload.(params.GithubCredentials)
+		if !ok {
+			slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+			return
+		}
+		w.handleEntityCredentialsEventPayload(credentials, event)
+	default:
+		slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
+	}
+}
+
+// handleEntityEventPayload stores the updated entity and recomposes the watcher
+// filters when the entity now references different credentials.
+func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCommon.ChangePayload) {
+	switch event.Operation {
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(w.ctx, "got update operation")
+		w.mux.Lock()
+		defer w.mux.Unlock()
+
+		credentials := entity.Credentials
+		if w.Entity.Credentials.ID != credentials.ID {
+			// credentials were swapped on the entity. We need to recompose the watcher
+			// filters.
+			w.consumer.SetFilters(composeWorkerWatcherFilters(entity))
+		}
+		w.Entity = entity
+	default:
+		slog.ErrorContext(w.ctx, "invalid operation type", "operation_type", event.Operation)
+	}
+}
+
+// handleEntityCredentialsEventPayload stores updated credentials if they are the
+// ones currently attached to the entity.
+func (w *Worker) handleEntityCredentialsEventPayload(credentials params.GithubCredentials, event dbCommon.ChangePayload) {
+	switch event.Operation {
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(w.ctx, "got update operation")
+		w.mux.Lock()
+		defer w.mux.Unlock()
+		if w.Entity.Credentials.ID != credentials.ID {
+			// The channel is buffered. We may get an old update. If credentials get updated
+			// immediately after they are swapped on the entity, we may still get an update
+			// pushed to the channel before the filters are swapped. We can ignore the update.
+			return
+		}
+		w.Entity.Credentials = credentials
+	default:
+		slog.ErrorContext(w.ctx, "invalid operation type", "operation_type", event.Operation)
+	}
+}
diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go
new file mode 100644
index 00000000..de426621
--- /dev/null
+++ b/workers/scaleset/controller.go
@@ -0,0 +1,256 @@
+package scaleset
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	commonParams "github.com/cloudbase/garm-provider-common/params"
+
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/params"
+	"github.com/cloudbase/garm/runner/common"
+	garmUtil "github.com/cloudbase/garm/util"
+	"github.com/cloudbase/garm/util/github"
+	"github.com/cloudbase/garm/util/github/scalesets"
+)
+
+// NewController returns a scale set controller for one entity. The controller
+// does nothing until Start is called.
+func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
+
+	consumerID := fmt.Sprintf("scaleset-worker-%s", entity.String())
+
+	ctx = garmUtil.WithSlogContext(
+		ctx,
+		slog.Any("worker", consumerID))
+
+	return &Controller{
+		ctx:           ctx,
+		consumerID:    consumerID,
+		ScaleSets:     make(map[uint]*scaleSet),
+		Entity:        entity,
+		providers:     providers,
+		store:         store,
+		statusUpdates: make(chan scaleSetStatus, 10),
+	}, nil
+}
+
+// scaleSet pairs a scale set with its last reported status and its worker.
+type scaleSet struct {
+	scaleSet params.ScaleSet
+	status   scaleSetStatus
+	worker   *Worker
+
+	mux sync.Mutex
+}
+
+func (s *scaleSet) updateStatus(status scaleSetStatus) {
+	s.mux.Lock()
+	defer s.mux.Unlock()
+
+	s.status = status
+}
+
+func (s *scaleSet) Stop() error {
+	s.mux.Lock()
+	defer s.mux.Unlock()
+
+	if s.worker == nil {
+		return nil
+	}
+
+	return s.worker.Stop()
+}
+
+// Controller is responsible for managing scale sets for one github entity.
+type Controller struct {
+	ctx        context.Context
+	consumerID string
+
+	ScaleSets map[uint]*scaleSet
+
+	Entity params.GithubEntity
+
+	consumer  dbCommon.Consumer
+	store     dbCommon.Store
+	providers map[string]common.Provider
+
+	tools              []commonParams.RunnerApplicationDownload
+	ghCli              common.GithubClient
+	scaleSetCli        *scalesets.ScaleSetClient
+	forgeCredsAreValid bool
+
+	statusUpdates chan scaleSetStatus
+
+	mux     sync.Mutex
+	running bool
+	quit    chan struct{}
+}
+
+func (c *Controller) loadAllScaleSets() error {
+	scaleSets, err := c.store.ListEntityScaleSets(c.ctx, c.Entity)
+	if err != nil {
+		return fmt.Errorf("listing scale sets: %w", err)
+	}
+
+	for _, sSet := range scaleSets {
+		if err := c.handleScaleSetCreateOperation(sSet); err != nil {
+			slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
+			continue
+		}
+	}
+	return nil
+}
+
+// Start loads the scale sets of the entity, creates the github client, registers
+// the watcher consumer and launches the loop. It is a no-op if already running.
+func (c *Controller) Start() (err error) {
+	c.mux.Lock()
+	if c.running {
+		c.mux.Unlock()
+		return nil
+	}
+	c.mux.Unlock()
+
+	if err := c.loadAllScaleSets(); err != nil {
+		return fmt.Errorf("loading all scale sets: %w", err)
+	}
+
+	ghCli, err := github.Client(c.ctx, c.Entity)
+	if err != nil {
+		return fmt.Errorf("creating github client: %w", err)
+	}
+
+	consumer, err := watcher.RegisterConsumer(
+		c.ctx, c.consumerID,
+		composeControllerWatcherFilters(c.Entity),
+	)
+	if err != nil {
+		return fmt.Errorf("registering consumer: %w", err)
+	}
+
+	c.mux.Lock()
+	c.ghCli = ghCli
+	c.consumer = consumer
+	c.running = true
+	c.quit = make(chan struct{})
+	c.mux.Unlock()
+
+	go c.loop()
+	return nil
+}
+
+// Stop stops the scale set workers, signals the loop to exit and closes the
+// watcher consumer. It is a no-op if the controller is not running.
+func (c *Controller) Stop() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	if !c.running {
+		return nil
+	}
+	slog.DebugContext(c.ctx, "stopping scaleset controller", "entity", c.Entity.String())
+
+	for scaleSetID, scaleSet := range c.ScaleSets {
+		if err := scaleSet.Stop(); err != nil {
+			slog.ErrorContext(c.ctx, "stopping worker for scale set", "scale_set_id", scaleSetID, "error", err)
+			continue
+		}
+		delete(c.ScaleSets, scaleSetID)
+	}
+
+	c.running = false
+	// The quit channel is closed but deliberately not set to nil: loop() reads
+	// c.quit without holding the lock. Start() allocates a new channel.
+	close(c.quit)
+	close(c.statusUpdates)
+	c.statusUpdates = nil
+	c.consumer.Close()
+
+	return nil
+}
+
+func (c *Controller) updateTools() error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	tools, err := garmUtil.FetchTools(c.ctx, c.ghCli)
+	if err != nil {
+		slog.With(slog.Any("error", err)).ErrorContext(
+			c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
+		c.forgeCredsAreValid = false
+		return fmt.Errorf("failed to update tools for entity %s: %w", c.Entity.String(), err)
+	}
+	c.forgeCredsAreValid = true
+	c.tools = tools
+	return nil
+}
+
+// handleScaleSetStatusUpdates records a status update on the matching scale set.
+// Updates with a zero ID or for an unknown scale set are ignored.
+func (c *Controller) handleScaleSetStatusUpdates(status scaleSetStatus) {
+	if status.scaleSet.ID == 0 {
+		slog.DebugContext(c.ctx, "invalid scale set ID; ignoring")
+		return
+	}
+
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	scaleSet, ok := c.ScaleSets[status.scaleSet.ID]
+	if !ok {
+		slog.DebugContext(c.ctx, "scale set not found; ignoring")
+		return
+	}
+
+	scaleSet.updateStatus(status)
+}
+
+// loop runs the initial tools update, then handles watcher events, status
+// updates and periodic tools updates until the controller is stopped.
+func (c *Controller) loop() {
+	defer c.Stop()
+	updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
+	defer updateToolsTicker.Stop()
+	initialToolUpdate := make(chan struct{}, 1)
+	go func() {
+		slog.Info("running initial tool update")
+		if err := c.updateTools(); err != nil {
+			slog.With(slog.Any("error", err)).Error("failed to update tools")
+		}
+		initialToolUpdate <- struct{}{}
+	}()
+
+	for {
+		select {
+		case payload, ok := <-c.consumer.Watch():
+			if !ok {
+				return
+			}
+			slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload))
+			go c.handleWatcherEvent(payload)
+		case <-c.ctx.Done():
+			return
+		case <-initialToolUpdate:
+			// The initial update finished. A closed channel is always ready to
+			// receive and would make this select spin, so the case is disabled by
+			// setting the channel to nil instead of closing it.
+			initialToolUpdate = nil
+		case update, ok := <-c.statusUpdates:
+			if !ok {
+				return
+			}
+			go c.handleScaleSetStatusUpdates(update)
+		case <-updateToolsTicker.C:
+			if err := c.updateTools(); err != nil {
+				slog.With(slog.Any("error", err)).Error("failed to update tools")
+			}
+		case <-c.quit:
+			return
+		}
+	}
+}
diff --git a/workers/scaleset/controller_watcher.go b/workers/scaleset/controller_watcher.go
new file mode 100644
index 00000000..4b347f59
--- /dev/null
+++ b/workers/scaleset/controller_watcher.go
@@ -0,0 +1,237 @@
+package scaleset
+
+import (
+	"fmt"
+	"log/slog"
+
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/params"
+	"github.com/cloudbase/garm/util/github"
+	scaleSetCli "github.com/cloudbase/garm/util/github/scalesets"
+)
+
+// handleWatcherEvent routes scale set, entity and credentials events to their handlers.
+func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
+	entityType := dbCommon.DatabaseEntityType(c.Entity.EntityType)
+	switch event.EntityType {
+	case dbCommon.ScaleSetEntityType:
+		slog.DebugContext(c.ctx, "got scale set payload event")
+		c.handleScaleSet(event)
+	case entityType:
+		slog.DebugContext(c.ctx, "got entity payload event")
+		c.handleEntityEvent(event)
+	case dbCommon.GithubCredentialsEntityType:
+		slog.DebugContext(c.ctx, "got github credentials payload event")
+		c.handleCredentialsEvent(event)
+	default:
+		slog.ErrorContext(c.ctx, "invalid entity type", "entity_type", event.EntityType)
+		return
+	}
+}
+
+// handleScaleSet dispatches create, update and delete operations for a scale set payload.
+func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
+	scaleSet, ok := event.Payload.(params.ScaleSet)
+	if !ok {
+		slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.CreateOperation:
+		slog.DebugContext(c.ctx, "got create operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
+		if err := c.handleScaleSetCreateOperation(scaleSet); err != nil {
+			slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
+		}
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(c.ctx, "got update operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
+		if err := c.handleScaleSetUpdateOperation(scaleSet); err != nil {
+			slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set update operation")
+		}
+	case dbCommon.DeleteOperation:
+		slog.DebugContext(c.ctx, "got delete operation")
+		if err := c.handleScaleSetDeleteOperation(scaleSet); err != nil {
+			slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set delete operation")
+		}
+	default:
+		slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
+
+// handleScaleSetCreateOperation creates, starts and registers a worker for the
+// scale set. It acquires c.mux and is a no-op if the scale set is already known.
+func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	if _, ok := c.ScaleSets[sSet.ID]; ok {
+		slog.DebugContext(c.ctx, "scale set already exists in worker list", "scale_set_id", sSet.ID)
+		return nil
+	}
+
+	provider, ok := c.providers[sSet.ProviderName]
+	if !ok {
+		// Providers are currently static, set in the config and cannot be updated without a restart.
+		// ScaleSets and pools also do not allow updating the provider. This condition is not recoverable
+		// without a restart, so we don't need to instantiate a worker for this scale set.
+		return fmt.Errorf("provider %s not found for scale set %s", sSet.ProviderName, sSet.Name)
+	}
+
+	worker, err := NewWorker(c.ctx, c.store, sSet, provider)
+	if err != nil {
+		return fmt.Errorf("creating scale set worker: %w", err)
+	}
+	if err := worker.Start(); err != nil {
+		// The Start() function should only return an error if an unrecoverable error occurs.
+		// For transient errors, it should mark the scale set as being in error, but continue
+		// to retry fixing the condition. For example, not being able to retrieve tools due to bad
+		// credentials should not stop the worker. The credentials can be fixed and the worker
+		// can continue to work.
+		return fmt.Errorf("starting scale set worker: %w", err)
+	}
+	c.ScaleSets[sSet.ID] = &scaleSet{
+		scaleSet: sSet,
+		status:   scaleSetStatus{},
+		worker:   worker,
+	}
+	return nil
+}
+
+// handleScaleSetDeleteOperation stops the worker of the scale set and removes it from the list.
+func (c *Controller) handleScaleSetDeleteOperation(sSet params.ScaleSet) error {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	set, ok := c.ScaleSets[sSet.ID]
+	if !ok {
+		slog.DebugContext(c.ctx, "scale set not found in worker list", "scale_set_id", sSet.ID)
+		return nil
+	}
+
+	slog.DebugContext(c.ctx, "stopping scale set worker", "scale_set_id", sSet.ID)
+	if err := set.worker.Stop(); err != nil {
+		return fmt.Errorf("stopping scale set worker: %w", err)
+	}
+	delete(c.ScaleSets, sSet.ID)
+	return nil
+}
+
+// handleScaleSetUpdateOperation creates the worker for a scale set that is not
+// yet tracked. Updates to tracked scale sets are not handled here.
+func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
+	// The lookup is the only part done under the lock. handleScaleSetCreateOperation
+	// acquires c.mux itself and sync.Mutex is not reentrant, so calling it with
+	// the lock held would deadlock.
+	c.mux.Lock()
+	_, ok := c.ScaleSets[sSet.ID]
+	c.mux.Unlock()
+
+	if !ok {
+		// Some error may have occurred when the scale set was first created, so we
+		// attempt to create it after the user updated the scale set, hopefully
+		// fixing the reason for the failure.
+		return c.handleScaleSetCreateOperation(sSet)
+	}
+	// We let the watcher in the scale set worker handle the update operation.
+	return nil
+}
+
+// handleCredentialsEvent stores updated credentials and rebuilds the github
+// clients, if the credentials are the ones currently attached to the entity.
+func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) {
+	credentials, ok := event.Payload.(params.GithubCredentials)
+	if !ok {
+		slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(c.ctx, "got update operation")
+		c.mux.Lock()
+		defer c.mux.Unlock()
+
+		if c.Entity.Credentials.ID != credentials.ID {
+			// The update is for credentials this entity does not currently use.
+			// Nothing to do.
+			return
+		}
+		c.Entity.Credentials = credentials
+
+		if err := c.updateAndBroadcastCredentials(c.Entity); err != nil {
+			slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
+			return
+		}
+	default:
+		slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
+
+// handleEntityEvent stores the updated entity. If the entity references different
+// credentials, the watcher filters and the github clients are rebuilt as well.
+func (c *Controller) handleEntityEvent(event dbCommon.ChangePayload) {
+	// The entity worker decodes these payloads through params.EntityGetter; do the
+	// same here so repository, organization and enterprise payloads are handled.
+	// A plain params.GithubEntity payload is still accepted.
+	var entity params.GithubEntity
+	switch payload := event.Payload.(type) {
+	case params.GithubEntity:
+		entity = payload
+	case params.EntityGetter:
+		var err error
+		entity, err = payload.GetEntity()
+		if err != nil {
+			slog.ErrorContext(c.ctx, "getting entity from payload", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
+			return
+		}
+	default:
+		slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
+		return
+	}
+
+	switch event.Operation {
+	case dbCommon.UpdateOperation:
+		slog.DebugContext(c.ctx, "got update operation")
+		c.mux.Lock()
+		defer c.mux.Unlock()
+
+		if c.Entity.Credentials.ID != entity.Credentials.ID {
+			// credentials were swapped on the entity. We need to recompose the watcher
+			// filters and rebuild the clients from the updated entity.
+			c.consumer.SetFilters(composeControllerWatcherFilters(entity))
+			if err := c.updateAndBroadcastCredentials(entity); err != nil {
+				slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
+			}
+		}
+		c.Entity = entity
+	default:
+		slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
+		return
+	}
+}
+
+// updateAndBroadcastCredentials builds new github and scale set clients for the
+// entity and hands them to every scale set worker. Callers must hold c.mux.
+func (c *Controller) updateAndBroadcastCredentials(entity params.GithubEntity) error {
+	ghCli, err := github.Client(c.ctx, entity)
+	if err != nil {
+		return fmt.Errorf("creating github client: %w", err)
+	}
+
+	setCli, err := scaleSetCli.NewClient(ghCli)
+	if err != nil {
+		return fmt.Errorf("creating scaleset client: %w", err)
+	}
+	c.ghCli = ghCli
+	c.scaleSetCli = setCli
+
+	for _, scaleSet := range c.ScaleSets {
+		if err := scaleSet.worker.SetGithubClient(ghCli, setCli); err != nil {
+			slog.ErrorContext(c.ctx, "setting github client on worker", "error", err)
+			continue
+		}
+	}
+	return nil
+}
diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go
new file mode 100644
index 00000000..0592a8cf
--- /dev/null
+++ b/workers/scaleset/scaleset.go
@@ -0,0 +1,76 @@
+package scaleset
+
+import (
+	"context"
+	"sync"
+
+	commonParams "github.com/cloudbase/garm-provider-common/params"
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/params"
+	"github.com/cloudbase/garm/runner/common"
+	"github.com/cloudbase/garm/util/github/scalesets"
+)
+
+func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
+	return &Worker{
+		ctx:      ctx,
+		store:    store,
+		provider: provider,
+		Entity:   scaleSet,
+	}, nil
+}
+
+type Worker struct {
+	ctx context.Context
+
+	provider common.Provider
+	store    dbCommon.Store
+	Entity   params.ScaleSet
+	tools    []commonParams.RunnerApplicationDownload
+
+	ghCli       common.GithubClient
+	scaleSetCli *scalesets.ScaleSetClient
+
+	mux     sync.Mutex
+	running bool
+	quit    chan struct{}
+}
+
+func (w *Worker) Stop() error {
+	return nil
+}
+
+func (w *Worker) Start() error {
+	w.mux.Lock()
+	defer w.mux.Unlock()
+	// loop() has no body yet, so this goroutine returns immediately.
+	go w.loop()
+	return nil
+}
+
+func (w *Worker) SetTools(tools []commonParams.RunnerApplicationDownload) {
+	w.mux.Lock()
+	defer w.mux.Unlock()
+
+	w.tools = tools
+}
+
+func (w *Worker) SetGithubClient(client common.GithubClient, scaleSetCli *scalesets.ScaleSetClient) error {
+	w.mux.Lock()
+	defer w.mux.Unlock()
+
+	// TODO: stop the current listener (if any) before the
+	// clients are replaced.
+
+	w.scaleSetCli = scaleSetCli
+	w.ghCli = client
+
+	// TODO: start a new listener once the clients
+	// have been replaced.
+
+	return nil
+}
+
+func (w *Worker) loop() {
+	// Not implemented yet.
+}
diff --git a/workers/scaleset/status.go b/workers/scaleset/status.go
new file mode 100644
index 00000000..29d9ae4f
--- /dev/null
+++ b/workers/scaleset/status.go
@@ -0,0 +1,15 @@
+package scaleset
+
+import (
+	"time"
+
+	"github.com/cloudbase/garm/params"
+)
+
+// scaleSetStatus bundles an error, a heartbeat timestamp and a scale set
+// value into one status update.
+type scaleSetStatus struct {
+	err       error
+	heartbeat time.Time
+	scaleSet  params.ScaleSet
+}
diff --git a/workers/scaleset/util.go b/workers/scaleset/util.go
new file mode 100644
index 00000000..a594f88c
--- /dev/null
+++ b/workers/scaleset/util.go
@@ -0,0 +1,31 @@
+package scaleset
+
+import (
+	dbCommon "github.com/cloudbase/garm/database/common"
+	"github.com/cloudbase/garm/database/watcher"
+	"github.com/cloudbase/garm/params"
+)
+
+// composeControllerWatcherFilters accepts a payload when any of three filters
+// match: scale set create/update/delete events for the entity, update events
+// for the entity itself, or update events for the entity's credentials.
+func composeControllerWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFunc {
+	scaleSetEvents := watcher.WithAll(
+		watcher.WithEntityScaleSetFilter(entity),
+		watcher.WithAny(
+			watcher.WithOperationTypeFilter(dbCommon.CreateOperation),
+			watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
+			watcher.WithOperationTypeFilter(dbCommon.DeleteOperation),
+		),
+	)
+	entityUpdates := watcher.WithAll(
+		watcher.WithEntityFilter(entity),
+		watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
+	)
+	credentialsUpdates := watcher.WithAll(
+		watcher.WithGithubCredentialsFilter(entity.Credentials),
+		watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
+	)
+
+	return watcher.WithAny(scaleSetEvents, entityUpdates, credentialsUpdates)
+}