Add some worker code

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-15 09:36:28 +00:00
parent 7174e030e2
commit 6a5c309399
15 changed files with 1132 additions and 2 deletions

View file

@ -49,6 +49,7 @@ import (
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/util/appdefaults"
"github.com/cloudbase/garm/websocket"
"github.com/cloudbase/garm/workers/entity"
)
var (
@ -230,6 +231,14 @@ func main() {
log.Fatal(err)
}
entityController, err := entity.NewController(ctx, db, *cfg)
if err != nil {
log.Fatalf("failed to create entity controller: %+v", err)
}
if err := entityController.Start(); err != nil {
log.Fatalf("failed to start entity controller: %+v", err)
}
runner, err := runner.NewRunner(ctx, *cfg, db)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
@ -326,6 +335,11 @@ func main() {
<-ctx.Done()
slog.InfoContext(ctx, "shutting down entity controller")
if err := entityController.Stop(); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to stop entity controller")
}
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {

View file

@ -96,6 +96,8 @@ type ScaleSet struct {
// ScaleSetID is the github ID of the scale set. This field may not be set if
// the scale set was ceated in GARM but has not yet been created in GitHub.
// The scale set ID is also not globally unique. It is only unique within the context
// of an entity.
ScaleSetID int `gorm:"index:idx_scale_set"`
Name string `gorm:"index:idx_name"`
DisableUpdate bool

View file

@ -15,6 +15,8 @@
package params
import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"time"
@ -420,6 +422,21 @@ type RunnerScaleSetJitRunnerConfig struct {
EncodedJITConfig string `json:"encodedJITConfig"`
}
func (r RunnerScaleSetJitRunnerConfig) DecodedJITConfig() (map[string]string, error) {
if r.EncodedJITConfig == "" {
return nil, fmt.Errorf("no encoded JIT config specified")
}
decoded, err := base64.StdEncoding.DecodeString(r.EncodedJITConfig)
if err != nil {
return nil, fmt.Errorf("failed to decode JIT config: %w", err)
}
jitConfig := make(map[string]string)
if err := json.Unmarshal(decoded, &jitConfig); err != nil {
return nil, fmt.Errorf("failed to unmarshal JIT config: %w", err)
}
return jitConfig, nil
}
type RunnerReferenceList struct {
Count int `json:"count"`
RunnerReferences []RunnerReference `json:"value"`

View file

@ -452,8 +452,8 @@ func Client(ctx context.Context, entity params.GithubEntity) (common.GithubClien
return nil, errors.Wrap(err, "fetching http client")
}
slog.InfoContext(
ctx, "creating client with",
slog.DebugContext(
ctx, "creating client for entity",
"entity", entity.String(), "base_url", entity.Credentials.APIBaseURL,
"upload_url", entity.Credentials.UploadBaseURL)

View file

@ -0,0 +1,9 @@
package common
import (
commonParams "github.com/cloudbase/garm-provider-common/params"
)
type ToolsGetter interface {
GetTools() ([]commonParams.RunnerApplicationDownload, error)
}

View file

@ -0,0 +1,208 @@
package entity
import (
"context"
"fmt"
"log/slog"
"sync"
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/config"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/runner/common"
"github.com/cloudbase/garm/runner/providers"
garmUtil "github.com/cloudbase/garm/util"
)
func NewController(ctx context.Context, store dbCommon.Store, cfg config.Config) (*Controller, error) {
consumerID := "entity-controller"
ctrlID, err := store.ControllerInfo()
if err != nil {
return nil, fmt.Errorf("getting controller info: %w", err)
}
ctx = garmUtil.WithSlogContext(
ctx,
slog.Any("worker", consumerID))
ctx = auth.GetAdminContext(ctx)
providers, err := providers.LoadProvidersFromConfig(ctx, cfg, ctrlID.ControllerID.String())
if err != nil {
return nil, fmt.Errorf("loading providers: %w", err)
}
return &Controller{
consumerID: consumerID,
ctx: ctx,
store: store,
providers: providers,
Entities: make(map[string]*Worker),
}, nil
}
type Controller struct {
consumerID string
ctx context.Context
consumer dbCommon.Consumer
store dbCommon.Store
providers map[string]common.Provider
Entities map[string]*Worker
running bool
quit chan struct{}
mux sync.Mutex
}
func (c *Controller) loadAllRepositories() error {
c.mux.Lock()
defer c.mux.Unlock()
repos, err := c.store.ListRepositories(c.ctx)
if err != nil {
return fmt.Errorf("fetching repositories: %w", err)
}
for _, repo := range repos {
entity, err := repo.GetEntity()
if err != nil {
return fmt.Errorf("getting entity: %w", err)
}
worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
c.Entities[entity.ID] = worker
}
return nil
}
func (c *Controller) loadAllOrganizations() error {
c.mux.Lock()
defer c.mux.Unlock()
orgs, err := c.store.ListOrganizations(c.ctx)
if err != nil {
return fmt.Errorf("fetching organizations: %w", err)
}
for _, org := range orgs {
entity, err := org.GetEntity()
if err != nil {
return fmt.Errorf("getting entity: %w", err)
}
worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
c.Entities[entity.ID] = worker
}
return nil
}
func (c *Controller) loadAllEnterprises() error {
c.mux.Lock()
defer c.mux.Unlock()
enterprises, err := c.store.ListEnterprises(c.ctx)
if err != nil {
return fmt.Errorf("fetching enterprises: %w", err)
}
for _, enterprise := range enterprises {
entity, err := enterprise.GetEntity()
if err != nil {
return fmt.Errorf("getting entity: %w", err)
}
worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
c.Entities[entity.ID] = worker
}
return nil
}
func (c *Controller) Start() error {
c.mux.Lock()
if c.running {
c.mux.Unlock()
return nil
}
c.mux.Unlock()
if err := c.loadAllRepositories(); err != nil {
return fmt.Errorf("loading repositories: %w", err)
}
if err := c.loadAllOrganizations(); err != nil {
return fmt.Errorf("loading organizations: %w", err)
}
if err := c.loadAllEnterprises(); err != nil {
return fmt.Errorf("loading enterprises: %w", err)
}
consumer, err := watcher.RegisterConsumer(
c.ctx, c.consumerID,
composeControllerWatcherFilters(),
)
if err != nil {
return fmt.Errorf("failed to create consumer for entity controller: %w", err)
}
c.mux.Lock()
c.consumer = consumer
c.running = true
c.quit = make(chan struct{})
c.mux.Unlock()
go c.loop()
return nil
}
func (c *Controller) Stop() error {
c.mux.Lock()
defer c.mux.Unlock()
if !c.running {
return nil
}
slog.DebugContext(c.ctx, "stopping entity controller")
for entityID, worker := range c.Entities {
if err := worker.Stop(); err != nil {
slog.ErrorContext(c.ctx, "stopping worker for entity", "entity_id", entityID, "error", err)
}
}
c.running = false
close(c.quit)
c.quit = nil
c.consumer.Close()
return nil
}
func (c *Controller) loop() {
defer c.Stop()
for {
select {
case payload := <-c.consumer.Watch():
slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload))
go c.handleWatcherEvent(payload)
case <-c.ctx.Done():
return
case <-c.quit:
return
}
}
}

View file

@ -0,0 +1,98 @@
package entity
import (
"log/slog"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)
func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
var entityGetter params.EntityGetter
switch event.EntityType {
case dbCommon.RepositoryEntityType:
slog.DebugContext(c.ctx, "got repository payload event")
repo, ok := event.Payload.(params.Repository)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
}
entityGetter = repo
case dbCommon.OrganizationEntityType:
slog.DebugContext(c.ctx, "got organization payload event")
org, ok := event.Payload.(params.Organization)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
}
entityGetter = org
case dbCommon.EnterpriseEntityType:
slog.DebugContext(c.ctx, "got enterprise payload event")
ent, ok := event.Payload.(params.Enterprise)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
}
entityGetter = ent
default:
slog.ErrorContext(c.ctx, "invalid entity type", "entity_type", event.EntityType)
return
}
if entityGetter == nil {
return
}
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(c.ctx, "got create operation")
c.handleWatcherCreateOperation(entityGetter, event)
case dbCommon.DeleteOperation:
slog.DebugContext(c.ctx, "got delete operation")
c.handleWatcherDeleteOperation(entityGetter, event)
default:
slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
c.mux.Lock()
defer c.mux.Unlock()
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
if err != nil {
slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
slog.InfoContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
slog.ErrorContext(c.ctx, "starting worker", "entity_id", entity.ID, "error", err)
return
}
c.Entities[entity.ID] = worker
}
func (c *Controller) handleWatcherDeleteOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
c.mux.Lock()
defer c.mux.Unlock()
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
worker, ok := c.Entities[entity.ID]
if !ok {
slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID)
return
}
slog.InfoContext(c.ctx, "stopping entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Stop(); err != nil {
slog.ErrorContext(c.ctx, "stopping worker", "entity_id", entity.ID, "error", err)
return
}
delete(c.Entities, entity.ID)
}

