Retry failed runners

* retry adding runners for up to 5 times if they fail.
  * various fixes
This commit is contained in:
Gabriel Adrian Samfira 2022-05-10 12:28:39 +00:00
parent 0b70a30944
commit dc04bca95c
13 changed files with 307 additions and 98 deletions

View file

@ -38,6 +38,13 @@ var (
controllerLabelPrefix = "runner-controller-id:"
)
const (
// maxCreateAttempts is the number of times we will attempt to create an instance
// before we give up.
// TODO: make this configurable(?)
maxCreateAttempts = 5
)
type basePool struct {
ctx context.Context
controllerID string
@ -86,20 +93,16 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
updateParams := params.UpdateInstanceParams{
RunnerStatus: providerCommon.RunnerStatus(providerCommon.InstancePendingDelete),
}
_, err = r.store.UpdateInstance(r.ctx, instance.ID, updateParams)
if err != nil {
return errors.Wrap(err, "syncing local state with github")
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
return errors.Wrap(err, "updating runner")
}
}
}
return nil
}
func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) {
runnerName := job.WorkflowJob.RunnerName
func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) {
runner, err := r.store.GetInstanceByName(r.ctx, runnerName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
@ -109,7 +112,7 @@ func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance
}
func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error {
runner, err := r.fetchInstanceFromJob(job)
runner, err := r.fetchInstance(job.WorkflowJob.RunnerName)
if err != nil {
return errors.Wrap(err, "fetching instance")
}
@ -125,17 +128,25 @@ func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status provid
return nil
}
func (r *basePool) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error {
runner, err := r.fetchInstanceFromJob(job)
func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanceParams) error {
runner, err := r.fetchInstance(runnerName)
if err != nil {
return errors.Wrap(err, "fetching instance")
}
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, update); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error {
updateParams := params.UpdateInstanceParams{
Status: status,
Status: status,
ProviderFault: providerFault,
}
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil {
if err := r.updateInstance(runnerName, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
@ -186,32 +197,21 @@ func (r *basePool) AddRunner(ctx context.Context, poolID string) error {
name := fmt.Sprintf("garm-%s", uuid.New())
createParams := params.CreateInstanceParams{
Name: name,
Pool: poolID,
Status: providerCommon.InstancePendingCreate,
RunnerStatus: providerCommon.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.helper.GetCallbackURL(),
Name: name,
Pool: poolID,
Status: providerCommon.InstancePendingCreate,
RunnerStatus: providerCommon.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.helper.GetCallbackURL(),
CreateAttempt: 1,
}
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
_, err = r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateParams := params.UpdateInstanceParams{
OSName: instance.OSName,
OSVersion: instance.OSVersion,
Addresses: instance.Addresses,
Status: instance.Status,
ProviderID: instance.ProviderID,
}
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
@ -267,8 +267,6 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
log.Printf(">>> %v", pool.Tags)
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
@ -305,6 +303,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
if err != nil {
return errors.Wrap(err, "creating instance")
}
log.Printf("provider CreateInstance returned error: %v", err)
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
@ -330,6 +329,11 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
// If we already have an idle runner around, that runner will pick up the job
// and trigger an "in_progress" update from github (see bellow), which in turn will set the
// runner state of the instance to "active". The ensureMinIdleRunners() function will
// exclude that runner from available runners and attempt to ensure
// the needed number of runners.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance")
}
@ -342,7 +346,12 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
return nil
}
log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName)
if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil {
if err := r.setInstanceStatus(job.WorkflowJob.RunnerName, providerCommon.InstancePendingDelete, nil); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
// update instance workload state. Set job_id in instance state.
if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerTerminated); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
@ -366,12 +375,13 @@ func (r *basePool) controllerLabel() string {
func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSVersion,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
RunnerStatus: providerInstance.RunnerStatus,
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSVersion,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
RunnerStatus: providerInstance.RunnerStatus,
ProviderFault: providerInstance.ProviderFault,
}
}
@ -419,6 +429,90 @@ func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) {
}
}
func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) {
if !pool.Enabled {
log.Printf("pool %s is disabled, skipping", pool.ID)
return
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
log.Printf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
return
}
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
wg := sync.WaitGroup{}
for _, instance := range existingInstances {
if instance.Status == providerCommon.InstanceError {
if instance.CreateAttempt >= maxCreateAttempts {
log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt)
continue
}
wg.Add(1)
go func(instance params.Instance) {
defer wg.Done()
// cleanup potentially failed instance from provider. If we have a provider ID, we use that
// for cleanup. Otherwise, attempt to pass in the name of the instance to the provider, in an
// attempt to cleanup the failed machine.
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
if instance.ProviderID == "" && instance.Name == "" {
return
}
deleteIDValue := instance.ProviderID
if deleteIDValue == "" {
deleteIDValue = instance.Name
}
log.Printf("running provider cleanup for instance %s", deleteIDValue)
if err := provider.DeleteInstance(r.ctx, deleteIDValue); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
log.Printf("failed to cleanup instance: %s", instance.ProviderID)
return
}
}
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
// It's fairly safe to do here (for now), as there should be no other code path that updates
// an instance in this state.
updateParams := params.UpdateInstanceParams{
CreateAttempt: instance.CreateAttempt + 1,
Status: providerCommon.InstancePendingCreate,
}
log.Printf("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if err := r.updateInstance(instance.Name, updateParams); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
}(instance)
}
}
wg.Wait()
}
func (r *basePool) retryFailedInstances() {
pools, err := r.helper.ListPools()
if err != nil {
log.Printf("error listing pools: %s", err)
return
}
wg := sync.WaitGroup{}
wg.Add(len(pools))
for _, pool := range pools {
go func(pool params.Pool) {
defer wg.Done()
r.retryFailedInstancesForOnePool(pool)
}(pool)
}
wg.Wait()
}
func (r *basePool) ensureMinIdleRunners() {
pools, err := r.helper.ListPools()
if err != nil {
@ -521,7 +615,14 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil {
identifier := instance.ProviderID
if identifier == "" {
// provider did not return a provider ID?
// try with name
identifier = instance.Name
}
if err := provider.DeleteInstance(r.ctx, identifier); err != nil {
return errors.Wrap(err, "removing instance")
}
@ -547,18 +648,23 @@ func (r *basePool) deletePendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
wg := sync.WaitGroup{}
for _, instance := range instances {
log.Printf("instance status for %s is %s", instance.Name, instance.Status)
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance from provider: %+v", err)
}
wg.Add(1)
go func(instance params.Instance) {
defer wg.Done()
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance from provider: %+v", err)
}
}(instance)
}
wg.Wait()
}
func (r *basePool) addPendingInstances() {
@ -568,21 +674,32 @@ func (r *basePool) addPendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
wg := sync.WaitGroup{}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
// asJs, _ := json.MarshalIndent(instance, "", " ")
// log.Printf(">>> %s", string(asJs))
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to create instance in provider: %s", err)
}
wg.Add(1)
go func(instance params.Instance) {
defer wg.Done()
log.Printf("creating instance %s in provider", instance.Name)
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to add instance to provider: %s", err)
errAsBytes := []byte(err.Error())
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
log.Printf("failed to create instance in provider: %s", err)
}
}(instance)
}
wg.Wait()
}
func (r *basePool) consolidate() {
// TODO(gabriel-samfira): replace this with something more efficient.
r.mux.Lock()
defer r.mux.Unlock()
@ -597,7 +714,18 @@ func (r *basePool) consolidate() {
r.addPendingInstances()
}()
wg.Wait()
r.ensureMinIdleRunners()
wg.Add(2)
go func() {
defer wg.Done()
r.ensureMinIdleRunners()
}()
go func() {
defer wg.Done()
r.retryFailedInstances()
}()
wg.Wait()
}
func (r *basePool) Wait() error {