From dc04bca95c9d7d2ddf83b09155026ff6b5f21e0d Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 10 May 2022 12:28:39 +0000 Subject: [PATCH] Retry failed runners * retry adding runners for up to 5 times if they fail. * various fixes --- apiserver/controllers/controllers.go | 2 - cmd/garm-cli/cmd/runner.go | 6 +- contrib/providers.d/openstack/README.md | 2 +- .../openstack/garm-external-provider | 45 +++- database/sql/instances.go | 5 + database/sql/models.go | 22 +- database/sql/util.go | 5 + params/params.go | 12 +- params/requests.go | 19 +- runner/pool/common.go | 236 ++++++++++++++---- runner/providers/common/common.go | 13 + runner/providers/external/external.go | 29 +++ runner/providers/lxd/lxd.go | 9 - 13 files changed, 307 insertions(+), 98 deletions(-) diff --git a/apiserver/controllers/controllers.go b/apiserver/controllers/controllers.go index 38027f9c..94375a47 100644 --- a/apiserver/controllers/controllers.go +++ b/apiserver/controllers/controllers.go @@ -96,8 +96,6 @@ func (a *APIController) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Re signature := r.Header.Get("X-Hub-Signature-256") hookType := r.Header.Get("X-Github-Hook-Installation-Target-Type") - // fmt.Printf(">>> Signature: %s\n", signature) - // fmt.Printf(">>> HookType: %s\n", hookType) if err := a.r.DispatchWorkflowJob(hookType, signature, body); err != nil { log.Printf("failed to dispatch work: %s", err) diff --git a/cmd/garm-cli/cmd/runner.go b/cmd/garm-cli/cmd/runner.go index 23478d27..ba5a8a01 100644 --- a/cmd/garm-cli/cmd/runner.go +++ b/cmd/garm-cli/cmd/runner.go @@ -182,6 +182,10 @@ func formatSingleInstance(instance params.Instance) { } } + if len(instance.ProviderFault) > 0 { + t.AppendRow(table.Row{"Provider Fault", string(instance.ProviderFault)}, table.RowConfig{AutoMerge: true}) + } + if len(instance.StatusMessages) > 0 { for _, msg := range instance.StatusMessages { t.AppendRow(table.Row{"Status Updates", fmt.Sprintf("%s: %s", 
msg.CreatedAt.Format("2006-01-02T15:04:05"), msg.Message)}, table.RowConfig{AutoMerge: true}) @@ -190,7 +194,7 @@ func formatSingleInstance(instance params.Instance) { t.SetColumnConfigs([]table.ColumnConfig{ {Number: 1, AutoMerge: true}, - {Number: 2, AutoMerge: false}, + {Number: 2, AutoMerge: false, WidthMax: 100}, }) fmt.Println(t.Render()) } diff --git a/contrib/providers.d/openstack/README.md b/contrib/providers.d/openstack/README.md index 6aa88f35..4642e6d9 100644 --- a/contrib/providers.d/openstack/README.md +++ b/contrib/providers.d/openstack/README.md @@ -1,4 +1,4 @@ -# OpenStack external provider for GARM +# OpenStack external provider for garm This is an example external provider, written for OpenStack. It is a simple bash script that implements the external provider interface, in order to supply ```garm``` with compute instances. This is just an example, complete with a sample config file. diff --git a/contrib/providers.d/openstack/garm-external-provider b/contrib/providers.d/openstack/garm-external-provider index 3552d317..1709e026 100755 --- a/contrib/providers.d/openstack/garm-external-provider +++ b/contrib/providers.d/openstack/garm-external-provider @@ -46,6 +46,13 @@ GARM_TO_GH_ARCH_MAP["amd64"]="x64" GARM_TO_GH_ARCH_MAP["arm"]="arm" GARM_TO_GH_ARCH_MAP["arm64"]="arm64" +declare -A STATUS_MAP +STATUS_MAP["ACTIVE"]="running" +STATUS_MAP["SHUTOFF"]="stopped" +STATUS_MAP["BUILD"]="pending_create" +STATUS_MAP["ERROR"]="error" +STATUS_MAP["DELETING"]="pending_delete" + function checkValNotNull() { if [ -z "$1" -o "$1" == "null" ];then echo "failed to fetch value $2" @@ -92,8 +99,9 @@ function waitForVolume() { set +e status=$(openstack volume show "${volumeName}" -f json | jq -r -c '.status') - if [ $? -ne 0 ];then + CODE=$? + if [ $CODE -ne 0 ];then set -e - return $? 
+ return $CODE fi set -e while [ "${status}" != "available" -a "${status}" != "error" ];do @@ -238,7 +246,7 @@ function waitForServer() { status=$(echo "${srv_info}" | jq -r -c '.status') while [ "${status}" != "ERROR" -a "${status}" != "ACTIVE" ];do - sleep 2 + sleep 0.5 srv_info=$(openstack server show -f json "${srv_id}") [ $? -ne 0 ] && return $? status=$(echo "${srv_info}" | jq -r -c '.status') @@ -283,26 +291,35 @@ function CreateInstance() { fi set +e - SRV_DETAILS=$(openstack server create ${SOURCE_ARGS} --flavor "${FLAVOR}" --user-data="${CC_FILE}" --network="${NET}" "${INSTANCE_NAME}") + + TAGS="--tag garm-controller-id=${GARM_CONTROLLER_ID} --tag garm-pool-id=${GARM_POOL_ID}" + SRV_DETAILS=$(openstack server create --os-compute-api-version 2.52 ${SOURCE_ARGS} ${TAGS} --flavor "${FLAVOR}" --user-data="${CC_FILE}" --network="${NET}" "${INSTANCE_NAME}") if [ $? -ne 0 ];then openstack volume delete "${INSTANCE_NAME}" || true exit 1 fi SRV_DETAILS=$(waitForServer "${INSTANCE_NAME}") - if [ $? -ne 0 ];then - CODE="$?" + CODE=$? + if [ $CODE -ne 0 ];then # cleanup rm -f "${CC_FILE}" || true openstack server delete "${INSTANCE_NAME}" || true openstack volume delete "${INSTANCE_NAME}" || true set -e - return $CODE + FAULT=$(echo "${SRV_DETAILS}"| jq -rc '.fault') + echo "Failed to create server: ${FAULT}" + exit $CODE fi set -e rm -f "${CC_FILE}" || true SRV_ID=$(echo "${SRV_DETAILS}" | jq -r -c '.id') STATUS=$(echo "${SRV_DETAILS}" | jq -r -c '.status') + FAULT=$(echo "${SRV_DETAILS}" | jq -r -c '.fault') + FAULT_VAL="" + if [ ! 
-z "${FAULT}" -a "${FAULT}" != "null" ];then + FAULT_VAL=$(echo "${FAULT}" | base64 -w0) + fi jq -rnc \ --arg PROVIDER_ID ${SRV_ID} \ @@ -311,8 +328,10 @@ function CreateInstance() { --arg OS_NAME "${DISTRO}" \ --arg OS_VERSION "${VERSION}" \ --arg ARCH "${ARCH}" \ - --arg STATUS "${STATUS}" \ - '{"id": "", "provider_id": $PROVIDER_ID, "name": $NAME, "os_type": $OS_TYPE, "os_name": $OS_NAME, "os_version": $OS_VERSION, "os_arch": $ARCH, "status": $STATUS, "runner_status": "", "pool_id": ""}' + --arg STATUS "${STATUS_MAP[${STATUS}]}" \ + --arg POOL_ID "${GARM_POOL_ID}" \ + --arg FAULT "${FAULT_VAL}" \ + '{"provider_id": $PROVIDER_ID, "name": $NAME, "os_type": $OS_TYPE, "os_name": $OS_NAME, "os_version": $OS_VERSION, "os_arch": $ARCH, "status": $STATUS, "pool_id": $POOL_ID, "provider_fault": $FAULT}' } function DeleteInstance() { @@ -322,7 +341,17 @@ function DeleteInstance() { return 1 fi - instance_info=$(openstack server show "${instance_id}" -f json) + set +e + instance_info=$(openstack server show "${instance_id}" -f json 2>&1) + CODE=$? + if [ $CODE -ne 0 ];then 
+ set -e + if [[ "${instance_info}" == "No server with a name or ID of"* ]];then + return 0 + fi + return $CODE + fi + set -e VOLUMES=$(echo "${instance_info}" | jq -r -c '.volumes_attached[] | .id') openstack server delete "${instance_id}" diff --git a/database/sql/instances.go b/database/sql/instances.go index 727b447d..511c33fa 100644 --- a/database/sql/instances.go +++ b/database/sql/instances.go @@ -175,6 +175,11 @@ func (s *sqlDatabase) UpdateInstance(ctx context.Context, instanceID string, par if string(param.Status) != "" { instance.Status = param.Status } + if param.CreateAttempt != 0 { + instance.CreateAttempt = param.CreateAttempt + } + + instance.ProviderFault = param.ProviderFault q := s.conn.Save(&instance) if q.Error != nil { diff --git a/database/sql/models.go b/database/sql/models.go index 66dbecd8..a68e4069 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -114,16 +114,18 @@ type InstanceStatusUpdate struct { type Instance struct { Base - ProviderID *string `gorm:"uniqueIndex"` - Name string `gorm:"uniqueIndex"` - OSType config.OSType - OSArch config.OSArch - OSName string - OSVersion string - Addresses []Address `gorm:"foreignKey:InstanceID"` - Status common.InstanceStatus - RunnerStatus common.RunnerStatus - CallbackURL string + ProviderID *string `gorm:"uniqueIndex"` + Name string `gorm:"uniqueIndex"` + OSType config.OSType + OSArch config.OSArch + OSName string + OSVersion string + Addresses []Address `gorm:"foreignKey:InstanceID"` + Status common.InstanceStatus + RunnerStatus common.RunnerStatus + CallbackURL string + ProviderFault []byte `gorm:"type:longblob"` + CreateAttempt int PoolID uuid.UUID Pool Pool `gorm:"foreignKey:PoolID"` diff --git a/database/sql/util.go b/database/sql/util.go index b9eb3339..59ffd8c6 100644 --- a/database/sql/util.go +++ b/database/sql/util.go @@ -41,6 +41,11 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance { PoolID: instance.PoolID.String(), CallbackURL: 
instance.CallbackURL, StatusMessages: []params.StatusMessage{}, + CreateAttempt: instance.CreateAttempt, + } + + if len(instance.ProviderFault) > 0 { + ret.ProviderFault = instance.ProviderFault } for _, addr := range instance.Addresses { diff --git a/params/params.go b/params/params.go index 37b81db6..226b220a 100644 --- a/params/params.go +++ b/params/params.go @@ -42,7 +42,7 @@ type StatusMessage struct { type Instance struct { // ID is the database ID of this instance. - ID string `json:"id"` + ID string `json:"id,omitempty"` // PeoviderID is the unique ID the provider associated // with the compute instance. We use this to identify the // instance in the provider. @@ -65,14 +65,16 @@ type Instance struct { // for this instance. Addresses []Address `json:"addresses,omitempty"` // Status is the status of the instance inside the provider (eg: running, stopped, etc) - Status common.InstanceStatus `json:"status"` - RunnerStatus common.RunnerStatus `json:"runner_status"` - PoolID string `json:"pool_id"` + Status common.InstanceStatus `json:"status,omitempty"` + RunnerStatus common.RunnerStatus `json:"runner_status,omitempty"` + PoolID string `json:"pool_id,omitempty"` + ProviderFault []byte `json:"provider_fault,omitempty"` StatusMessages []StatusMessage `json:"status_messages,omitempty"` // Do not serialize sensitive info. 
- CallbackURL string `json:"-"` + CallbackURL string `json:"-"` + CreateAttempt int `json:"-"` } type BootstrapInstance struct { diff --git a/params/requests.go b/params/requests.go index 9cd43d6d..14be3751 100644 --- a/params/requests.go +++ b/params/requests.go @@ -89,12 +89,13 @@ type UpdatePoolParams struct { } type CreateInstanceParams struct { - Name string - OSType config.OSType - OSArch config.OSArch - Status common.InstanceStatus - RunnerStatus common.RunnerStatus - CallbackURL string + Name string + OSType config.OSType + OSArch config.OSArch + Status common.InstanceStatus + RunnerStatus common.RunnerStatus + CallbackURL string + CreateAttempt int `json:"-"` Pool string } @@ -149,8 +150,10 @@ type UpdateInstanceParams struct { // for this instance. Addresses []Address `json:"addresses,omitempty"` // Status is the status of the instance inside the provider (eg: running, stopped, etc) - Status common.InstanceStatus `json:"status"` - RunnerStatus common.RunnerStatus `json:"runner_status"` + Status common.InstanceStatus `json:"status,omitempty"` + RunnerStatus common.RunnerStatus `json:"runner_status,omitempty"` + ProviderFault []byte `json:"provider_fault,omitempty"` + CreateAttempt int `json:"-"` } type UpdateUserParams struct { diff --git a/runner/pool/common.go b/runner/pool/common.go index 1a83aa9e..40525044 100644 --- a/runner/pool/common.go +++ b/runner/pool/common.go @@ -38,6 +38,13 @@ var ( controllerLabelPrefix = "runner-controller-id:" ) +const ( + // maxCreateAttempts is the number of times we will attempt to create an instance + // before we give up. + // TODO: make this configurable(?) + maxCreateAttempts = 5 +) + type basePool struct { ctx context.Context controllerID string @@ -86,20 +93,16 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro } if ok := runnerNames[instance.Name]; !ok { // Set pending_delete on DB field. Allow consolidate() to remove it. 
- updateParams := params.UpdateInstanceParams{ - RunnerStatus: providerCommon.RunnerStatus(providerCommon.InstancePendingDelete), - } - _, err = r.store.UpdateInstance(r.ctx, instance.ID, updateParams) - if err != nil { - return errors.Wrap(err, "syncing local state with github") + if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + return errors.Wrap(err, "updating runner") } } } return nil } -func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) { - runnerName := job.WorkflowJob.RunnerName +func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) { runner, err := r.store.GetInstanceByName(r.ctx, runnerName) if err != nil { return params.Instance{}, errors.Wrap(err, "fetching instance") @@ -109,7 +112,7 @@ func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance } func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error { - runner, err := r.fetchInstanceFromJob(job) + runner, err := r.fetchInstance(job.WorkflowJob.RunnerName) if err != nil { return errors.Wrap(err, "fetching instance") } @@ -125,17 +128,25 @@ func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status provid return nil } -func (r *basePool) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error { - runner, err := r.fetchInstanceFromJob(job) +func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanceParams) error { + runner, err := r.fetchInstance(runnerName) if err != nil { return errors.Wrap(err, "fetching instance") } + if _, err := r.store.UpdateInstance(r.ctx, runner.ID, update); err != nil { + return errors.Wrap(err, "updating runner state") + } + return nil +} + +func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) 
error { updateParams := params.UpdateInstanceParams{ - Status: status, + Status: status, + ProviderFault: providerFault, } - if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil { + if err := r.updateInstance(runnerName, updateParams); err != nil { return errors.Wrap(err, "updating runner state") } return nil @@ -186,32 +197,21 @@ func (r *basePool) AddRunner(ctx context.Context, poolID string) error { name := fmt.Sprintf("garm-%s", uuid.New()) createParams := params.CreateInstanceParams{ - Name: name, - Pool: poolID, - Status: providerCommon.InstancePendingCreate, - RunnerStatus: providerCommon.RunnerPending, - OSArch: pool.OSArch, - OSType: pool.OSType, - CallbackURL: r.helper.GetCallbackURL(), + Name: name, + Pool: poolID, + Status: providerCommon.InstancePendingCreate, + RunnerStatus: providerCommon.RunnerPending, + OSArch: pool.OSArch, + OSType: pool.OSType, + CallbackURL: r.helper.GetCallbackURL(), + CreateAttempt: 1, } - instance, err := r.store.CreateInstance(r.ctx, poolID, createParams) + _, err = r.store.CreateInstance(r.ctx, poolID, createParams) if err != nil { return errors.Wrap(err, "creating instance") } - updateParams := params.UpdateInstanceParams{ - OSName: instance.OSName, - OSVersion: instance.OSVersion, - Addresses: instance.Addresses, - Status: instance.Status, - ProviderID: instance.ProviderID, - } - - if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil { - return errors.Wrap(err, "updating runner state") - } - return nil } @@ -267,8 +267,6 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error { return runnerErrors.NewNotFoundError("invalid provider ID") } - log.Printf(">>> %v", pool.Tags) - labels := []string{} for _, tag := range pool.Tags { labels = append(labels, tag.Name) @@ -305,6 +303,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error { if err != nil { return errors.Wrap(err, "creating instance") } + log.Printf("provider 
CreateInstance succeeded for instance %s", instance.Name) updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance) if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil { @@ -330,6 +329,11 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { switch job.Action { case "queued": // Create instance in database and set it to pending create. + // If we already have an idle runner around, that runner will pick up the job + // and trigger an "in_progress" update from github (see below), which in turn will set the + // runner state of the instance to "active". The ensureMinIdleRunners() function will + // exclude that runner from available runners and attempt to ensure + // the needed number of runners. if err := r.acquireNewInstance(job); err != nil { log.Printf("failed to add instance") } @@ -342,7 +346,12 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { return nil } log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName) - if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil { + if err := r.setInstanceStatus(job.WorkflowJob.RunnerName, providerCommon.InstancePendingDelete, nil); err != nil { + log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) + return errors.Wrap(err, "updating runner") + } + // update instance workload state. Mark the runner itself as terminated. 
+ if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerTerminated); err != nil { log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) return errors.Wrap(err, "updating runner") } @@ -366,12 +375,13 @@ func (r *basePool) controllerLabel() string { func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { return params.UpdateInstanceParams{ - ProviderID: providerInstance.ProviderID, - OSName: providerInstance.OSName, - OSVersion: providerInstance.OSVersion, - Addresses: providerInstance.Addresses, - Status: providerInstance.Status, - RunnerStatus: providerInstance.RunnerStatus, + ProviderID: providerInstance.ProviderID, + OSName: providerInstance.OSName, + OSVersion: providerInstance.OSVersion, + Addresses: providerInstance.Addresses, + Status: providerInstance.Status, + RunnerStatus: providerInstance.RunnerStatus, + ProviderFault: providerInstance.ProviderFault, } } @@ -419,6 +429,90 @@ func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) { } } +func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) { + if !pool.Enabled { + log.Printf("pool %s is disabled, skipping", pool.ID) + return + } + + provider, ok := r.providers[pool.ProviderName] + if !ok { + log.Printf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) + return + } + + existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID) + if err != nil { + log.Printf("failed to list instances for pool %s: %s", pool.ID, err) + return + } + + wg := sync.WaitGroup{} + + for _, instance := range existingInstances { + if instance.Status == providerCommon.InstanceError { + if instance.CreateAttempt >= maxCreateAttempts { + log.Printf("instance %s max create attempts (%d) reached", instance.Name, instance.CreateAttempt) + continue + } + wg.Add(1) + go func(instance params.Instance) { + defer wg.Done() + // cleanup potentially failed instance from provider. 
If we have a provider ID, we use that + // for cleanup. Otherwise, attempt to pass in the name of the instance to the provider, in an + // attempt to cleanup the failed machine. + // NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances + // this has the potential to create many API requests to the target provider. + // TODO(gabriel-samfira): implement request throttling. + if instance.ProviderID == "" && instance.Name == "" { + return + } + deleteIDValue := instance.ProviderID + if deleteIDValue == "" { + deleteIDValue = instance.Name + } + log.Printf("running provider cleanup for instance %s", deleteIDValue) + if err := provider.DeleteInstance(r.ctx, deleteIDValue); err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + log.Printf("failed to cleanup instance %s: %s", deleteIDValue, err) + return + } + } + // TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction. + // It's fairly safe to do here (for now), as there should be no other code path that updates + // an instance in this state. + updateParams := params.UpdateInstanceParams{ + CreateAttempt: instance.CreateAttempt + 1, + Status: providerCommon.InstancePendingCreate, + } + log.Printf("queueing previously failed instance %s for retry", instance.Name) + // Set instance to pending create and wait for retry. 
+ if err := r.updateInstance(instance.Name, updateParams); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + } + }(instance) + } + } + wg.Wait() +} + +func (r *basePool) retryFailedInstances() { + pools, err := r.helper.ListPools() + if err != nil { + log.Printf("error listing pools: %s", err) + return + } + wg := sync.WaitGroup{} + wg.Add(len(pools)) + for _, pool := range pools { + go func(pool params.Pool) { + defer wg.Done() + r.retryFailedInstancesForOnePool(pool) + }(pool) + } + wg.Wait() +} + func (r *basePool) ensureMinIdleRunners() { pools, err := r.helper.ListPools() if err != nil { @@ -521,7 +615,14 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error { return runnerErrors.NewNotFoundError("invalid provider ID") } - if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil { + identifier := instance.ProviderID + if identifier == "" { + // provider did not return a provider ID? + // try with name + identifier = instance.Name + } + + if err := provider.DeleteInstance(r.ctx, identifier); err != nil { return errors.Wrap(err, "removing instance") } @@ -547,18 +648,23 @@ func (r *basePool) deletePendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } - + wg := sync.WaitGroup{} for _, instance := range instances { - log.Printf("instance status for %s is %s", instance.Name, instance.Status) if instance.Status != providerCommon.InstancePendingDelete { // not in pending_delete status. Skip. 
continue } - if err := r.deleteInstanceFromProvider(instance); err != nil { - log.Printf("failed to delete instance from provider: %+v", err) - } + wg.Add(1) + go func(instance params.Instance) { + defer wg.Done() + if err := r.deleteInstanceFromProvider(instance); err != nil { + log.Printf("failed to delete instance from provider: %+v", err) + } + }(instance) } + + wg.Wait() } func (r *basePool) addPendingInstances() { @@ -568,21 +674,32 @@ func (r *basePool) addPendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } + wg := sync.WaitGroup{} for _, instance := range instances { if instance.Status != providerCommon.InstancePendingCreate { // not in pending_create status. Skip. continue } - // asJs, _ := json.MarshalIndent(instance, "", " ") - // log.Printf(">>> %s", string(asJs)) - if err := r.addInstanceToProvider(instance); err != nil { - log.Printf("failed to create instance in provider: %s", err) - } + wg.Add(1) + go func(instance params.Instance) { + defer wg.Done() + log.Printf("creating instance %s in provider", instance.Name) + if err := r.addInstanceToProvider(instance); err != nil { + log.Printf("failed to add instance to provider: %s", err) + errAsBytes := []byte(err.Error()) + if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil { + log.Printf("failed to update runner %s status", instance.Name) + } + log.Printf("failed to create instance in provider: %s", err) + } + }(instance) } + wg.Wait() } func (r *basePool) consolidate() { + // TODO(gabriel-samfira): replace this with something more efficient. 
r.mux.Lock() defer r.mux.Unlock() @@ -597,7 +714,18 @@ func (r *basePool) consolidate() { r.addPendingInstances() }() wg.Wait() - r.ensureMinIdleRunners() + + wg.Add(2) + go func() { + defer wg.Done() + r.ensureMinIdleRunners() + }() + + go func() { + defer wg.Done() + r.retryFailedInstances() + }() + wg.Wait() } func (r *basePool) Wait() error { diff --git a/runner/providers/common/common.go b/runner/providers/common/common.go index 5f1dcb5c..78ac14c3 100644 --- a/runner/providers/common/common.go +++ b/runner/providers/common/common.go @@ -20,13 +20,26 @@ type RunnerStatus string const ( InstanceRunning InstanceStatus = "running" InstanceStopped InstanceStatus = "stopped" + InstanceError InstanceStatus = "error" InstancePendingDelete InstanceStatus = "pending_delete" InstancePendingCreate InstanceStatus = "pending_create" InstanceStatusUnknown InstanceStatus = "unknown" RunnerIdle RunnerStatus = "idle" RunnerPending RunnerStatus = "pending" + RunnerTerminated RunnerStatus = "terminated" RunnerInstalling RunnerStatus = "installing" RunnerFailed RunnerStatus = "failed" RunnerActive RunnerStatus = "active" ) + +func IsValidStatus(status InstanceStatus) bool { + switch status { + case InstanceRunning, InstanceError, InstancePendingCreate, + InstancePendingDelete, InstanceStatusUnknown, InstanceStopped: + + return true + default: + return false + } +} diff --git a/runner/providers/external/external.go b/runner/providers/external/external.go index fa95dfd1..dc3da784 100644 --- a/runner/providers/external/external.go +++ b/runner/providers/external/external.go @@ -4,11 +4,13 @@ import ( "context" "encoding/json" "fmt" + "log" "garm/config" garmErrors "garm/errors" "garm/params" "garm/runner/common" + providerCommon "garm/runner/providers/common" "garm/util/exec" "github.com/pkg/errors" @@ -44,6 +46,26 @@ func (e *external) configEnvVar() string { return fmt.Sprintf("GARM_PROVIDER_CONFIG_FILE=%s", e.cfg.External.ConfigFile) } +func (e *external) validateCreateResult(inst 
params.Instance, bootstrapParams params.BootstrapInstance) error { + if inst.ProviderID == "" { + return garmErrors.NewProviderError("missing provider ID after create call") + } + + if inst.Name == "" { + return garmErrors.NewProviderError("missing instance name after create call") + } + + if inst.OSName == "" || inst.OSArch == "" || inst.OSType == "" { + // we can still function without this info (I think) + log.Printf("WARNING: missing OS information after create call") + } + if !providerCommon.IsValidStatus(inst.Status) { + return garmErrors.NewProviderError("invalid status returned (%s) after create call", inst.Status) + } + + return nil +} + // CreateInstance creates a new compute instance in the provider. func (e *external) CreateInstance(ctx context.Context, bootstrapParams params.BootstrapInstance) (params.Instance, error) { asEnv := bootstrapParamsToEnv(bootstrapParams) @@ -66,6 +88,13 @@ func (e *external) CreateInstance(ctx context.Context, bootstrapParams params.Bo if err := json.Unmarshal(out, &param); err != nil { return params.Instance{}, garmErrors.NewProviderError("failed to decode response from binary: %s", err) } + + if err := e.validateCreateResult(param, bootstrapParams); err != nil { + return params.Instance{}, garmErrors.NewProviderError("failed to validate result: %s", err) + } + + retAsJs, _ := json.MarshalIndent(param, "", " ") + log.Printf("provider returned: %s", string(retAsJs)) return param, nil } diff --git a/runner/providers/lxd/lxd.go b/runner/providers/lxd/lxd.go index f0f7bea5..63dd5e89 100644 --- a/runner/providers/lxd/lxd.go +++ b/runner/providers/lxd/lxd.go @@ -16,9 +16,7 @@ package lxd import ( "context" - "encoding/json" "fmt" - "log" "garm/config" runnerErrors "garm/errors" @@ -225,8 +223,6 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a return api.InstancesPost{}, errors.Wrap(err, "generating cloud-config") } - fmt.Printf(">>> Cloud-config: \n%s\n", cloudCfg) - args := api.InstancesPost{ 
InstancePut: api.InstancePut{ Architecture: image.Architecture, @@ -250,7 +246,6 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a } func (l *LXD) AsParams() params.Provider { - log.Printf("<<<<< %s", l.cfg.Name) return params.Provider{ Name: l.cfg.Name, ProviderType: l.cfg.ProviderType, @@ -292,8 +287,6 @@ func (l *LXD) launchInstance(createArgs api.InstancesPost) error { // CreateInstance creates a new compute instance in the provider. func (l *LXD) CreateInstance(ctx context.Context, bootstrapParams params.BootstrapInstance) (params.Instance, error) { - asJs, _ := json.MarshalIndent(bootstrapParams, "", " ") - fmt.Printf(">>> %s\n", string(asJs)) args, err := l.getCreateInstanceArgs(bootstrapParams) if err != nil { return params.Instance{}, errors.Wrap(err, "fetching create args") @@ -310,8 +303,6 @@ func (l *LXD) CreateInstance(ctx context.Context, bootstrapParams params.Bootstr return params.Instance{}, errors.Wrap(err, "fetching instance") } - asJs2, _ := json.MarshalIndent(ret, "", " ") - fmt.Printf(">>>22 %s\n", string(asJs2)) return ret, nil }