From d0c9462a5d8023a0979a832044377612d249ab91 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Wed, 7 May 2025 08:01:36 +0000 Subject: [PATCH] Add cache worker Add dedicated worker to maintain cache. Signed-off-by: Gabriel Adrian Samfira --- cache/cache_test.go | 2 +- cache/entity_cache.go | 17 +- cmd/garm/main.go | 8 +- runner/pool/pool.go | 7 - runner/pool/util.go | 2 - runner/pool/watcher.go | 20 -- workers/cache/cache.go | 338 +++++++++++++++++++++++++ workers/credentials/credentials.go | 133 ---------- workers/entity/controller.go | 10 - workers/entity/controller_watcher.go | 2 - workers/provider/provider.go | 15 -- workers/scaleset/controller.go | 2 - workers/scaleset/controller_watcher.go | 7 - 13 files changed, 355 insertions(+), 208 deletions(-) create mode 100644 workers/cache/cache.go delete mode 100644 workers/credentials/credentials.go diff --git a/cache/cache_test.go b/cache/cache_test.go index aef4e94a..43b15953 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -324,7 +324,7 @@ func (c *CacheTestSuite) TestReplaceEntityScaleSets() { } SetEntity(entity) - ReplaceEntityScaleSets(entity.ID, map[uint]params.ScaleSet{1: scaleSet1, 2: scaleSet2}) + ReplaceEntityScaleSets(entity.ID, []params.ScaleSet{scaleSet1, scaleSet2}) cachedEntity, ok := GetEntity(entity.ID) c.Require().True(ok) c.Require().Equal(entity.ID, cachedEntity.ID) diff --git a/cache/entity_cache.go b/cache/entity_cache.go index cd8d80eb..5a71b184 100644 --- a/cache/entity_cache.go +++ b/cache/entity_cache.go @@ -82,14 +82,21 @@ func (e *EntityCache) ReplaceEntityPools(entityID string, pools []params.Pool) { e.entities[entityID] = cache } -func (e *EntityCache) ReplaceEntityScaleSets(entityID string, scaleSets map[uint]params.ScaleSet) { +func (e *EntityCache) ReplaceEntityScaleSets(entityID string, scaleSets []params.ScaleSet) { e.mux.Lock() defer e.mux.Unlock() - if cache, ok := e.entities[entityID]; ok { - cache.ScaleSets = scaleSets - e.entities[entityID] = cache + cache, ok := e.entities[entityID] + if !ok { + return } + + scaleSetsByID := map[uint]params.ScaleSet{} + for _, scaleSet := range scaleSets { + scaleSetsByID[scaleSet.ID] = scaleSet + } + cache.ScaleSets = scaleSetsByID + e.entities[entityID] = cache } func (e *EntityCache) DeleteEntity(entityID string) { @@ -219,7 +226,7 @@ func ReplaceEntityPools(entityID string, pools []params.Pool) { entityCache.ReplaceEntityPools(entityID, pools) } -func ReplaceEntityScaleSets(entityID string, scaleSets map[uint]params.ScaleSet) { +func ReplaceEntityScaleSets(entityID string, scaleSets []params.ScaleSet) { entityCache.ReplaceEntityScaleSets(entityID, scaleSets) } diff --git a/cmd/garm/main.go b/cmd/garm/main.go index 958ea001..20f34eba 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -51,7 +51,7 @@ import ( garmUtil "github.com/cloudbase/garm/util" "github.com/cloudbase/garm/util/appdefaults" "github.com/cloudbase/garm/websocket" - "github.com/cloudbase/garm/workers/credentials" + "github.com/cloudbase/garm/workers/cache" "github.com/cloudbase/garm/workers/entity" "github.com/cloudbase/garm/workers/provider" ) @@ -238,11 +238,11 @@ func main() { log.Fatal(err) } - credsWorker, err := credentials.NewWorker(ctx, db) + cacheWorker := cache.NewWorker(ctx, db) if err != nil { log.Fatalf("failed to create credentials worker: %+v", err) } - if err := credsWorker.Start(); err != nil { + if err := cacheWorker.Start(); err != nil { log.Fatalf("failed to start credentials worker: %+v", err) } @@ -370,7 +370,7 @@ func main() { <-ctx.Done() - if err := credsWorker.Stop(); err != nil { + if err := cacheWorker.Stop(); err != nil { slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to stop credentials worker") } diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 3e96d1e9..3cb8bff3 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1595,13 +1595,6 @@ func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error } func (r *basePoolManager) Start() error { - // load pools in cache - pools, err := r.store.ListEntityPools(r.ctx, r.entity) - if err != nil { - return fmt.Errorf("failed to list pools: %w", err) - } - cache.ReplaceEntityPools(r.entity.ID, pools) - initialToolUpdate := make(chan struct{}, 1) go func() { slog.Info("running initial tool update") diff --git a/runner/pool/util.go b/runner/pool/util.go index d7b2c416..9b7b7f14 100644 --- a/runner/pool/util.go +++ b/runner/pool/util.go @@ -132,7 +132,5 @@ func composeWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFun watcher.WithEntityFilter(entity), // Watch for changes to the github credentials watcher.WithGithubCredentialsFilter(entity.Credentials), - // Watch for entity pool operations - watcher.WithEntityPoolFilter(entity), ) } diff --git a/runner/pool/watcher.go b/runner/pool/watcher.go index 61a1117c..7f05d93b 100644 --- a/runner/pool/watcher.go +++ b/runner/pool/watcher.go @@ -6,7 +6,6 @@ import ( "github.com/pkg/errors" runnerErrors "github.com/cloudbase/garm-provider-common/errors" - "github.com/cloudbase/garm/cache" "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" runnerCommon "github.com/cloudbase/garm/runner/common" @@ -122,23 +121,6 @@ func (r *basePoolManager) handleCredentialsUpdate(credentials params.GithubCrede r.mux.Unlock() } -func (r *basePoolManager) handleEntityPoolEvent(event common.ChangePayload) { - pool, ok := event.Payload.(params.Pool) - if !ok { - slog.ErrorContext(r.ctx, "failed to cast payload to pool") - return - } - - switch event.Operation { - case common.CreateOperation, common.UpdateOperation: - slog.DebugContext(r.ctx, "updating pool in cache", "pool_id", pool.ID) - cache.SetEntityPool(r.entity.ID, pool) - case common.DeleteOperation: - slog.DebugContext(r.ctx, "deleting pool from cache", "pool_id", pool.ID) - cache.DeleteEntityPool(r.entity.ID, pool.ID) - } -} - func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) { dbEntityType := common.DatabaseEntityType(r.entity.EntityType) switch event.EntityType { @@ -168,8 +150,6 @@ func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) { return } r.handleEntityUpdate(entityInfo, event.Operation) - case common.PoolEntityType: - r.handleEntityPoolEvent(event) } } diff --git a/workers/cache/cache.go b/workers/cache/cache.go new file mode 100644 index 00000000..3973e7c7 --- /dev/null +++ b/workers/cache/cache.go @@ -0,0 +1,338 @@ +package cache + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/cloudbase/garm/cache" + "github.com/cloudbase/garm/database/common" + "github.com/cloudbase/garm/database/watcher" + "github.com/cloudbase/garm/params" + garmUtil "github.com/cloudbase/garm/util" +) + +func NewWorker(ctx context.Context, store common.Store) *Worker { + consumerID := "cache" + ctx = garmUtil.WithSlogContext( + ctx, + slog.Any("worker", consumerID)) + + return &Worker{ + ctx: ctx, + store: store, + consumerID: consumerID, + quit: make(chan struct{}), + } +} + +type Worker struct { + ctx context.Context + consumerID string + + consumer common.Consumer + store common.Store + + mux sync.Mutex + running bool + quit chan struct{} +} + +func (w *Worker) setCacheForEntity(entityGetter params.EntityGetter, pools []params.Pool, scaleSets []params.ScaleSet) error { + entity, err := entityGetter.GetEntity() + if err != nil { + return fmt.Errorf("getting entity: %w", err) + } + cache.SetEntity(entity) + var repoPools []params.Pool + var repoScaleSets []params.ScaleSet + + for _, pool := range pools { + if pool.RepoID == entity.ID { + repoPools = append(repoPools, pool) + } + } + + for _, scaleSet := range scaleSets { + if scaleSet.RepoID == entity.ID { + repoScaleSets = append(repoScaleSets, scaleSet) + } + } + + cache.ReplaceEntityPools(entity.ID, repoPools) + cache.ReplaceEntityScaleSets(entity.ID, repoScaleSets) + + return nil +} + +func (w *Worker) loadAllEntities() error { + pools, err := w.store.ListAllPools(w.ctx) + if err != nil { + return fmt.Errorf("listing pools: %w", err) + } + + scaleSets, err := w.store.ListAllScaleSets(w.ctx) + if err != nil { + return fmt.Errorf("listing scale sets: %w", err) + } + + repos, err := w.store.ListRepositories(w.ctx) + if err != nil { + return fmt.Errorf("listing repositories: %w", err) + } + + orgs, err := w.store.ListOrganizations(w.ctx) + if err != nil { + return fmt.Errorf("listing organizations: %w", err) + } + + enterprises, err := w.store.ListEnterprises(w.ctx) + if err != nil { + return fmt.Errorf("listing enterprises: %w", err) + } + + for _, repo := range repos { + if err := w.setCacheForEntity(repo, pools, scaleSets); err != nil { + return fmt.Errorf("setting cache for repo: %w", err) + } + } + + for _, org := range orgs { + if err := w.setCacheForEntity(org, pools, scaleSets); err != nil { + return fmt.Errorf("setting cache for org: %w", err) + } + } + + for _, enterprise := range enterprises { + if err := w.setCacheForEntity(enterprise, pools, scaleSets); err != nil { + return fmt.Errorf("setting cache for enterprise: %w", err) + } + } + + return nil +} + +func (w *Worker) loadAllInstances() error { + instances, err := w.store.ListAllInstances(w.ctx) + if err != nil { + return fmt.Errorf("listing instances: %w", err) + } + + for _, instance := range instances { + cache.SetInstanceCache(instance) + } + return nil +} + +func (w *Worker) loadAllCredentials() error { + creds, err := w.store.ListGithubCredentials(w.ctx) + if err != nil { + return fmt.Errorf("listing github credentials: %w", err) + } + + for _, cred := range creds { + cache.SetGithubCredentials(cred) + } + return nil +} + +func (w *Worker) Start() error { + slog.DebugContext(w.ctx, "starting cache worker") + w.mux.Lock() + defer w.mux.Unlock() + + if w.running { + return nil + } + + if err := w.loadAllEntities(); err != nil { + return fmt.Errorf("loading all entities: %w", err) + } + + if err := w.loadAllInstances(); err != nil { + return fmt.Errorf("loading all instances: %w", err) + } + + if err := w.loadAllCredentials(); err != nil { + return fmt.Errorf("loading all credentials: %w", err) + } + + consumer, err := watcher.RegisterConsumer( + w.ctx, w.consumerID, + watcher.WithAll()) + if err != nil { + return fmt.Errorf("registering consumer: %w", err) + } + w.consumer = consumer + w.running = true + w.quit = make(chan struct{}) + + go w.loop() + return nil +} + +func (w *Worker) Stop() error { + slog.DebugContext(w.ctx, "stopping cache worker") + w.mux.Lock() + defer w.mux.Unlock() + + if !w.running { + return nil + } + + w.consumer.Close() + w.running = false + close(w.quit) + return nil +} + +func (w *Worker) handleEntityEvent(entityGetter params.EntityGetter, op common.OperationType) { + entity, err := entityGetter.GetEntity() + if err != nil { + slog.DebugContext(w.ctx, "getting entity from event", "error", err) + return + } + switch op { + case common.CreateOperation, common.UpdateOperation: + cache.SetEntity(entity) + case common.DeleteOperation: + cache.DeleteEntity(entity.ID) + } +} + +func (w *Worker) handleRepositoryEvent(event common.ChangePayload) { + repo, ok := event.Payload.(params.Repository) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for repository event", "payload", event.Payload) + return + } + w.handleEntityEvent(repo, event.Operation) +} + +func (w *Worker) handleOrgEvent(event common.ChangePayload) { + org, ok := event.Payload.(params.Organization) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for org event", "payload", event.Payload) + return + } + w.handleEntityEvent(org, event.Operation) +} + +func (w *Worker) handleEnterpriseEvent(event common.ChangePayload) { + enterprise, ok := event.Payload.(params.Enterprise) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for enterprise event", "payload", event.Payload) + return + } + w.handleEntityEvent(enterprise, event.Operation) +} + +func (w *Worker) handlePoolEvent(event common.ChangePayload) { + pool, ok := event.Payload.(params.Pool) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for pool event", "payload", event.Payload) + return + } + entity, err := pool.GetEntity() + if err != nil { + slog.DebugContext(w.ctx, "getting entity from pool", "error", err) + return + } + + switch event.Operation { + case common.CreateOperation, common.UpdateOperation: + cache.SetEntityPool(entity.ID, pool) + case common.DeleteOperation: + cache.DeleteEntityPool(entity.ID, pool.ID) + } +} + +func (w *Worker) handleScaleSetEvent(event common.ChangePayload) { + scaleSet, ok := event.Payload.(params.ScaleSet) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for pool event", "payload", event.Payload) + return + } + entity, err := scaleSet.GetEntity() + if err != nil { + slog.DebugContext(w.ctx, "getting entity from pool", "error", err) + return + } + + switch event.Operation { + case common.CreateOperation, common.UpdateOperation: + cache.SetEntityScaleSet(entity.ID, scaleSet) + case common.DeleteOperation: + cache.DeleteEntityScaleSet(entity.ID, scaleSet.ID) + } +} + +func (w *Worker) handleInstanceEvent(event common.ChangePayload) { + instance, ok := event.Payload.(params.Instance) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for instance event", "payload", event.Payload) + return + } + switch event.Operation { + case common.CreateOperation, common.UpdateOperation: + cache.SetInstanceCache(instance) + case common.DeleteOperation: + cache.DeleteInstanceCache(instance.Name) + } +} + +func (w *Worker) handleCredentialsEvent(event common.ChangePayload) { + credentials, ok := event.Payload.(params.GithubCredentials) + if !ok { + slog.DebugContext(w.ctx, "invalid payload type for credentials event", "payload", event.Payload) + return + } + switch event.Operation { + case common.CreateOperation, common.UpdateOperation: + cache.SetGithubCredentials(credentials) + case common.DeleteOperation: + cache.DeleteGithubCredentials(credentials.ID) + } +} + +func (w *Worker) handleEvent(event common.ChangePayload) { + slog.DebugContext(w.ctx, "handling event", "event", event) + switch event.EntityType { + case common.PoolEntityType: + w.handlePoolEvent(event) + case common.ScaleSetEntityType: + w.handleScaleSetEvent(event) + case common.InstanceEntityType: + w.handleInstanceEvent(event) + case common.RepositoryEntityType: + w.handleRepositoryEvent(event) + case common.OrganizationEntityType: + w.handleOrgEvent(event) + case common.EnterpriseEntityType: + w.handleEnterpriseEvent(event) + case common.GithubCredentialsEntityType: + w.handleCredentialsEvent(event) + default: + slog.DebugContext(w.ctx, "unknown entity type", "entity_type", event.EntityType) + } +} + +func (w *Worker) loop() { + defer w.Stop() + for { + select { + case <-w.quit: + return + case event, ok := <-w.consumer.Watch(): + if !ok { + slog.InfoContext(w.ctx, "consumer channel closed") + return + } + w.handleEvent(event) + case <-w.ctx.Done(): + slog.DebugContext(w.ctx, "context done") + return + } + } +} diff --git a/workers/credentials/credentials.go b/workers/credentials/credentials.go deleted file mode 100644 index 7c590401..00000000 --- a/workers/credentials/credentials.go +++ /dev/null @@ -1,133 +0,0 @@ -package credentials - -import ( - "context" - "fmt" - "log/slog" - "sync" - - "github.com/cloudbase/garm/cache" - dbCommon "github.com/cloudbase/garm/database/common" - "github.com/cloudbase/garm/database/watcher" - "github.com/cloudbase/garm/params" - garmUtil "github.com/cloudbase/garm/util" -) - -func NewWorker(ctx context.Context, store dbCommon.Store) (*Worker, error) { - consumerID := "credentials-worker" - - ctx = garmUtil.WithSlogContext( - ctx, - slog.Any("worker", consumerID)) - - return &Worker{ - ctx: ctx, - consumerID: consumerID, - store: store, - running: false, - quit: make(chan struct{}), - credentials: make(map[uint]params.GithubCredentials), - }, nil -} - -// Worker is responsible for maintaining the credentials cache. -type Worker struct { - consumerID string - ctx context.Context - - consumer dbCommon.Consumer - store dbCommon.Store - - credentials map[uint]params.GithubCredentials - - running bool - quit chan struct{} - - mux sync.Mutex -} - -func (w *Worker) loadAllCredentials() error { - creds, err := w.store.ListGithubCredentials(w.ctx) - if err != nil { - return err - } - - for _, cred := range creds { - w.credentials[cred.ID] = cred - cache.SetGithubCredentials(cred) - } - - return nil -} - -func (w *Worker) Start() error { - w.mux.Lock() - defer w.mux.Unlock() - - if w.running { - return nil - } - slog.DebugContext(w.ctx, "starting credentials worker") - if err := w.loadAllCredentials(); err != nil { - return fmt.Errorf("loading credentials: %w", err) - } - - consumer, err := watcher.RegisterConsumer( - w.ctx, w.consumerID, - watcher.WithEntityTypeFilter(dbCommon.GithubCredentialsEntityType), - ) - if err != nil { - return fmt.Errorf("failed to create consumer for entity controller: %w", err) - } - w.consumer = consumer - - w.running = true - go w.loop() - return nil -} - -func (w *Worker) Stop() error { - w.mux.Lock() - defer w.mux.Unlock() - - if !w.running { - return nil - } - - close(w.quit) - w.running = false - - return nil -} - -func (w *Worker) loop() { - defer w.Stop() - - for { - select { - case <-w.quit: - return - case event, ok := <-w.consumer.Watch(): - if !ok { - slog.ErrorContext(w.ctx, "consumer channel closed") - return - } - creds, ok := event.Payload.(params.GithubCredentials) - if !ok { - slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload) - continue - } - w.mux.Lock() - switch event.Operation { - case dbCommon.DeleteOperation: - slog.DebugContext(w.ctx, "got delete operation") - delete(w.credentials, creds.ID) - cache.DeleteGithubCredentials(creds.ID) - default: - w.credentials[creds.ID] = creds - cache.SetGithubCredentials(creds) - } - w.mux.Unlock() - } - } -} diff --git a/workers/entity/controller.go b/workers/entity/controller.go index 066bdfe3..07fb38ce 100644 --- a/workers/entity/controller.go +++ b/workers/entity/controller.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/cloudbase/garm/auth" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/runner/common" @@ -67,9 +66,6 @@ func (c *Controller) loadAllRepositories() error { return fmt.Errorf("starting worker: %w", err) } c.Entities[entity.ID] = worker - // take advantage of the fact that we're loading all entities - // and set the cache. - cache.SetEntity(entity) } return nil } @@ -94,9 +90,6 @@ func (c *Controller) loadAllOrganizations() error { return fmt.Errorf("starting worker: %w", err) } c.Entities[entity.ID] = worker - // take advantage of the fact that we're loading all entities - // and set the cache. - cache.SetEntity(entity) } return nil } @@ -121,9 +114,6 @@ func (c *Controller) loadAllEnterprises() error { return fmt.Errorf("starting worker: %w", err) } c.Entities[entity.ID] = worker - // take advantage of the fact that we're loading all entities - // and set the cache. - cache.SetEntity(entity) } return nil } diff --git a/workers/entity/controller_watcher.go b/workers/entity/controller_watcher.go index dcd6ee9a..ace63702 100644 --- a/workers/entity/controller_watcher.go +++ b/workers/entity/controller_watcher.go @@ -3,7 +3,6 @@ package entity import ( "log/slog" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" ) @@ -96,5 +95,4 @@ func (c *Controller) handleWatcherDeleteOperation(entityGetter params.EntityGett return } delete(c.Entities, entity.ID) - cache.DeleteEntity(entity.ID) } diff --git a/workers/provider/provider.go b/workers/provider/provider.go index 05a78c7e..b1ab1220 100644 --- a/workers/provider/provider.go +++ b/workers/provider/provider.go @@ -8,7 +8,6 @@ import ( commonParams "github.com/cloudbase/garm-provider-common/params" "github.com/cloudbase/garm/auth" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/params" @@ -81,7 +80,6 @@ func (p *Provider) loadAllRunners() error { } for _, runner := range runners { - cache.SetInstanceCache(runner) // Skip non scale set instances for now. This condition needs to be // removed once we replace the current pool manager. if runner.ScaleSetID == 0 { @@ -239,15 +237,6 @@ func (p *Provider) handleInstanceAdded(instance params.Instance) error { return nil } -func (p *Provider) updateInstanceCache(instance params.Instance, op dbCommon.OperationType) { - if op == dbCommon.DeleteOperation { - slog.DebugContext(p.ctx, "deleting instance from cache", "instance_name", instance.Name) - cache.DeleteInstanceCache(instance.Name) - return - } - cache.SetInstanceCache(instance) -} - func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { p.mux.Lock() defer p.mux.Unlock() @@ -257,7 +246,6 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload)) return } - p.updateInstanceCache(instance, event.Operation) if instance.ScaleSetID == 0 { slog.DebugContext(p.ctx, "skipping instance event for non scale set instance") @@ -267,14 +255,12 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name) switch event.Operation { case dbCommon.CreateOperation: - cache.SetInstanceCache(instance) slog.DebugContext(p.ctx, "got create operation") if err := p.handleInstanceAdded(instance); err != nil { slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err) return } case dbCommon.UpdateOperation: - cache.SetInstanceCache(instance) slog.DebugContext(p.ctx, "got update operation") existingInstance, ok := p.runners[instance.Name] if !ok { @@ -300,7 +286,6 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { } } delete(p.runners, instance.Name) - cache.DeleteInstanceCache(instance.Name) default: slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation) return diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index e1758550..b6d61f54 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -91,8 +91,6 @@ func (c *Controller) loadAllScaleSets(cli common.GithubClient) error { } for _, sSet := range scaleSets { - cache.SetEntityScaleSet(c.Entity.ID, sSet) - slog.DebugContext(c.ctx, "loading scale set", "scale_set", sSet.ID) if err := c.handleScaleSetCreateOperation(sSet, cli); err != nil { slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation") diff --git a/workers/scaleset/controller_watcher.go b/workers/scaleset/controller_watcher.go index 99fd4617..9d94c794 100644 --- a/workers/scaleset/controller_watcher.go +++ b/workers/scaleset/controller_watcher.go @@ -4,7 +4,6 @@ import ( "fmt" "log/slog" - "github.com/cloudbase/garm/cache" dbCommon "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" "github.com/cloudbase/garm/runner/common" @@ -61,7 +60,6 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) { func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli common.GithubClient) error { c.mux.Lock() defer c.mux.Unlock() - cache.SetEntityScaleSet(c.Entity.ID, sSet) if _, ok := c.ScaleSets[sSet.ID]; ok { slog.DebugContext(c.ctx, "scale set already exists in worker list", "scale_set_id", sSet.ID) @@ -110,7 +108,6 @@ func (c *Controller) handleScaleSetDeleteOperation(sSet params.ScaleSet) error { return fmt.Errorf("stopping scale set worker: %w", err) } delete(c.ScaleSets, sSet.ID) - cache.DeleteEntityScaleSet(c.Entity.ID, sSet.ID) return nil } @@ -118,8 +115,6 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error { c.mux.Lock() defer c.mux.Unlock() - cache.SetEntityScaleSet(c.Entity.ID, sSet) - set, ok := c.ScaleSets[sSet.ID] if !ok { // Some error may have occurred when the scale set was first created, so we @@ -146,7 +141,6 @@ func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) { c.mux.Lock() defer c.mux.Unlock() - cache.SetGithubCredentials(credentials) if c.Entity.Credentials.ID != credentials.ID { // stale update event. return @@ -185,7 +179,6 @@ func (c *Controller) handleEntityEvent(event dbCommon.ChangePayload) { } } c.Entity = entity - cache.SetEntity(c.Entity) default: slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation) return