diff --git a/database/common/common.go b/database/common/common.go index f3bc5b32..57d3c897 100644 --- a/database/common/common.go +++ b/database/common/common.go @@ -65,6 +65,9 @@ type Store interface { ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) + // PoolInstanceCount returns the number of instances that belong to the pool with the given ID. + PoolInstanceCount(ctx context.Context, poolID string) (int64, error) + // Probably a bad idea without some king of filter or at least pagination // TODO: add filter/pagination ListAllInstances(ctx context.Context) ([]params.Instance, error) diff --git a/database/sql/instances.go b/database/sql/instances.go index b57e835f..055095d0 100644 --- a/database/sql/instances.go +++ b/database/sql/instances.go @@ -230,3 +230,19 @@ func (s *sqlDatabase) ListAllInstances(ctx context.Context) ([]params.Instance, } return ret, nil } + +// PoolInstanceCount returns the number of instances stored for the pool identified by poolID. +// An error is returned if the pool cannot be fetched or if the count query fails. +func (s *sqlDatabase) PoolInstanceCount(ctx context.Context, poolID string) (int64, error) { + pool, err := s.getPoolByID(ctx, poolID) + if err != nil { + return 0, errors.Wrap(err, "fetching pool") + } + + var cnt int64 + q := s.conn.Model(&Instance{}).Where("pool_id = ?", pool.ID).Count(&cnt) + if q.Error != nil { + return 0, errors.Wrap(q.Error, "fetching instance count") + } + return cnt, nil +} diff --git a/runner/pool/common.go b/runner/pool/common.go index f6845dda..6092c5d7 100644 --- a/runner/pool/common.go +++ b/runner/pool/common.go @@ -170,13 +170,12 @@ func (r *basePool) acquireNewInstance(job params.WorkflowJob) error { return nil } - // TODO: implement count - poolInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID) + poolInstances, err := r.store.PoolInstanceCount(r.ctx, pool.ID) if err != nil { return errors.Wrap(err, "fetching instances") } - if len(poolInstances) >= int(pool.MaxRunners) { + if poolInstances >= int64(pool.MaxRunners) { log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID) return nil } @@ -299,11 +298,31 @@ func 
(r *basePool) addInstanceToProvider(instance params.Instance) error { PoolID: instance.PoolID, } + var instanceIDToDelete string + + defer func() { + if instanceIDToDelete != "" { + if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + log.Printf("failed to cleanup instance: %s", instanceIDToDelete) + } + } + } + }() + providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs) if err != nil { + instanceIDToDelete = instance.Name return errors.Wrap(err, "creating instance") } + if providerInstance.Status == providerCommon.InstanceError { + instanceIDToDelete = providerInstance.ProviderID // ID reported by the provider for the instance it just created + if instanceIDToDelete == "" { + instanceIDToDelete = instance.Name + } + } + updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance) if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil { return errors.Wrap(err, "updating instance") @@ -434,65 +453,43 @@ func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) { return } - provider, ok := r.providers[pool.ProviderName] - if !ok { - log.Printf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) - return - } - existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID) if err != nil { log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err) return } - wg := sync.WaitGroup{} - for _, instance := range existingInstances { - if instance.Status == providerCommon.InstanceError { - if instance.CreateAttempt >= maxCreateAttempts { - log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt) - continue - } - wg.Add(1) - go func(instance params.Instance) { - defer wg.Done() - // cleanup potentially failed instance from provider. If we have a provider ID, we use that - // for cleanup. Otherwise, attempt to pass in the name of the instance to the provider, in an - // attempt to cleanup the failed machine. 
- // NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances - // this has the potential to create many API requests to the target provider. - // TODO(gabriel-samfira): implement request throttling. - if instance.ProviderID == "" && instance.Name == "" { - return - } - deleteIDValue := instance.ProviderID - if deleteIDValue == "" { - deleteIDValue = instance.Name - } - log.Printf("running provider cleanup for instance %s", deleteIDValue) - if err := provider.DeleteInstance(r.ctx, deleteIDValue); err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - log.Printf("failed to cleanup instance: %s", instance.ProviderID) - return - } - } - // TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction. - // It's fairly safe to do here (for now), as there should be no other code path that updates - // an instance in this state. - updateParams := params.UpdateInstanceParams{ - CreateAttempt: instance.CreateAttempt + 1, - Status: providerCommon.InstancePendingCreate, - } - log.Printf("queueing previously failed instance %s for retry", instance.Name) - // Set instance to pending create and wait for retry. - if err := r.updateInstance(instance.Name, updateParams); err != nil { - log.Printf("failed to update runner %s status", instance.Name) - } - }(instance) + if instance.Status != providerCommon.InstanceError { + continue + } + if instance.CreateAttempt >= maxCreateAttempts { + log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt) + continue + } + + // Provider-side cleanup of the failed instance is not done here. It is handled by the + // deferred DeleteInstance() call in addInstanceToProvider(), which uses the provider ID + // when available and falls back to the instance name. + // An instance that has neither a provider ID nor a name cannot be identified, so it is + // skipped. Use continue rather than return, so that the remaining failed instances in 
+ // this pool are still queued for retry. + if instance.ProviderID == "" && instance.Name == "" { + continue + } + // TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction. + // It's fairly safe to do here (for now), as there should be no other code path that updates + // an instance in this state. + updateParams := params.UpdateInstanceParams{ + CreateAttempt: instance.CreateAttempt + 1, + Status: providerCommon.InstancePendingCreate, + } + log.Printf("queueing previously failed instance %s for retry", instance.Name) + // Set instance to pending create and wait for retry. + if err := r.updateInstance(instance.Name, updateParams); err != nil { + log.Printf("failed to update runner %s status", instance.Name) } } - wg.Wait() } func (r *basePool) retryFailedInstances() { @@ -647,23 +644,31 @@ func (r *basePool) deletePendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } - wg := sync.WaitGroup{} + for _, instance := range instances { if instance.Status != providerCommon.InstancePendingDelete { // not in pending_delete status. Skip. continue } - wg.Add(1) + // Set the status to deleting before launching the goroutine that removes + // the runner from the provider (which can take a long time). + if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + // Skip the provider call: the instance was not marked as deleting, and a later run that still sees it in pending_delete can retry. + continue + } go func(instance params.Instance) { - defer wg.Done() if err := r.deleteInstanceFromProvider(instance); err != nil { + // failed to remove from provider. Set the status back to pending_delete, which + // will retry the operation. 
+ if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + } log.Printf("failed to delete instance from provider: %+v", err) } }(instance) } - - wg.Wait() } func (r *basePool) addPendingInstances() { @@ -673,16 +676,20 @@ func (r *basePool) addPendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } - wg := sync.WaitGroup{} for _, instance := range instances { if instance.Status != providerCommon.InstancePendingCreate { // not in pending_create status. Skip. continue } - wg.Add(1) + // Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances() + // won't attempt to create the runner a second time. + if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + // Skip creation: without the "creating" status a later run would see pending_create again and create a duplicate runner. + continue + } go func(instance params.Instance) { - defer wg.Done() log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID) if err := r.addInstanceToProvider(instance); err != nil { log.Printf("failed to add instance to provider: %s", err) @@ -694,7 +701,6 @@ func (r *basePool) addPendingInstances() { } }(instance) } - wg.Wait() } func (r *basePool) consolidate() { diff --git a/runner/providers/common/common.go b/runner/providers/common/common.go index 78ac14c3..41b50027 100644 --- a/runner/providers/common/common.go +++ b/runner/providers/common/common.go @@ -22,7 +22,9 @@ const ( InstanceStopped InstanceStatus = "stopped" InstanceError InstanceStatus = "error" InstancePendingDelete InstanceStatus = "pending_delete" + InstanceDeleting InstanceStatus = "deleting" InstancePendingCreate InstanceStatus = "pending_create" + InstanceCreating InstanceStatus = "creating" InstanceStatusUnknown InstanceStatus = "unknown" RunnerIdle RunnerStatus = "idle" @@ -36,7 +38,8 @@ const ( func IsValidStatus(status 
InstanceStatus) bool { switch status { case InstanceRunning, InstanceError, InstancePendingCreate, - InstancePendingDelete, InstanceStatusUnknown, InstanceStopped: + InstancePendingDelete, InstanceStatusUnknown, InstanceStopped, + InstanceCreating, InstanceDeleting: return true default: