From 1ec99e8695bf902420136662b641aa194038a795 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 31 May 2025 20:55:21 +0000 Subject: [PATCH] Some cleanup Signed-off-by: Gabriel Adrian Samfira --- params/requests.go | 6 --- workers/cache/tool_cache.go | 49 ++++++++--------------- workers/entity/controller_watcher.go | 57 ++++++++++++++++++++------- workers/entity/util.go | 1 + workers/entity/worker.go | 16 ++++++-- workers/entity/worker_watcher.go | 7 +--- workers/scaleset/scaleset.go | 33 +++++++++++----- workers/scaleset/scaleset_listener.go | 5 ++- 8 files changed, 104 insertions(+), 70 deletions(-) diff --git a/params/requests.go b/params/requests.go index 5be0e3a1..3f2fcfab 100644 --- a/params/requests.go +++ b/params/requests.go @@ -646,12 +646,6 @@ func (c CreateGiteaEndpointParams) Validate() error { return runnerErrors.NewBadRequestError("invalid api_base_url") } - switch url.Scheme { - case httpsScheme, httpScheme: - default: - return runnerErrors.NewBadRequestError("invalid api_base_url") - } - if c.BaseURL == "" { return runnerErrors.NewBadRequestError("missing base_url") } diff --git a/workers/cache/tool_cache.go b/workers/cache/tool_cache.go index 3df103ec..727c82b4 100644 --- a/workers/cache/tool_cache.go +++ b/workers/cache/tool_cache.go @@ -161,13 +161,9 @@ func (t *toolsUpdater) giteaUpdateLoop() { t.sleepWithCancel(time.Duration(randInt.Int64()) * time.Millisecond) tools, err := getTools() if err != nil { - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update gitea tools: %q", err), 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent(fmt.Sprintf("failed to update gitea tools: %q", err), params.EventError) } else { - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent("successfully updated tools", params.EventInfo) cache.SetGithubToolsCache(t.entity, tools) } @@ -184,15 +180,11 @@ func (t *toolsUpdater) giteaUpdateLoop() { case <-ticker.C: tools, err := getTools() if err != nil { - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update gitea tools: %q", err), 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent(fmt.Sprintf("failed to update gitea tools: %q", err), params.EventError) slog.DebugContext(t.ctx, "failed to update gitea tools", "error", err) continue } - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent("successfully updated tools", params.EventInfo) cache.SetGithubToolsCache(t.entity, tools) } } @@ -213,18 +205,13 @@ func (t *toolsUpdater) loop() { now := time.Now().UTC() if now.After(t.lastUpdate.Add(40 * time.Minute)) { if err := t.updateTools(); err != nil { - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } - slog.ErrorContext(t.ctx, "initial tools update error", "error", err) + slog.ErrorContext(t.ctx, "updating tools", "error", err) + t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError) resetTime = now.Add(5 * time.Minute) - slog.ErrorContext(t.ctx, "initial tools update error", "error", err) } else { // Tools are usually valid for 1 hour. resetTime = t.lastUpdate.Add(40 * time.Minute) - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent("successfully updated tools", params.EventInfo) } } @@ -248,16 +235,12 @@ func (t *toolsUpdater) loop() { now = time.Now().UTC() if err := t.updateTools(); err != nil { slog.ErrorContext(t.ctx, "updating tools", "error", err) - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError) resetTime = now.Add(5 * time.Minute) } else { // Tools are usually valid for 1 hour. resetTime = t.lastUpdate.Add(40 * time.Minute) - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent("successfully updated tools", params.EventInfo) } case <-t.reset: slog.DebugContext(t.ctx, "resetting tools updater") @@ -265,18 +248,20 @@ func (t *toolsUpdater) loop() { now = time.Now().UTC() if err := t.updateTools(); err != nil { slog.ErrorContext(t.ctx, "updating tools", "error", err) - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError) resetTime = now.Add(5 * time.Minute) } else { // Tools are usually valid for 1 hour. resetTime = t.lastUpdate.Add(40 * time.Minute) - if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil { - slog.ErrorContext(t.ctx, "failed to add entity event", "error", err) - } + t.addStatusEvent("successfully updated tools", params.EventInfo) } } timer.Stop() } } + +func (t *toolsUpdater) addStatusEvent(msg string, level params.EventLevel) { + if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, level, msg, 30); err != nil { + slog.With(slog.Any("error", err)).Error("failed to add entity event") + } +} diff --git a/workers/entity/controller_watcher.go b/workers/entity/controller_watcher.go index 6bd3e173..d907d25a 100644 --- a/workers/entity/controller_watcher.go +++ b/workers/entity/controller_watcher.go @@ -14,6 +14,7 @@ package entity import ( + "fmt" "log/slog" dbCommon "github.com/cloudbase/garm/database/common" @@ -28,6 +29,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) { repo, ok := event.Payload.(params.Repository) if !ok { slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload) + return } entityGetter = repo case dbCommon.OrganizationEntityType: @@ -35,6 +37,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) { org, ok := event.Payload.(params.Organization) if !ok { slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload) + return } entityGetter = org case dbCommon.EnterpriseEntityType: @@ -42,6 +45,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) { ent, ok := event.Payload.(params.Enterprise) if !ok { slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload) + return } entityGetter = ent default: @@ -49,34 +53,63 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) { return } - if entityGetter == nil { + entity, err := entityGetter.GetEntity() + if err != nil { + slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err) return } switch event.Operation { case dbCommon.CreateOperation: slog.DebugContext(c.ctx, "got create operation") - c.handleWatcherCreateOperation(entityGetter, event) + c.handleWatcherCreateOperation(entity) case dbCommon.DeleteOperation: slog.DebugContext(c.ctx, "got delete operation") - c.handleWatcherDeleteOperation(entityGetter, event) + c.handleWatcherDeleteOperation(entity) + case dbCommon.UpdateOperation: + slog.DebugContext(c.ctx, "got update operation") + c.handleWatcherUpdateOperation(entity) default: slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation) return } } -func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) { +func (c *Controller) handleWatcherUpdateOperation(entity params.ForgeEntity) { c.mux.Lock() defer c.mux.Unlock() - entity, err := entityGetter.GetEntity() - if err != nil { - slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err) + + worker, ok := c.Entities[entity.ID] + if !ok { + slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID) return } + + if worker.IsRunning() { + // The worker is running. It watches for updates to its own entity. We only care about updates + // in the controller, if for some reason, the worker is not running. + slog.DebugContext(c.ctx, "worker is already running, skipping update", "entity_id", entity.ID) + return + } + + slog.InfoContext(c.ctx, "updating entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType) + worker.Entity = entity + if err := worker.Start(); err != nil { + slog.ErrorContext(c.ctx, "starting worker after update", "entity_id", entity.ID, "error", err) + worker.addStatusEvent(fmt.Sprintf("failed to start worker for %s (%s) after update: %s", entity.ID, entity.ForgeURL(), err.Error()), params.EventError) + return + } + slog.InfoContext(c.ctx, "entity worker updated and successfully started", "entity_id", entity.ID, "entity_type", entity.EntityType) + worker.addStatusEvent(fmt.Sprintf("worker updated and successfully started for entity: %s (%s)", entity.ID, entity.ForgeURL()), params.EventInfo) +} + +func (c *Controller) handleWatcherCreateOperation(entity params.ForgeEntity) { + c.mux.Lock() + defer c.mux.Unlock() + worker, err := NewWorker(c.ctx, c.store, entity, c.providers) if err != nil { - slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err) + slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", entity.EntityType, "error", err) return } @@ -89,14 +122,10 @@ func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGett c.Entities[entity.ID] = worker } -func (c *Controller) handleWatcherDeleteOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) { +func (c *Controller) handleWatcherDeleteOperation(entity params.ForgeEntity) { c.mux.Lock() defer c.mux.Unlock() - entity, err := entityGetter.GetEntity() - if err != nil { - slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err) - return - } + worker, ok := c.Entities[entity.ID] if !ok { slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID) diff --git a/workers/entity/util.go b/workers/entity/util.go index 2216c326..38e011b0 100644 --- a/workers/entity/util.go +++ b/workers/entity/util.go @@ -40,6 +40,7 @@ func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc { watcher.WithAny( watcher.WithOperationTypeFilter(dbCommon.CreateOperation), watcher.WithOperationTypeFilter(dbCommon.DeleteOperation), + watcher.WithOperationTypeFilter(dbCommon.UpdateOperation), ), ) } diff --git a/workers/entity/worker.go b/workers/entity/worker.go index 583ab2c8..1cb40ad5 100644 --- a/workers/entity/worker.go +++ b/workers/entity/worker.go @@ -132,6 +132,12 @@ func (w *Worker) Start() (err error) { return nil } +func (w *Worker) IsRunning() bool { + w.mux.Lock() + defer w.mux.Unlock() + return w.running +} + // consolidateRunnerState will list all runners on GitHub for this entity, sort by // pool or scale set and pass those runners to the appropriate controller (pools or scale sets). // The controller will then pass along to their respective workers the list of runners @@ -212,9 +218,7 @@ func (w *Worker) consolidateRunnerLoop() { return } if err := w.consolidateRunnerState(); err != nil { - if err := w.store.AddEntityEvent(w.ctx, w.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil { - slog.With(slog.Any("error", err)).Error("failed to add entity event") - } + w.addStatusEvent(fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), params.EventError) slog.With(slog.Any("error", err)).Error("failed to consolidate runner state") } case <-w.ctx.Done(): @@ -239,3 +243,9 @@ func (w *Worker) loop() { } } } + +func (w *Worker) addStatusEvent(msg string, level params.EventLevel) { + if err := w.store.AddEntityEvent(w.ctx, w.Entity, params.StatusEvent, level, msg, 30); err != nil { + slog.With(slog.Any("error", err)).Error("failed to add entity event") + } +} diff --git a/workers/entity/worker_watcher.go b/workers/entity/worker_watcher.go index 9acfbc60..ce8fd244 100644 --- a/workers/entity/worker_watcher.go +++ b/workers/entity/worker_watcher.go @@ -29,7 +29,6 @@ func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) { switch event.EntityType { case entityType: w.handleEntityEventPayload(event) - return case dbCommon.GithubCredentialsEntityType, dbCommon.GiteaCredentialsEntityType: slog.DebugContext(w.ctx, "got github credentials payload event") w.handleEntityCredentialsEventPayload(event) @@ -90,20 +89,18 @@ func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayloa return } - credentials := creds - switch event.Operation { case dbCommon.UpdateOperation: slog.DebugContext(w.ctx, "got delete operation") w.mux.Lock() defer w.mux.Unlock() - if w.Entity.Credentials.GetID() != credentials.GetID() { + if w.Entity.Credentials.GetID() != creds.GetID() { // The channel is buffered. We may get an old update. If credentials get updated // immediately after they are swapped on the entity, we may still get an update // pushed to the channel before the filters are swapped. We can ignore the update. return } - w.Entity.Credentials = credentials + w.Entity.Credentials = creds ghCli, err := github.Client(w.ctx, w.Entity) if err != nil { slog.ErrorContext(w.ctx, "creating github client", "entity_id", w.Entity.ID, "error", err) diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index e4faba70..5022217a 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -646,24 +646,38 @@ func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) { return true } +func (w *Worker) sessionLoopMayRun() bool { + w.mux.Lock() + defer w.mux.Unlock() + return w.scaleSet.Enabled +} + func (w *Worker) keepListenerAlive() { var backoff time.Duration +Loop: for { - w.mux.Lock() - if !w.scaleSet.Enabled { + if !w.sessionLoopMayRun() { if canceled := w.sleepWithCancel(2 * time.Second); canceled { - slog.DebugContext(w.ctx, "worker is stopped; exiting keepListenerAlive") - w.mux.Unlock() + slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive") return } - w.mux.Unlock() continue } // noop if already started. If the scaleset was just enabled, we need to // start the listener here, or the <-w.listener.Wait() channel receive bellow // will block forever, even if we start the listener, as a nil channel will // block forever. - w.listener.Start() + if err := w.listener.Start(); err != nil { + slog.ErrorContext(w.ctx, "error starting listener", "error", err, "consumer_id", w.consumerID) + if canceled := w.sleepWithCancel(2 * time.Second); canceled { + slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive") + w.mux.Unlock() + return + } + // we failed to start the listener. Try again. + w.mux.Unlock() + continue + } w.mux.Unlock() select { @@ -675,8 +689,9 @@ func (w *Worker) keepListenerAlive() { slog.DebugContext(w.ctx, "listener is stopped; attempting to restart") w.mux.Lock() if !w.scaleSet.Enabled { + w.listener.Stop() // cleanup w.mux.Unlock() - continue + continue Loop } w.mux.Unlock() for { @@ -684,7 +699,7 @@ func (w *Worker) keepListenerAlive() { w.listener.Stop() // cleanup if !w.scaleSet.Enabled { w.mux.Unlock() - break + continue Loop } slog.DebugContext(w.ctx, "attempting to restart") if err := w.listener.Start(); err != nil { @@ -707,7 +722,7 @@ func (w *Worker) keepListenerAlive() { continue } w.mux.Unlock() - break + continue Loop } } } diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 76a321f4..d69092f5 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -59,6 +59,10 @@ func (l *scaleSetListener) Start() error { l.mux.Lock() defer l.mux.Unlock() + if l.running { + return nil + } + l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background()) scaleSet := l.scaleSetHelper.GetScaleSet() scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient() @@ -103,7 +107,6 @@ func (l *scaleSetListener) Stop() error { } } - l.messageSession.Close() l.running = false close(l.quit) l.cancelFunc()