Add scaleset watcher to provider
Fixes provider not spawning runners for newly added scale set Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
64d1501b0e
commit
22302fdd7a
8 changed files with 7 additions and 14 deletions
|
|
@ -431,6 +431,7 @@ type RunnerReference struct {
|
|||
Status interface{} `json:"status"`
|
||||
DisableUpdate bool `json:"disableUpdate"`
|
||||
ProvisioningState string `json:"provisioningState"`
|
||||
Labels []Label `json:"labels,omitempty"`
|
||||
}
|
||||
|
||||
type RunnerScaleSetJitRunnerConfig struct {
|
||||
|
|
|
|||
|
|
@ -172,7 +172,6 @@ func (c *Controller) Stop() error {
|
|||
|
||||
c.running = false
|
||||
close(c.quit)
|
||||
c.quit = nil
|
||||
c.consumer.Close()
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,6 @@ func (w *Worker) Stop() error {
|
|||
if err := w.scaleSetController.Stop(); err != nil {
|
||||
return fmt.Errorf("stopping scale set controller: %w", err)
|
||||
}
|
||||
w.scaleSetController = nil
|
||||
|
||||
w.running = false
|
||||
close(w.quit)
|
||||
|
|
@ -85,7 +84,6 @@ func (w *Worker) Start() (err error) {
|
|||
defer func() {
|
||||
if err != nil {
|
||||
w.scaleSetController.Stop()
|
||||
w.scaleSetController = nil
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
|||
|
|
@ -241,6 +241,11 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
|
|||
return
|
||||
}
|
||||
|
||||
if instance.ScaleSetID == 0 {
|
||||
slog.DebugContext(p.ctx, "skipping instance event for non scale set instance")
|
||||
return
|
||||
}
|
||||
|
||||
switch event.Operation {
|
||||
case dbCommon.CreateOperation:
|
||||
slog.DebugContext(p.ctx, "got create operation")
|
||||
|
|
|
|||
|
|
@ -13,5 +13,6 @@ func composeProviderWatcher() dbCommon.PayloadFilterFunc {
|
|||
commonParams.InstancePendingDelete,
|
||||
commonParams.InstancePendingForceDelete,
|
||||
),
|
||||
watcher.WithEntityTypeFilter(dbCommon.ScaleSetEntityType),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,7 +148,6 @@ func (c *Controller) Stop() error {
|
|||
|
||||
c.running = false
|
||||
close(c.quit)
|
||||
c.quit = nil
|
||||
c.consumer.Close()
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -84,10 +84,8 @@ func (w *Worker) Stop() error {
|
|||
w.running = false
|
||||
if w.quit != nil {
|
||||
close(w.quit)
|
||||
w.quit = nil
|
||||
}
|
||||
w.listener.Stop()
|
||||
w.listener = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -214,7 +212,6 @@ func (w *Worker) Start() (err error) {
|
|||
defer func() {
|
||||
if err != nil {
|
||||
consumer.Close()
|
||||
w.consumer = nil
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -282,8 +279,6 @@ func (w *Worker) handleScaleSetEvent(event dbCommon.ChangePayload) {
|
|||
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
|
||||
}
|
||||
}
|
||||
// nolint:golangci-lint,godox
|
||||
// TODO: should we kick off auto-scaling if desired runner count changes?
|
||||
w.scaleSet = scaleSet
|
||||
w.mux.Unlock()
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -85,7 +85,6 @@ func (l *scaleSetListener) Stop() error {
|
|||
|
||||
l.messageSession.Close()
|
||||
l.running = false
|
||||
l.listenerCtx = nil
|
||||
close(l.quit)
|
||||
l.cancelFunc()
|
||||
return nil
|
||||
|
|
@ -201,10 +200,6 @@ func (l *scaleSetListener) loop() {
|
|||
return
|
||||
default:
|
||||
slog.DebugContext(l.ctx, "getting message", "last_message_id", l.lastMessageID, "max_runners", l.scaleSetHelper.GetScaleSet().MaxRunners)
|
||||
// nolint:golangci-lint,godox
|
||||
// TODO(gabriel-samfira): consume initial message on startup and consolidate.
|
||||
// The scale set may have undergone several messages while GARM was
|
||||
// down.
|
||||
msg, err := l.messageSession.GetMessage(
|
||||
l.listenerCtx, l.lastMessageID, l.scaleSetHelper.GetScaleSet().MaxRunners)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue