Merge pull request #437 from gabriel-samfira/several-fixes

Some cleanup
This commit is contained in:
Gabriel 2025-06-21 20:54:03 +03:00 committed by GitHub
commit 42839917f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 104 additions and 70 deletions

View file

@ -646,12 +646,6 @@ func (c CreateGiteaEndpointParams) Validate() error {
return runnerErrors.NewBadRequestError("invalid api_base_url")
}
switch url.Scheme {
case httpsScheme, httpScheme:
default:
return runnerErrors.NewBadRequestError("invalid api_base_url")
}
if c.BaseURL == "" {
return runnerErrors.NewBadRequestError("missing base_url")
}

View file

@ -161,13 +161,9 @@ func (t *toolsUpdater) giteaUpdateLoop() {
t.sleepWithCancel(time.Duration(randInt.Int64()) * time.Millisecond)
tools, err := getTools()
if err != nil {
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update gitea tools: %q", err), 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent(fmt.Sprintf("failed to update gitea tools: %q", err), params.EventError)
} else {
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent("successfully updated tools", params.EventInfo)
cache.SetGithubToolsCache(t.entity, tools)
}
@ -184,15 +180,11 @@ func (t *toolsUpdater) giteaUpdateLoop() {
case <-ticker.C:
tools, err := getTools()
if err != nil {
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update gitea tools: %q", err), 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent(fmt.Sprintf("failed to update gitea tools: %q", err), params.EventError)
slog.DebugContext(t.ctx, "failed to update gitea tools", "error", err)
continue
}
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent("successfully updated tools", params.EventInfo)
cache.SetGithubToolsCache(t.entity, tools)
}
}
@ -213,18 +205,13 @@ func (t *toolsUpdater) loop() {
now := time.Now().UTC()
if now.After(t.lastUpdate.Add(40 * time.Minute)) {
if err := t.updateTools(); err != nil {
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
slog.ErrorContext(t.ctx, "initial tools update error", "error", err)
slog.ErrorContext(t.ctx, "updating tools", "error", err)
t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError)
resetTime = now.Add(5 * time.Minute)
slog.ErrorContext(t.ctx, "initial tools update error", "error", err)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent("successfully updated tools", params.EventInfo)
}
}
@ -248,16 +235,12 @@ func (t *toolsUpdater) loop() {
now = time.Now().UTC()
if err := t.updateTools(); err != nil {
slog.ErrorContext(t.ctx, "updating tools", "error", err)
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError)
resetTime = now.Add(5 * time.Minute)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent("successfully updated tools", params.EventInfo)
}
case <-t.reset:
slog.DebugContext(t.ctx, "resetting tools updater")
@ -265,18 +248,20 @@ func (t *toolsUpdater) loop() {
now = time.Now().UTC()
if err := t.updateTools(); err != nil {
slog.ErrorContext(t.ctx, "updating tools", "error", err)
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err), 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent(fmt.Sprintf("failed to update tools: %q", err), params.EventError)
resetTime = now.Add(5 * time.Minute)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, params.EventInfo, "successfully updated tools", 30); err != nil {
slog.ErrorContext(t.ctx, "failed to add entity event", "error", err)
}
t.addStatusEvent("successfully updated tools", params.EventInfo)
}
}
timer.Stop()
}
}
func (t *toolsUpdater) addStatusEvent(msg string, level params.EventLevel) {
if err := t.store.AddEntityEvent(t.ctx, t.entity, params.StatusEvent, level, msg, 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
}

View file

@ -14,6 +14,7 @@
package entity
import (
"fmt"
"log/slog"
dbCommon "github.com/cloudbase/garm/database/common"
@ -28,6 +29,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
repo, ok := event.Payload.(params.Repository)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entityGetter = repo
case dbCommon.OrganizationEntityType:
@ -35,6 +37,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
org, ok := event.Payload.(params.Organization)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entityGetter = org
case dbCommon.EnterpriseEntityType:
@ -42,6 +45,7 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
ent, ok := event.Payload.(params.Enterprise)
if !ok {
slog.ErrorContext(c.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
return
}
entityGetter = ent
default:
@ -49,34 +53,63 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
return
}
if entityGetter == nil {
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(c.ctx, "got create operation")
c.handleWatcherCreateOperation(entityGetter, event)
c.handleWatcherCreateOperation(entity)
case dbCommon.DeleteOperation:
slog.DebugContext(c.ctx, "got delete operation")
c.handleWatcherDeleteOperation(entityGetter, event)
c.handleWatcherDeleteOperation(entity)
case dbCommon.UpdateOperation:
slog.DebugContext(c.ctx, "got update operation")
c.handleWatcherUpdateOperation(entity)
default:
slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
func (c *Controller) handleWatcherUpdateOperation(entity params.ForgeEntity) {
c.mux.Lock()
defer c.mux.Unlock()
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
worker, ok := c.Entities[entity.ID]
if !ok {
slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID)
return
}
if worker.IsRunning() {
// The worker is running. It watches for updates to its own entity. We only care about updates
// in the controller, if for some reason, the worker is not running.
slog.DebugContext(c.ctx, "worker is already running, skipping update", "entity_id", entity.ID)
return
}
slog.InfoContext(c.ctx, "updating entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
worker.Entity = entity
if err := worker.Start(); err != nil {
slog.ErrorContext(c.ctx, "starting worker after update", "entity_id", entity.ID, "error", err)
worker.addStatusEvent(fmt.Sprintf("failed to start worker for %s (%s) after update: %s", entity.ID, entity.ForgeURL(), err.Error()), params.EventError)
return
}
slog.InfoContext(c.ctx, "entity worker updated and successfully started", "entity_id", entity.ID, "entity_type", entity.EntityType)
worker.addStatusEvent(fmt.Sprintf("worker updated and successfully started for entity: %s (%s)", entity.ID, entity.ForgeURL()), params.EventInfo)
}
func (c *Controller) handleWatcherCreateOperation(entity params.ForgeEntity) {
c.mux.Lock()
defer c.mux.Unlock()
worker, err := NewWorker(c.ctx, c.store, entity, c.providers)
if err != nil {
slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
slog.ErrorContext(c.ctx, "creating worker from repository", "entity_type", entity.EntityType, "error", err)
return
}
@ -89,14 +122,10 @@ func (c *Controller) handleWatcherCreateOperation(entityGetter params.EntityGett
c.Entities[entity.ID] = worker
}
func (c *Controller) handleWatcherDeleteOperation(entityGetter params.EntityGetter, event dbCommon.ChangePayload) {
func (c *Controller) handleWatcherDeleteOperation(entity params.ForgeEntity) {
c.mux.Lock()
defer c.mux.Unlock()
entity, err := entityGetter.GetEntity()
if err != nil {
slog.ErrorContext(c.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
return
}
worker, ok := c.Entities[entity.ID]
if !ok {
slog.InfoContext(c.ctx, "entity not found in worker list", "entity_id", entity.ID)

View file

@ -40,6 +40,7 @@ func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {
watcher.WithAny(
watcher.WithOperationTypeFilter(dbCommon.CreateOperation),
watcher.WithOperationTypeFilter(dbCommon.DeleteOperation),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
)
}

View file

@ -132,6 +132,12 @@ func (w *Worker) Start() (err error) {
return nil
}
func (w *Worker) IsRunning() bool {
w.mux.Lock()
defer w.mux.Unlock()
return w.running
}
// consolidateRunnerState will list all runners on GitHub for this entity, sort by
// pool or scale set and pass those runners to the appropriate controller (pools or scale sets).
// The controller will then pass along to their respective workers the list of runners
@ -212,9 +218,7 @@ func (w *Worker) consolidateRunnerLoop() {
return
}
if err := w.consolidateRunnerState(); err != nil {
if err := w.store.AddEntityEvent(w.ctx, w.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
w.addStatusEvent(fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), params.EventError)
slog.With(slog.Any("error", err)).Error("failed to consolidate runner state")
}
case <-w.ctx.Done():
@ -239,3 +243,9 @@ func (w *Worker) loop() {
}
}
}
func (w *Worker) addStatusEvent(msg string, level params.EventLevel) {
if err := w.store.AddEntityEvent(w.ctx, w.Entity, params.StatusEvent, level, msg, 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
}

View file

@ -29,7 +29,6 @@ func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
switch event.EntityType {
case entityType:
w.handleEntityEventPayload(event)
return
case dbCommon.GithubCredentialsEntityType, dbCommon.GiteaCredentialsEntityType:
slog.DebugContext(w.ctx, "got github credentials payload event")
w.handleEntityCredentialsEventPayload(event)
@ -90,20 +89,18 @@ func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayloa
return
}
credentials := creds
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got delete operation")
w.mux.Lock()
defer w.mux.Unlock()
if w.Entity.Credentials.GetID() != credentials.GetID() {
if w.Entity.Credentials.GetID() != creds.GetID() {
// The channel is buffered. We may get an old update. If credentials get updated
// immediately after they are swapped on the entity, we may still get an update
// pushed to the channel before the filters are swapped. We can ignore the update.
return
}
w.Entity.Credentials = credentials
w.Entity.Credentials = creds
ghCli, err := github.Client(w.ctx, w.Entity)
if err != nil {
slog.ErrorContext(w.ctx, "creating github client", "entity_id", w.Entity.ID, "error", err)

View file

@ -646,24 +646,38 @@ func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
return true
}
func (w *Worker) sessionLoopMayRun() bool {
w.mux.Lock()
defer w.mux.Unlock()
return w.scaleSet.Enabled
}
func (w *Worker) keepListenerAlive() {
var backoff time.Duration
Loop:
for {
w.mux.Lock()
if !w.scaleSet.Enabled {
if !w.sessionLoopMayRun() {
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
slog.DebugContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
w.mux.Unlock()
slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
return
}
w.mux.Unlock()
continue
}
// noop if already started. If the scaleset was just enabled, we need to
// start the listener here, or the <-w.listener.Wait() channel receive bellow
// will block forever, even if we start the listener, as a nil channel will
// block forever.
w.listener.Start()
if err := w.listener.Start(); err != nil {
slog.ErrorContext(w.ctx, "error starting listener", "error", err, "consumer_id", w.consumerID)
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
slog.InfoContext(w.ctx, "worker is stopped; exiting keepListenerAlive")
w.mux.Unlock()
return
}
// we failed to start the listener. Try again.
w.mux.Unlock()
continue
}
w.mux.Unlock()
select {
@ -675,8 +689,9 @@ func (w *Worker) keepListenerAlive() {
slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
w.mux.Lock()
if !w.scaleSet.Enabled {
w.listener.Stop() // cleanup
w.mux.Unlock()
continue
continue Loop
}
w.mux.Unlock()
for {
@ -684,7 +699,7 @@ func (w *Worker) keepListenerAlive() {
w.listener.Stop() // cleanup
if !w.scaleSet.Enabled {
w.mux.Unlock()
break
continue Loop
}
slog.DebugContext(w.ctx, "attempting to restart")
if err := w.listener.Start(); err != nil {
@ -707,7 +722,7 @@ func (w *Worker) keepListenerAlive() {
continue
}
w.mux.Unlock()
break
continue Loop
}
}
}

View file

@ -59,6 +59,10 @@ func (l *scaleSetListener) Start() error {
l.mux.Lock()
defer l.mux.Unlock()
if l.running {
return nil
}
l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background())
scaleSet := l.scaleSetHelper.GetScaleSet()
scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient()
@ -103,7 +107,6 @@ func (l *scaleSetListener) Stop() error {
}
}
l.messageSession.Close()
l.running = false
close(l.quit)
l.cancelFunc()