35
workers/entity/util.go Normal file
View file

@ -0,0 +1,35 @@
package entity
import (
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
)
func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {
return watcher.WithAll(
watcher.WithAny(
watcher.WithEntityTypeFilter(dbCommon.RepositoryEntityType),
watcher.WithEntityTypeFilter(dbCommon.OrganizationEntityType),
watcher.WithEntityTypeFilter(dbCommon.EnterpriseEntityType),
),
watcher.WithAny(
watcher.WithOperationTypeFilter(dbCommon.CreateOperation),
watcher.WithOperationTypeFilter(dbCommon.DeleteOperation),
),
)
}
func composeWorkerWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFunc {
return watcher.WithAny(
watcher.WithAll(
watcher.WithEntityFilter(entity),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
// Watch for credentials updates.
watcher.WithAll(
watcher.WithGithubCredentialsFilter(entity.Credentials),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
)
}

110
workers/entity/worker.go Normal file
View file

@ -0,0 +1,110 @@
package entity
import (
"context"
"fmt"
"log/slog"
"sync"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/workers/scaleset"
)
func NewWorker(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Worker, error) {
consumerID := fmt.Sprintf("entity-worker-%s", entity.String())
ctx = garmUtil.WithSlogContext(
ctx,
slog.Any("worker", consumerID))
return &Worker{
ctx: ctx,
consumerID: consumerID,
store: store,
Entity: entity,
providers: providers,
}, nil
}
type Worker struct {
ctx context.Context
consumerID string
consumer dbCommon.Consumer
store dbCommon.Store
Entity params.GithubEntity
providers map[string]common.Provider
scaleSetController *scaleset.Controller
// TODO(gabriel-samfira): replace current pool manager with something similar
// to the scale set controller.
// poolManager *pool.Controller
mux sync.Mutex
running bool
quit chan struct{}
}
func (w *Worker) Stop() error {
w.mux.Lock()
defer w.mux.Unlock()
if !w.running {
return nil
}
slog.DebugContext(w.ctx, "stopping entity worker")
if err := w.scaleSetController.Stop(); err != nil {
return fmt.Errorf("stopping scale set controller: %w", err)
}
w.scaleSetController = nil
w.running = false
close(w.quit)
w.consumer.Close()
return nil
}
func (w *Worker) Start() error {
w.mux.Lock()
defer w.mux.Unlock()
scaleSetController, err := scaleset.NewController(w.ctx, w.store, w.Entity, w.providers)
if err != nil {
return fmt.Errorf("creating scale set controller: %w", err)
}
w.scaleSetController = scaleSetController
consumer, err := watcher.RegisterConsumer(
w.ctx, w.consumerID,
composeWorkerWatcherFilters(w.Entity),
)
if err != nil {
return fmt.Errorf("registering consumer: %w", err)
}
w.consumer = consumer
w.running = true
w.quit = make(chan struct{})
go w.loop()
return nil
}
func (w *Worker) loop() {
defer w.Stop()
for {
select {
case payload := <-w.consumer.Watch():
slog.InfoContext(w.ctx, "received payload", slog.Any("payload", payload))
go w.handleWorkerWatcherEvent(payload)
case <-w.ctx.Done():
return
case <-w.quit:
return
}
}
}

View file

@ -0,0 +1,76 @@
package entity
import (
"log/slog"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)
func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
// This worker may be for a repo, org or enterprise. React only to the entity type
// that this worker is for.
entityType := dbCommon.DatabaseEntityType(w.Entity.EntityType)
switch event.EntityType {
case entityType:
entityGetter, ok := event.Payload.(params.EntityGetter)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
w.handleEntityEventPayload(entity, event)
return
case dbCommon.GithubCredentialsEntityType:
slog.DebugContext(w.ctx, "got github credentials payload event")
credentials, ok := event.Payload.(params.GithubCredentials)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
w.handleEntityCredentialsEventPayload(credentials, event)
default:
slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
}
}
func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCommon.ChangePayload) {
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
w.mux.Lock()
defer w.mux.Unlock()
credentials := entity.Credentials
if w.Entity.Credentials.ID != credentials.ID {
// credentials were swapped on the entity. We need to recompose the watcher
// filters.
w.consumer.SetFilters(composeWorkerWatcherFilters(entity))
}
w.Entity = entity
default:
slog.ErrorContext(w.ctx, "invalid operation type", "operation_type", event.Operation)
}
}
func (w *Worker) handleEntityCredentialsEventPayload(credentials params.GithubCredentials, event dbCommon.ChangePayload) {
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got delete operation")
w.mux.Lock()
defer w.mux.Unlock()
if w.Entity.Credentials.ID != credentials.ID {
// The channel is buffered. We may get an old update. If credentials get updated
// immediately after they are swapped on the entity, we may still get an update
// pushed to the channel before the filters are swapped. We can ignore the update.
return
}
w.Entity.Credentials = credentials
default:
slog.ErrorContext(w.ctx, "invalid operation type", "operation_type", event.Operation)
}
}

View file

@ -0,0 +1,237 @@
package scaleset
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/util/github"
"github.com/cloudbase/garm/util/github/scalesets"
)
func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
consumerID := fmt.Sprintf("scaleset-worker-%s", entity.String())
ctx = garmUtil.WithSlogContext(
ctx,
slog.Any("worker", consumerID))
return &Controller{
ctx: ctx,
consumerID: consumerID,
ScaleSets: make(map[uint]*scaleSet),
Entity: entity,
providers: providers,
store: store,
statusUpdates: make(chan scaleSetStatus, 10),
}, nil
}
type scaleSet struct {
scaleSet params.ScaleSet
status scaleSetStatus
worker *Worker
mux sync.Mutex
}
func (s *scaleSet) updateStatus(status scaleSetStatus) {
s.mux.Lock()
defer s.mux.Unlock()
s.status = status
}
func (s *scaleSet) Stop() error {
s.mux.Lock()
defer s.mux.Unlock()
if s.worker == nil {
return nil
}
return s.worker.Stop()
}
// Controller is responsible for managing scale sets for one github entity.
type Controller struct {
ctx context.Context
consumerID string
ScaleSets map[uint]*scaleSet
Entity params.GithubEntity
consumer dbCommon.Consumer
store dbCommon.Store
providers map[string]common.Provider
tools []commonParams.RunnerApplicationDownload
ghCli common.GithubClient
scaleSetCli *scalesets.ScaleSetClient
forgeCredsAreValid bool
statusUpdates chan scaleSetStatus
mux sync.Mutex
running bool
quit chan struct{}
}
func (c *Controller) loadAllScaleSets() error {
scaleSets, err := c.store.ListEntityScaleSets(c.ctx, c.Entity)
if err != nil {
return fmt.Errorf("listing scale sets: %w", err)
}
for _, sSet := range scaleSets {
if err := c.handleScaleSetCreateOperation(sSet); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
continue
}
}
return nil
}
func (c *Controller) Start() (err error) {
c.mux.Lock()
if c.running {
c.mux.Unlock()
return nil
}
c.mux.Unlock()
if err := c.loadAllScaleSets(); err != nil {
return fmt.Errorf("loading all scale sets: %w", err)
}
ghCli, err := github.Client(c.ctx, c.Entity)
if err != nil {
return fmt.Errorf("creating github client: %w", err)
}
consumer, err := watcher.RegisterConsumer(
c.ctx, c.consumerID,
composeControllerWatcherFilters(c.Entity),
)
if err != nil {
return fmt.Errorf("registering consumer: %w", err)
}
c.mux.Lock()
c.ghCli = ghCli
c.consumer = consumer
c.running = true
c.quit = make(chan struct{})
c.mux.Unlock()
go c.loop()
return nil
}
func (c *Controller) Stop() error {
c.mux.Lock()
defer c.mux.Unlock()
if !c.running {
return nil
}
slog.DebugContext(c.ctx, "stopping scaleset controller", "entity", c.Entity.String())
for scaleSetID, scaleSet := range c.ScaleSets {
if err := scaleSet.Stop(); err != nil {
slog.ErrorContext(c.ctx, "stopping worker for scale set", "scale_set_id", scaleSetID, "error", err)
continue
}
delete(c.ScaleSets, scaleSetID)
}
c.running = false
close(c.quit)
c.quit = nil
close(c.statusUpdates)
c.statusUpdates = nil
c.consumer.Close()
return nil
}
func (c *Controller) updateTools() error {
c.mux.Lock()
defer c.mux.Unlock()
tools, err := garmUtil.FetchTools(c.ctx, c.ghCli)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
c.forgeCredsAreValid = false
return fmt.Errorf("failed to update tools for entity %s: %w", c.Entity.String(), err)
}
c.forgeCredsAreValid = true
c.tools = tools
return nil
}
func (c *Controller) handleScaleSetStatusUpdates(status scaleSetStatus) {
if status.scaleSet.ID == 0 {
slog.DebugContext(c.ctx, "invalid scale set ID; ignoring")
return
}
scaleSet, ok := c.ScaleSets[status.scaleSet.ID]
if !ok {
slog.DebugContext(c.ctx, "scale set not found; ignoring")
return
}
scaleSet.updateStatus(status)
}
func (c *Controller) loop() {
defer c.Stop()
updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
initialToolUpdate := make(chan struct{}, 1)
go func() {
slog.Info("running initial tool update")
if err := c.updateTools(); err != nil {
slog.With(slog.Any("error", err)).Error("failed to update tools")
}
initialToolUpdate <- struct{}{}
}()
for {
select {
case payload := <-c.consumer.Watch():
slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload))
go c.handleWatcherEvent(payload)
case <-c.ctx.Done():
return
case _, ok := <-initialToolUpdate:
if ok {
// channel received the initial update slug. We can close it now.
close(initialToolUpdate)
}
case update, ok := <-c.statusUpdates:
if !ok {
return
}
go c.handleScaleSetStatusUpdates(update)
case <-updateToolsTicker.C:
if err := c.updateTools(); err != nil {
slog.With(slog.Any("error", err)).Error("failed to update tools")
}
case <-c.quit:
return
}
}
}

