Remove some blocking code

* added 2 new statuses: creating and deleting
  * removed the wait on create/delete, speeding things up a bit (a rough sketch of the pattern follows the commit metadata below)
Gabriel Adrian Samfira 2022-05-13 23:34:16 +00:00
parent e7eb13acc9
commit 98eb594cd6
4 changed files with 86 additions and 63 deletions
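At a high level, the commit replaces "spawn a goroutine and wait on it" with "mark the instance with an intermediate status, then fire the goroutine and move on"; the next reconciliation pass skips anything already marked. Below is a minimal sketch of that pattern, with all names illustrative rather than taken from the repo:

```go
package pool

import "log"

// Instance is a stand-in for params.Instance, used only for this sketch.
type Instance struct {
	Name   string
	Status string
}

// processPending illustrates the non-blocking pattern this commit adopts:
// flip the instance to an intermediate status ("creating"/"deleting") so the
// next pass ignores it, then run the slow provider call in a goroutine that
// nothing waits on anymore.
func processPending(pending []Instance, mark func(name string) error, slowProviderOp func(Instance) error) {
	for _, instance := range pending {
		if err := mark(instance.Name); err != nil {
			log.Printf("failed to update runner %s status: %s", instance.Name, err)
			continue
		}
		go func(instance Instance) {
			if err := slowProviderOp(instance); err != nil {
				log.Printf("provider operation failed for %s: %s", instance.Name, err)
			}
		}(instance)
	}
}
```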


@@ -65,6 +65,8 @@ type Store interface {
ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error)
ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error)
PoolInstanceCount(ctx context.Context, poolID string) (int64, error)
// Probably a bad idea without some kind of filter or at least pagination
// TODO: add filter/pagination
ListAllInstances(ctx context.Context) ([]params.Instance, error)
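The TODO above is not addressed in this commit; here is a hedged sketch of what a filtered/paginated listing could look like. The option struct and method names are assumptions, not existing API:

```go
package store

import "context"

// Instance stands in for params.Instance in this sketch.
type Instance struct {
	ID     string
	Name   string
	PoolID string
}

// ListInstancesOptions is a hypothetical filter/pagination argument; none of
// these names exist in the codebase as of this commit.
type ListInstancesOptions struct {
	PoolID   string // optional: restrict results to a single pool
	Page     int    // zero-based page index
	PageSize int    // maximum number of results per page
}

// PagedInstanceStore sketches one way the Store interface could grow to
// satisfy the TODO without loading every instance at once.
type PagedInstanceStore interface {
	ListInstancesPaged(ctx context.Context, opts ListInstancesOptions) ([]Instance, error)
}
```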


@@ -230,3 +230,17 @@ func (s *sqlDatabase) ListAllInstances(ctx context.Context) ([]params.Instance,
}
return ret, nil
}
func (s *sqlDatabase) PoolInstanceCount(ctx context.Context, poolID string) (int64, error) {
pool, err := s.getPoolByID(ctx, poolID)
if err != nil {
return 0, errors.Wrap(err, "fetching pool")
}
var cnt int64
q := s.conn.Model(&Instance{}).Where("pool_id = ?", pool.ID).Count(&cnt)
if q.Error != nil {
return 0, errors.Wrap(q.Error, "fetching instance count")
}
return cnt, nil
}


@@ -170,13 +170,12 @@ func (r *basePool) acquireNewInstance(job params.WorkflowJob) error {
return nil
}
// TODO: implement count
poolInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
poolInstances, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
if len(poolInstances) >= int(pool.MaxRunners) {
if poolInstances >= int64(pool.MaxRunners) {
log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
return nil
}
@@ -299,11 +298,31 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
PoolID: instance.PoolID,
}
var instanceIDToDelete string
defer func() {
if instanceIDToDelete != "" {
if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
log.Printf("failed to cleanup instance: %s", instanceIDToDelete)
}
}
}
}()
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
instanceIDToDelete = instance.Name
return errors.Wrap(err, "creating instance")
}
if providerInstance.Status == providerCommon.InstanceError {
instanceIDToDelete = instance.ProviderID
if instanceIDToDelete == "" {
instanceIDToDelete = instance.Name
}
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
@@ -434,65 +453,43 @@ func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) {
return
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
log.Printf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
return
}
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
wg := sync.WaitGroup{}
for _, instance := range existingInstances {
if instance.Status == providerCommon.InstanceError {
if instance.CreateAttempt >= maxCreateAttempts {
log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt)
continue
}
wg.Add(1)
go func(instance params.Instance) {
defer wg.Done()
// cleanup potentially failed instance from provider. If we have a provider ID, we use that
// for cleanup. Otherwise, attempt to pass in the name of the instance to the provider, in an
// attempt to cleanup the failed machine.
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
if instance.ProviderID == "" && instance.Name == "" {
return
}
deleteIDValue := instance.ProviderID
if deleteIDValue == "" {
deleteIDValue = instance.Name
}
log.Printf("running provider cleanup for instance %s", deleteIDValue)
if err := provider.DeleteInstance(r.ctx, deleteIDValue); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
log.Printf("failed to cleanup instance: %s", instance.ProviderID)
return
}
}
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
// It's fairly safe to do here (for now), as there should be no other code path that updates
// an instance in this state.
updateParams := params.UpdateInstanceParams{
CreateAttempt: instance.CreateAttempt + 1,
Status: providerCommon.InstancePendingCreate,
}
log.Printf("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if err := r.updateInstance(instance.Name, updateParams); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
}(instance)
if instance.Status != providerCommon.InstanceError {
continue
}
if instance.CreateAttempt >= maxCreateAttempts {
log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt)
continue
}
// cleanup the failed instance from provider. If we have a provider ID, we use that
// for cleanup. Otherwise, attempt to pass in the name of the instance to the provider, in an
// attempt to cleanup the failed machine.
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
if instance.ProviderID == "" && instance.Name == "" {
return
}
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
// It's fairly safe to do here (for now), as there should be no other code path that updates
// an instance in this state.
updateParams := params.UpdateInstanceParams{
CreateAttempt: instance.CreateAttempt + 1,
Status: providerCommon.InstancePendingCreate,
}
log.Printf("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if err := r.updateInstance(instance.Name, updateParams); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
}
wg.Wait()
}
func (r *basePool) retryFailedInstances() {
@@ -647,23 +644,29 @@ func (r *basePool) deletePendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
wg := sync.WaitGroup{}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
wg.Add(1)
// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
go func(instance params.Instance) {
defer wg.Done()
if err := r.deleteInstanceFromProvider(instance); err != nil {
// failed to remove from provider. Set the status back to pending_delete, which
// will retry the operation.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
log.Printf("failed to delete instance from provider: %+v", err)
}
}(instance)
}
wg.Wait()
}
func (r *basePool) addPendingInstances() {
@@ -673,16 +676,18 @@ func (r *basePool) addPendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
wg := sync.WaitGroup{}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
wg.Add(1)
// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
// won't attempt to create the runner a second time.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
go func(instance params.Instance) {
defer wg.Done()
log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to add instance to provider: %s", err)
@@ -694,7 +699,6 @@ func (r *basePool) addPendingInstances() {
}
}(instance)
}
wg.Wait()
}
func (r *basePool) consolidate() {

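The remaining hunk adds the two intermediate statuses. Below is a rough sketch of the lifecycle they produce, with transitions inferred from the pool manager hunks above; the map is illustrative only and not code from the repo:

```go
package common

// InstanceStatus mirrors the provider-common status type for this sketch only.
type InstanceStatus string

const (
	InstancePendingCreate InstanceStatus = "pending_create"
	InstanceCreating      InstanceStatus = "creating"
	InstanceRunning       InstanceStatus = "running"
	InstanceError         InstanceStatus = "error"
	InstancePendingDelete InstanceStatus = "pending_delete"
	InstanceDeleting      InstanceStatus = "deleting"
)

// nextStatuses sketches the transitions implied by this commit:
// pending_create -> creating (set before the create goroutine starts),
// creating -> running or error (from the provider result),
// error -> pending_create (retry path),
// pending_delete -> deleting (set before the delete goroutine starts),
// deleting -> pending_delete (if the provider call fails and must be retried).
var nextStatuses = map[InstanceStatus][]InstanceStatus{
	InstancePendingCreate: {InstanceCreating},
	InstanceCreating:      {InstanceRunning, InstanceError},
	InstanceError:         {InstancePendingCreate},
	InstancePendingDelete: {InstanceDeleting},
	InstanceDeleting:      {InstancePendingDelete},
}
```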

@@ -22,7 +22,9 @@ const (
InstanceStopped InstanceStatus = "stopped"
InstanceError InstanceStatus = "error"
InstancePendingDelete InstanceStatus = "pending_delete"
InstanceDeleting InstanceStatus = "deleting"
InstancePendingCreate InstanceStatus = "pending_create"
InstanceCreating InstanceStatus = "creating"
InstanceStatusUnknown InstanceStatus = "unknown"
RunnerIdle RunnerStatus = "idle"
@@ -36,7 +38,8 @@ const (
func IsValidStatus(status InstanceStatus) bool {
switch status {
case InstanceRunning, InstanceError, InstancePendingCreate,
InstancePendingDelete, InstanceStatusUnknown, InstanceStopped:
InstancePendingDelete, InstanceStatusUnknown, InstanceStopped,
InstanceCreating, InstanceDeleting:
return true
default: