diff --git a/workers/provider/provider.go b/workers/provider/provider.go index 78e50955..92be1390 100644 --- a/workers/provider/provider.go +++ b/workers/provider/provider.go @@ -189,7 +189,7 @@ func (p *Provider) loop() { slog.ErrorContext(p.ctx, "watcher channel closed") return } - slog.InfoContext(p.ctx, "received payload") + slog.InfoContext(p.ctx, "received payload", "operation", payload.Operation, "entity_type", payload.EntityType) go p.handleWatcherEvent(payload) case <-p.ctx.Done(): return @@ -250,6 +250,20 @@ func (p *Provider) handleInstanceAdded(instance params.Instance) error { return nil } +func (p *Provider) stopAndDeleteInstance(instance params.Instance) error { + if instance.Status != commonParams.InstanceDeleted { + return nil + } + existingInstance, ok := p.runners[instance.Name] + if ok { + if err := existingInstance.Stop(); err != nil { + return fmt.Errorf("failed to stop instance manager: %w", err) + } + delete(p.runners, instance.Name) + } + return nil +} + func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { p.mux.Lock() defer p.mux.Unlock() @@ -265,7 +279,7 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { return } - slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name) + slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name, "operation", event.Operation) switch event.Operation { case dbCommon.CreateOperation: slog.DebugContext(p.ctx, "got create operation") @@ -284,21 +298,24 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) { } } else { slog.DebugContext(p.ctx, "updating instance", "instance_name", instance.Name) + if instance.Status == commonParams.InstanceDeleted { + if err := p.stopAndDeleteInstance(instance); err != nil { + slog.ErrorContext(p.ctx, "failed to clean up instance manager", "error", err) + return + } + return + } if err := existingInstance.Update(event); err != nil { - slog.ErrorContext(p.ctx, "failed to update instance", "error", err) + slog.ErrorContext(p.ctx, "failed to update instance", "error", err, "instance_name", instance.Name, "payload", event.Payload) return } } case dbCommon.DeleteOperation: slog.DebugContext(p.ctx, "got delete operation", "instance_name", instance.Name) - existingInstance, ok := p.runners[instance.Name] - if ok { - if err := existingInstance.Stop(); err != nil { - slog.ErrorContext(p.ctx, "failed to stop instance", "error", err) - return - } + if err := p.stopAndDeleteInstance(instance); err != nil { + slog.ErrorContext(p.ctx, "failed to clean up instance manager", "error", err) + return } - delete(p.runners, instance.Name) default: slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation) return diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index 32d3d713..1452daeb 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -166,7 +166,7 @@ func (c *Controller) Stop() error { // ConsolidateRunnerState will send a list of existing github runners to each scale set worker. // The scale set worker will then need to cross check the existing runners in Github with the sate -// in the database. Any inconsistencies will b reconciliated. This cleans up any manually removed +// in the database. Any inconsistencies will be reconciliated. This cleans up any manually removed // runners in either github or the providers. func (c *Controller) ConsolidateRunnerState(byScaleSetID map[int][]params.RunnerReference) error { g, ctx := errgroup.WithContext(c.ctx) diff --git a/workers/scaleset/scaleset.go b/workers/scaleset/scaleset.go index 8c0abefa..49c8ed3b 100644 --- a/workers/scaleset/scaleset.go +++ b/workers/scaleset/scaleset.go @@ -131,6 +131,12 @@ func (w *Worker) ensureScaleSetInGitHub() error { if err != nil { return fmt.Errorf("failed to update scale set: %w", err) } + + // The scale set was recreated. We need to reset the last message ID we recorded previously, + // otherwise we'll ignore every message we get from the queue. + if err := w.SetLastMessageID(0); err != nil { + return fmt.Errorf("failed to reset last message id: %w", err) + } w.scaleSet.ScaleSetID = runnerScaleSet.ID return nil @@ -615,6 +621,7 @@ func (w *Worker) handleInstanceCleanup(instance params.Instance) error { return fmt.Errorf("deleting instance %s: %w", instance.ID, err) } } + delete(w.runners, instance.ID) } return nil } diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go index 7808f9f6..3cdcae62 100644 --- a/workers/scaleset/scaleset_listener.go +++ b/workers/scaleset/scaleset_listener.go @@ -138,7 +138,7 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage } if msg.MessageID < l.lastMessageID { - slog.DebugContext(l.ctx, "message is older than last message, ignoring") + slog.InfoContext(l.ctx, "message is older than last message, ignoring", "received_msg_id", fmt.Sprintf("%d", msg.MessageID), "recorded_msg_id", fmt.Sprintf("%d", l.lastMessageID)) return }