Merge pull request #448 from gabriel-samfira/fix-sleep
Fix sleepWithCancel and ensure closed channel
This commit is contained in:
commit
19e025c2be
4 changed files with 17 additions and 7 deletions
|
|
@ -1369,6 +1369,9 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan
|
|||
}
|
||||
|
||||
func (r *basePoolManager) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
|
||||
if sleepTime == 0 {
|
||||
return false
|
||||
}
|
||||
ticker := time.NewTicker(sleepTime)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
|
|||
3
workers/cache/tool_cache.go
vendored
3
workers/cache/tool_cache.go
vendored
|
|
@ -135,6 +135,9 @@ func (t *toolsUpdater) Reset() {
|
|||
}
|
||||
|
||||
func (t *toolsUpdater) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
|
||||
if sleepTime == 0 {
|
||||
return false
|
||||
}
|
||||
ticker := time.NewTicker(sleepTime)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
|
|||
|
|
@ -634,6 +634,9 @@ func (w *Worker) loop() {
|
|||
}
|
||||
|
||||
func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
|
||||
if sleepTime == 0 {
|
||||
return false
|
||||
}
|
||||
ticker := time.NewTicker(sleepTime)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
@ -663,10 +666,7 @@ Loop:
|
|||
}
|
||||
continue
|
||||
}
|
||||
// noop if already started. If the scaleset was just enabled, we need to
|
||||
// start the listener here, or the <-w.listener.Wait() channel receive bellow
|
||||
// will block forever, even if we start the listener, as a nil channel will
|
||||
// block forever.
|
||||
// noop if already started.
|
||||
if err := w.listener.Start(); err != nil {
|
||||
slog.ErrorContext(w.ctx, "error starting listener", "error", err, "consumer_id", w.consumerID)
|
||||
if canceled := w.sleepWithCancel(2 * time.Second); canceled {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,10 @@ import (
|
|||
"github.com/cloudbase/garm/util/github/scalesets"
|
||||
)
|
||||
|
||||
var closed = make(chan struct{})
|
||||
|
||||
func init() { close(closed) }
|
||||
|
||||
func newListener(ctx context.Context, scaleSetHelper scaleSetHelper) *scaleSetListener {
|
||||
return &scaleSetListener{
|
||||
ctx: ctx,
|
||||
|
|
@ -278,11 +282,11 @@ func (l *scaleSetListener) loop() {
|
|||
|
||||
func (l *scaleSetListener) Wait() <-chan struct{} {
|
||||
l.mux.Lock()
|
||||
defer l.mux.Unlock()
|
||||
|
||||
if !l.running {
|
||||
slog.DebugContext(l.ctx, "scale set listener is not running")
|
||||
l.mux.Unlock()
|
||||
return nil
|
||||
return closed
|
||||
}
|
||||
l.mux.Unlock()
|
||||
return l.loopExited
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue