Scale sets stability fixes

This change adds a number of fixes for scale sets:

* Reset last message ID when we need to recreate the scale set in GitHub.
Message ID gets reset in github when this happens and we end up ignoring
messages because we see that they are older than we have recorded.
* Clean up deleted instances from state scale set state
* Properly stop instance handler in the provider worker when an update
operation comes in that signals that an instance has been marked as "deleted"

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-08-28 11:53:40 +00:00
parent cef85036b1
commit 62a038f6a1
4 changed files with 36 additions and 12 deletions

View file

@ -189,7 +189,7 @@ func (p *Provider) loop() {
slog.ErrorContext(p.ctx, "watcher channel closed")
return
}
slog.InfoContext(p.ctx, "received payload")
slog.InfoContext(p.ctx, "received payload", "operation", payload.Operation, "entity_type", payload.EntityType)
go p.handleWatcherEvent(payload)
case <-p.ctx.Done():
return
@ -250,6 +250,20 @@ func (p *Provider) handleInstanceAdded(instance params.Instance) error {
return nil
}
func (p *Provider) stopAndDeleteInstance(instance params.Instance) error {
if instance.Status != commonParams.InstanceDeleted {
return nil
}
existingInstance, ok := p.runners[instance.Name]
if ok {
if err := existingInstance.Stop(); err != nil {
return fmt.Errorf("failed to stop instance manager: %w", err)
}
delete(p.runners, instance.Name)
}
return nil
}
func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
p.mux.Lock()
defer p.mux.Unlock()
@ -265,7 +279,7 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
return
}
slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name)
slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name, "operation", event.Operation)
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(p.ctx, "got create operation")
@ -284,21 +298,24 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
}
} else {
slog.DebugContext(p.ctx, "updating instance", "instance_name", instance.Name)
if instance.Status == commonParams.InstanceDeleted {
if err := p.stopAndDeleteInstance(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to clean up instance manager", "error", err)
return
}
return
}
if err := existingInstance.Update(event); err != nil {
slog.ErrorContext(p.ctx, "failed to update instance", "error", err)
slog.ErrorContext(p.ctx, "failed to update instance", "error", err, "instance_name", instance.Name, "payload", event.Payload)
return
}
}
case dbCommon.DeleteOperation:
slog.DebugContext(p.ctx, "got delete operation", "instance_name", instance.Name)
existingInstance, ok := p.runners[instance.Name]
if ok {
if err := existingInstance.Stop(); err != nil {
slog.ErrorContext(p.ctx, "failed to stop instance", "error", err)
return
}
if err := p.stopAndDeleteInstance(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to clean up instance manager", "error", err)
return
}
delete(p.runners, instance.Name)
default:
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
return

View file

@ -166,7 +166,7 @@ func (c *Controller) Stop() error {
// ConsolidateRunnerState will send a list of existing github runners to each scale set worker.
// The scale set worker will then need to cross check the existing runners in Github with the sate
// in the database. Any inconsistencies will b reconciliated. This cleans up any manually removed
// in the database. Any inconsistencies will be reconciliated. This cleans up any manually removed
// runners in either github or the providers.
func (c *Controller) ConsolidateRunnerState(byScaleSetID map[int][]params.RunnerReference) error {
g, ctx := errgroup.WithContext(c.ctx)

View file

@ -131,6 +131,12 @@ func (w *Worker) ensureScaleSetInGitHub() error {
if err != nil {
return fmt.Errorf("failed to update scale set: %w", err)
}
// The scale set was recreated. We need to reset the last message ID we recorded previously,
// otherwise we'll ignore every message we get from the queue.
if err := w.SetLastMessageID(0); err != nil {
return fmt.Errorf("failed to reset last message id: %w", err)
}
w.scaleSet.ScaleSetID = runnerScaleSet.ID
return nil
@ -615,6 +621,7 @@ func (w *Worker) handleInstanceCleanup(instance params.Instance) error {
return fmt.Errorf("deleting instance %s: %w", instance.ID, err)
}
}
delete(w.runners, instance.ID)
}
return nil
}

View file

@ -138,7 +138,7 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
}
if msg.MessageID < l.lastMessageID {
slog.DebugContext(l.ctx, "message is older than last message, ignoring")
slog.InfoContext(l.ctx, "message is older than last message, ignoring", "received_msg_id", fmt.Sprintf("%d", msg.MessageID), "recorded_msg_id", fmt.Sprintf("%d", l.lastMessageID))
return
}