View file

@ -0,0 +1,207 @@
package scaleset
import (
"fmt"
"log/slog"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github"
scaleSetCli "github.com/cloudbase/garm/util/github/scalesets"
)
func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
entityType := dbCommon.DatabaseEntityType(c.Entity.EntityType)
switch event.EntityType {
case dbCommon.ScaleSetEntityType:
slog.DebugContext(c.ctx, "got scale set payload event")
c.handleScaleSet(event)
case entityType:
slog.DebugContext(c.ctx, "got entity payload event")
c.handleEntityEvent(event)
case dbCommon.GithubCredentialsEntityType:
slog.DebugContext(c.ctx, "got github credentials payload event")
c.handleCredentialsEvent(event)
default:
slog.ErrorContext(c.ctx, "invalid entity type", "entity_type", event.EntityType)
return
}
}
func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
scaleSet, ok := event.Payload.(params.ScaleSet)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(c.ctx, "got create operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
if err := c.handleScaleSetCreateOperation(scaleSet); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
}
case dbCommon.UpdateOperation:
slog.DebugContext(c.ctx, "got update operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
if err := c.handleScaleSetUpdateOperation(scaleSet); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set update operation")
}
case dbCommon.DeleteOperation:
slog.DebugContext(c.ctx, "got delete operation")
if err := c.handleScaleSetDeleteOperation(scaleSet); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set delete operation")
}
default:
slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error {
c.mux.Lock()
defer c.mux.Unlock()
if _, ok := c.ScaleSets[sSet.ID]; ok {
slog.DebugContext(c.ctx, "scale set already exists in worker list", "scale_set_id", sSet.ID)
return nil
}
provider, ok := c.providers[sSet.ProviderName]
if !ok {
// Providers are currently static, set in the config and cannot be updated without a restart.
// ScaleSets and pools also do not allow updating the provider. This condition is not recoverable
// without a restart, so we don't need to instantiate a worker for this scale set.
return fmt.Errorf("provider %s not found for scale set %s", sSet.ProviderName, sSet.Name)
}
worker, err := NewWorker(c.ctx, c.store, sSet, provider)
if err != nil {
return fmt.Errorf("creating scale set worker: %w", err)
}
if err := worker.Start(); err != nil {
// The Start() function should only return an error if an unrecoverable error occurs.
// For transient errors, it should mark the scale set as being in error, but continue
// to retry fixing the condition. For example, not being able to retrieve tools due to bad
// credentials should not stop the worker. The credentials can be fixed and the worker
// can continue to work.
return fmt.Errorf("starting scale set worker: %w", err)
}
c.ScaleSets[sSet.ID] = &scaleSet{
scaleSet: sSet,
status: scaleSetStatus{},
worker: worker,
}
return nil
}
func (c *Controller) handleScaleSetDeleteOperation(sSet params.ScaleSet) error {
c.mux.Lock()
defer c.mux.Unlock()
set, ok := c.ScaleSets[sSet.ID]
if !ok {
slog.DebugContext(c.ctx, "scale set not found in worker list", "scale_set_id", sSet.ID)
return nil
}
slog.DebugContext(c.ctx, "stopping scale set worker", "scale_set_id", sSet.ID)
if err := set.worker.Stop(); err != nil {
return fmt.Errorf("stopping scale set worker: %w", err)
}
delete(c.ScaleSets, sSet.ID)
return nil
}
func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
c.mux.Lock()
defer c.mux.Unlock()
if _, ok := c.ScaleSets[sSet.ID]; !ok {
// Some error may have occured when the scale set was first created, so we
// attempt to create it after the user updated the scale set, hopefully
// fixing the reason for the failure.
return c.handleScaleSetCreateOperation(sSet)
}
// We let the watcher in the scale set worker handle the update operation.
return nil
}
func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) {
credentials, ok := event.Payload.(params.GithubCredentials)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(c.ctx, "got update operation")
c.mux.Lock()
defer c.mux.Unlock()
if c.Entity.Credentials.ID != credentials.ID {
// credentials were swapped on the entity. We need to recompose the watcher
// filters.
return
}
c.Entity.Credentials = credentials
if err := c.updateAndBroadcastCredentials(c.Entity); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
return
}
default:
slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (c *Controller) handleEntityEvent(event dbCommon.ChangePayload) {
entity, ok := event.Payload.(params.GithubEntity)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(c.ctx, "got update operation")
c.mux.Lock()
defer c.mux.Unlock()
if c.Entity.Credentials.ID != entity.Credentials.ID {
// credentials were swapped on the entity. We need to recompose the watcher
// filters.
c.consumer.SetFilters(composeControllerWatcherFilters(entity))
if err := c.updateAndBroadcastCredentials(c.Entity); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
}
}
c.Entity = entity
default:
slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (c *Controller) updateAndBroadcastCredentials(entity params.GithubEntity) error {
ghCli, err := github.Client(c.ctx, entity)
if err != nil {
return fmt.Errorf("creating github client: %w", err)
}
setCli, err := scaleSetCli.NewClient(ghCli)
if err != nil {
return fmt.Errorf("creating scaleset client: %w", err)
}
c.ghCli = ghCli
c.scaleSetCli = setCli
for _, scaleSet := range c.ScaleSets {
if err := scaleSet.worker.SetGithubClient(ghCli, setCli); err != nil {
slog.ErrorContext(c.ctx, "setting github client on worker", "error", err)
continue
}
}
return nil
}

View file

@ -0,0 +1,76 @@
package scaleset
import (
"context"
"sync"
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
"github.com/cloudbase/garm/util/github/scalesets"
)
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
return &Worker{
ctx: ctx,
store: store,
provider: provider,
Entity: scaleSet,
}, nil
}
type Worker struct {
ctx context.Context
provider common.Provider
store dbCommon.Store
Entity params.ScaleSet
tools []commonParams.RunnerApplicationDownload
ghCli common.GithubClient
scaleSetCli *scalesets.ScaleSetClient
mux sync.Mutex
running bool
quit chan struct{}
}
func (w *Worker) Stop() error {
return nil
}
func (w *Worker) Start() error {
w.mux.Lock()
defer w.mux.Unlock()
go w.loop()
return nil
}
func (w *Worker) SetTools(tools []commonParams.RunnerApplicationDownload) {
w.mux.Lock()
defer w.mux.Unlock()
w.tools = tools
}
func (w *Worker) SetGithubClient(client common.GithubClient, scaleSetCli *scalesets.ScaleSetClient) error {
w.mux.Lock()
defer w.mux.Unlock()
// TODO:
// * stop current listener if any
w.ghCli = client
w.scaleSetCli = scaleSetCli
// TODO:
// * start new listener
return nil
}
func (w *Worker) loop() {
}

View file

@ -0,0 +1,13 @@
package scaleset
import (
"time"
"github.com/cloudbase/garm/params"
)
type scaleSetStatus struct {
err error
heartbeat time.Time
scaleSet params.ScaleSet
}

28
workers/scaleset/util.go Normal file
View file

@ -0,0 +1,28 @@
package scaleset
import (
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
)
func composeControllerWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFunc {
return watcher.WithAny(
watcher.WithAll(
watcher.WithEntityScaleSetFilter(entity),
watcher.WithAny(
watcher.WithOperationTypeFilter(dbCommon.CreateOperation),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
watcher.WithOperationTypeFilter(dbCommon.DeleteOperation),
),
),
watcher.WithAll(
watcher.WithEntityFilter(entity),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
watcher.WithAll(
watcher.WithGithubCredentialsFilter(entity.Credentials),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
)
}