Add a backoff mechanism when deleting runners
This change adds a backoff mechanism when deleting GitHub runners. If the delete operation fails, we record the event and retry the deletion with a geometrically increasing delay (ratio 1.5), starting from 5 seconds — the pool consolidation interval — and capped at 20 minutes. Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
e0e60d42c8
commit
edbaf47970
2 changed files with 98 additions and 12 deletions
|
|
@ -1,6 +1,15 @@
|
|||
package pool
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudbase/garm/runner/common"
|
||||
)
|
||||
|
||||
const (
|
||||
maxBackoffSeconds float64 = 1200 // 20 minutes
|
||||
)
|
||||
|
||||
type keyMutex struct {
|
||||
muxes sync.Map
|
||||
|
|
@ -27,3 +36,53 @@ func (k *keyMutex) Unlock(key string, remove bool) {
|
|||
// Delete removes the mutex associated with key from the map, if one exists.
// It is a no-op for unknown keys.
func (k *keyMutex) Delete(key string) {
	k.muxes.Delete(key)
}
|
||||
|
||||
// instanceBackOff holds the delete-backoff state for a single runner instance.
type instanceBackOff struct {
	// backoffSeconds is the current backoff interval. Zero means no backoff
	// is in effect. It grows geometrically (ratio 1.5) in RecordFailure and
	// is capped at maxBackoffSeconds.
	backoffSeconds float64
	// lastRecordedFailureTime is the UTC timestamp of the most recent
	// recorded delete failure; the zero value means no failure recorded.
	lastRecordedFailureTime time.Time
	// mux guards the two fields above.
	mux sync.Mutex
}
|
||||
|
||||
// instanceDeleteBackoff tracks per-instance delete backoff state, keyed by
// instance name.
type instanceDeleteBackoff struct {
	// muxes maps instance name -> *instanceBackOff.
	// NOTE(review): despite the name, the values stored here are backoff
	// records, not mutexes.
	muxes sync.Map
}
|
||||
|
||||
func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
|
||||
backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
if !loaded {
|
||||
return true, time.Time{}
|
||||
}
|
||||
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
|
||||
return true, time.Time{}
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
|
||||
return deadline.After(now), deadline
|
||||
}
|
||||
|
||||
// Delete clears any recorded backoff state for key. The caller invokes this
// once an instance has been successfully removed, so a future instance with
// the same name starts with no backoff.
func (i *instanceDeleteBackoff) Delete(key string) {
	i.muxes.Delete(key)
}
|
||||
|
||||
func (i *instanceDeleteBackoff) RecordFailure(key string) {
|
||||
backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
|
||||
ib := backoff.(*instanceBackOff)
|
||||
ib.mux.Lock()
|
||||
defer ib.mux.Unlock()
|
||||
|
||||
ib.lastRecordedFailureTime = time.Now().UTC()
|
||||
if ib.backoffSeconds == 0 {
|
||||
ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
|
||||
} else {
|
||||
// Geometric progression of 1.5
|
||||
newBackoff := ib.backoffSeconds * 1.5
|
||||
// Cap the backoff to 20 minutes
|
||||
ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,9 +16,11 @@ package pool
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"math/big"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -90,6 +92,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
|
|||
|
||||
wg := &sync.WaitGroup{}
|
||||
keyMuxes := &keyMutex{}
|
||||
backoff := &instanceDeleteBackoff{}
|
||||
|
||||
repo := &basePoolManager{
|
||||
ctx: ctx,
|
||||
|
|
@ -103,6 +106,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
|
|||
quit: make(chan struct{}),
|
||||
wg: wg,
|
||||
keyMux: keyMuxes,
|
||||
backoff: backoff,
|
||||
consumer: consumer,
|
||||
}
|
||||
return repo, nil
|
||||
|
|
@ -125,9 +129,10 @@ type basePoolManager struct {
|
|||
managerIsRunning bool
|
||||
managerErrorReason string
|
||||
|
||||
mux sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
keyMux *keyMutex
|
||||
mux sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
keyMux *keyMutex
|
||||
backoff *instanceDeleteBackoff
|
||||
}
|
||||
|
||||
func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
|
||||
|
|
@ -1391,21 +1396,35 @@ func (r *basePoolManager) deletePendingInstances() error {
|
|||
continue
|
||||
}
|
||||
|
||||
currentStatus := instance.Status
|
||||
// Set the status to deleting before launching the goroutine that removes
|
||||
// the runner from the provider (which can take a long time).
|
||||
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", instance.Name)
|
||||
shouldProcess, deadline := r.backoff.ShouldProcess(instance.Name)
|
||||
if !shouldProcess {
|
||||
slog.DebugContext(
|
||||
r.ctx, "backoff in effect for instance",
|
||||
"runner_name", instance.Name, "deadline", deadline)
|
||||
r.keyMux.Unlock(instance.Name, false)
|
||||
continue
|
||||
}
|
||||
|
||||
go func(instance params.Instance) (err error) {
|
||||
// Prevent Thundering Herd. Should alleviate some of the database
|
||||
// is locked errors in sqlite3.
|
||||
num, err := rand.Int(rand.Reader, big.NewInt(2000))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate random number: %w", err)
|
||||
}
|
||||
jitter := time.Duration(num.Int64()) * time.Millisecond
|
||||
time.Sleep(jitter)
|
||||
|
||||
currentStatus := instance.Status
|
||||
deleteMux := false
|
||||
defer func() {
|
||||
r.keyMux.Unlock(instance.Name, deleteMux)
|
||||
if deleteMux {
|
||||
// deleteMux is set only when the instance was successfully removed.
|
||||
// We can use it as a marker to signal that the backoff is no longer
|
||||
// needed.
|
||||
r.backoff.Delete(instance.Name)
|
||||
}
|
||||
}()
|
||||
defer func(instance params.Instance) {
|
||||
if err != nil {
|
||||
|
|
@ -1414,14 +1433,22 @@ func (r *basePoolManager) deletePendingInstances() error {
|
|||
"runner_name", instance.Name)
|
||||
// failed to remove from provider. Set status to previous value, which will retry
|
||||
// the operation.
|
||||
if _, err := r.setInstanceStatus(instance.Name, currentStatus, nil); err != nil {
|
||||
if _, err := r.setInstanceStatus(instance.Name, currentStatus, []byte(err.Error())); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", instance.Name)
|
||||
}
|
||||
r.backoff.RecordFailure(instance.Name)
|
||||
}
|
||||
}(instance)
|
||||
|
||||
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
|
||||
slog.With(slog.Any("error", err)).ErrorContext(
|
||||
r.ctx, "failed to update runner status",
|
||||
"runner_name", instance.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
slog.DebugContext(
|
||||
r.ctx, "removing instance from provider",
|
||||
"runner_name", instance.Name)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue