Merge pull request #97 from gabriel-samfira/lxd-add-timeouts

Parallelization and LXD timeouts
This commit is contained in:
Gabriel 2023-06-08 13:42:47 +03:00 committed by GitHub
commit b46d7eb6fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 538 additions and 274 deletions

1
go.mod
View file

@ -26,6 +26,7 @@ require (
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
golang.org/x/crypto v0.7.0
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.6.0
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1

2
go.sum
View file

@ -238,6 +238,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View file

@ -34,6 +34,7 @@ import (
"github.com/google/go-github/v48/github"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var (
@ -68,6 +69,206 @@ type basePoolManager struct {
mux sync.Mutex
}
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
defer func() {
if err != nil && errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, fmt.Sprintf("failed to handle job: %q", err))
}
}()
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
// If we already have an idle runner around, that runner will pick up the job
// and trigger an "in_progress" update from github (see below), which in turn will set the
// runner state of the instance to "active". The ensureMinIdleRunners() function will
// exclude that runner from available runners and attempt to ensure
// the needed number of runners.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance: %s", err)
}
case "completed":
// ignore the error here. A completed job may not have a runner name set
// if it was never assigned to a runner, and was canceled.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil {
if !errors.Is(err, runnerErrors.ErrUnauthorized) {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
return errors.Wrap(err, "updating runner")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerTerminated); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name))
if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without
// a runner set. In such cases, we attempt to fetch it from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// This is most likely a runner we're not managing. If we define a repo from within an org
// and also define that same org, we will get a hook from github from both the repo and the org
// regarding the same workflow. We look for the runner in the database, and make sure it exists and is
// part of a pool that this manager is responsible for. A not found error here will most likely mean
// that we are not responsible for that runner, and we should ignore it.
return nil
}
return errors.Wrap(err, "determining runner name")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerActive); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
}
return nil
}
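The handler above dispatches on the three workflow_job webhook actions. The stand-alone sketch below mirrors that dispatch with hypothetical types (workflowJob, handleWorkflowJob); it only illustrates the flow and is not garm's params.WorkflowJob.

package main

import "fmt"

// workflowJob is a hypothetical stand-in for garm's params.WorkflowJob; only
// the two fields needed for the dispatch are modeled here.
type workflowJob struct {
	Action     string // "queued", "in_progress" or "completed"
	RunnerName string // empty for jobs that were never assigned to a runner
}

func handleWorkflowJob(job workflowJob) error {
	switch job.Action {
	case "queued":
		// Record a pending_create instance; an idle runner may still pick the job up.
		fmt.Println("acquiring a new instance for the queued job")
	case "in_progress":
		// The job landed on a runner; mark the backing instance active.
		fmt.Printf("marking runner %q as active\n", job.RunnerName)
	case "completed":
		if job.RunnerName == "" {
			// Canceled before it was scheduled; nothing to clean up.
			return nil
		}
		fmt.Printf("marking runner %q terminated and its instance pending_delete\n", job.RunnerName)
	}
	return nil
}

func main() {
	_ = handleWorkflowJob(workflowJob{Action: "queued"})
	_ = handleWorkflowJob(workflowJob{Action: "completed", RunnerName: "garm-example-runner"})
}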
func (r *basePoolManager) loop() {
scaleDownTimer := time.NewTicker(common.PoolScaleDownInterval)
consolidateTimer := time.NewTicker(common.PoolConsilitationInterval)
reapTimer := time.NewTicker(common.PoolReapTimeoutInterval)
toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval)
defer func() {
log.Printf("%s loop exited", r.helper.String())
scaleDownTimer.Stop()
consolidateTimer.Stop()
reapTimer.Stop()
toolUpdateTimer.Stop()
close(r.done)
}()
log.Printf("starting loop for %s", r.helper.String())
// Consolidate runners on loop start. Provider runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
// a state of "pending_create". Once that instance is up and calls home, it is marked
// as "active". If a "completed" job is received from github, we mark the instance
// as "pending_delete". Once the provider deletes the instance, we mark it as "deleted"
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
switch r.managerIsRunning {
case true:
select {
case <-reapTimer.C:
runners, err := r.helper.GetGithubRunners()
if err != nil {
failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
break
}
continue
}
if err := r.reapTimedOutRunners(runners); err != nil {
log.Printf("failed to reap timed out runners: %q", err)
}
if err := r.runnerCleanup(); err != nil {
failureReason := fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, failureReason)
}
}
case <-consolidateTimer.C:
// consolidate.
r.consolidate()
case <-scaleDownTimer.C:
r.scaleDown()
case <-toolUpdateTimer.C:
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
break
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
default:
select {
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
default:
log.Printf("attempting to start pool manager for %s", r.helper.String())
tools, err := r.helper.FetchTools()
var failureReason string
if err != nil {
failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
if err := r.runnerCleanup(); err != nil {
failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, failureReason)
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
}
continue
}
r.setPoolRunningState(true, "")
}
}
}
}
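The comment block at the top of loop() describes the instance lifecycle driven by webhooks and the provider. The sketch below encodes just that happy path (pending_create -> active -> pending_delete -> deleted) with illustrative names; the real code has additional states such as installing, deleting and error.

package main

import "fmt"

// instanceState and the transition table below are illustrative only; they
// encode the happy path described in the comment at the top of loop().
type instanceState string

const (
	statePendingCreate instanceState = "pending_create"
	stateActive        instanceState = "active"
	statePendingDelete instanceState = "pending_delete"
	stateDeleted       instanceState = "deleted"
)

// happyPath: a "queued" job creates a pending_create record, the runner
// calling home marks it active, a "completed" job marks it pending_delete,
// and the provider removing the instance marks it deleted.
var happyPath = map[instanceState]instanceState{
	statePendingCreate: stateActive,
	stateActive:        statePendingDelete,
	statePendingDelete: stateDeleted,
}

func main() {
	s := statePendingCreate
	for {
		next, ok := happyPath[s]
		if !ok {
			break
		}
		fmt.Printf("%s -> %s\n", s, next)
		s = next
	}
}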
func controllerIDFromLabels(labels []string) string {
for _, lbl := range labels {
if strings.HasPrefix(lbl, controllerLabelPrefix) {
@ -131,6 +332,19 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
continue
}
switch instance.RunnerStatus {
case providerCommon.RunnerPending, providerCommon.RunnerInstalling:
// runner is still installing. We give it a chance to finish.
log.Printf("runner %s is still installing, give it a chance to finish", instance.Name)
continue
}
if time.Since(instance.UpdatedAt).Minutes() < 5 {
// instance was updated recently. We give it a chance to register itself in github.
log.Printf("instance %s was updated recently, skipping check", instance.Name)
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
@ -194,6 +408,7 @@ func instanceInList(instanceName string, instances []params.Instance) (params.In
// first remove the instance from github, and then from our database.
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
poolInstanceCache := map[string][]params.Instance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !r.isManagedRunner(labelsFromRunner(runner)) {
log.Printf("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String())
@ -220,20 +435,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
if resp != nil && resp.StatusCode == http.StatusNotFound {
continue
}
if errors.Is(err, runnerErrors.ErrUnauthorized) {
failureReason := fmt.Sprintf("failed to remove github runner: %q", err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
}
return errors.Wrap(err, "removing runner")
}
continue
}
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete {
// already marked for deleting, which means the github workflow finished.
switch providerCommon.InstanceStatus(dbInstance.Status) {
case providerCommon.InstancePendingDelete, providerCommon.InstanceDeleting:
// already marked for deletion or is in the process of being deleted.
// Let consolidate take care of it.
continue
}
@ -259,48 +468,49 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
}
poolInstanceCache[pool.ID] = poolInstances
}
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
if !ok {
// The runner instance is no longer on the provider, and it appears offline in github.
// It should be safe to force remove it.
log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name)
resp, err := r.helper.RemoveGithubRunner(*runner.ID)
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
log.Printf("runner dissapeared from github")
} else {
if errors.Is(err, runnerErrors.ErrUnauthorized) {
failureReason := fmt.Sprintf("failed to remove github runner: %q", err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
// See: https://golang.org/doc/faq#closures_and_goroutines
runner := runner
g.Go(func() error {
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
if !ok {
// The runner instance is no longer on the provider, and it appears offline in github.
// It should be safe to force remove it.
log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name)
resp, err := r.helper.RemoveGithubRunner(*runner.ID)
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
log.Printf("runner dissapeared from github")
} else {
return errors.Wrap(err, "removing runner from github")
}
}
// Remove the database entry for the runner.
log.Printf("Removing %s from database", dbInstance.Name)
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
return nil
}
return errors.Wrap(err, "removing runner from github")
if providerInstance.Status == providerCommon.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario may require manual intervention.
// Perhaps it just came online and github did not yet change its status?
log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name)
return nil
} else {
log.Printf("instance %s was found in stopped state; starting", dbInstance.Name)
// start the instance
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
}
// Remove the database entry for the runner.
log.Printf("Removing %s from database", dbInstance.Name)
if err := r.store.DeleteInstance(r.ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
continue
}
if providerInstance.Status == providerCommon.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario requires manual intervention.
// Perhaps it just came online and github did not yet change its status?
log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name)
continue
} else {
log.Printf("instance %s was found in stopped state; starting", dbInstance.Name)
// start the instance
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return errors.Wrap(err, "removing orphaned github runners")
}
return nil
}
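The `runner := runner` rebinding and the errgroup.Go calls above follow the standard fan-out pattern: one goroutine per item, where the first error cancels the shared context and is returned by Wait. Below is a minimal, self-contained version of that pattern with illustrative names (processRunner, runners); the explicit rebinding is needed on Go versions prior to 1.22, where the loop variable is shared between iterations.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processRunner is a stand-in for the per-runner cleanup work done above.
func processRunner(ctx context.Context, name string) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // another goroutine already failed; stop early
	default:
		fmt.Println("processing", name)
		return nil
	}
}

func main() {
	runners := []string{"runner-1", "runner-2", "runner-3"}

	g, ctx := errgroup.WithContext(context.Background())
	for _, runner := range runners {
		runner := runner // rebind: before Go 1.22 the loop variable is reused across iterations
		g.Go(func() error {
			return processRunner(ctx, runner)
		})
	}
	// Wait blocks until all goroutines return and yields the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}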
@ -441,122 +651,6 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
return nil
}
func (r *basePoolManager) loop() {
scaleDownTimer := time.NewTicker(common.PoolScaleDownInterval)
consolidateTimer := time.NewTicker(common.PoolConsilitationInterval)
reapTimer := time.NewTicker(common.PoolReapTimeoutInterval)
toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval)
defer func() {
log.Printf("%s loop exited", r.helper.String())
scaleDownTimer.Stop()
consolidateTimer.Stop()
reapTimer.Stop()
toolUpdateTimer.Stop()
close(r.done)
}()
log.Printf("starting loop for %s", r.helper.String())
// Consolidate runners on loop start. Provider runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
// a state of "pending_create". Once that instance is up and calls home, it is marked
// as "active". If a "completed" job is received from github, we mark the instance
// as "pending_delete". Once the provider deletes the instance, we mark it as "deleted"
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
switch r.managerIsRunning {
case true:
select {
case <-reapTimer.C:
runners, err := r.helper.GetGithubRunners()
if err != nil {
failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
break
}
continue
}
if err := r.reapTimedOutRunners(runners); err != nil {
log.Printf("failed to reap timed out runners: %q", err)
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
log.Printf("failed to clean orphaned github runners: %q", err)
}
case <-consolidateTimer.C:
// consolidate.
r.consolidate()
case <-scaleDownTimer.C:
r.scaleDown()
case <-toolUpdateTimer.C:
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
break
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
default:
select {
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
default:
log.Printf("attempting to start pool manager for %s", r.helper.String())
tools, err := r.helper.FetchTools()
var failureReason string
if err != nil {
failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
if err := r.runnerCleanup(); err != nil {
failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, failureReason)
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
}
continue
}
r.setPoolRunningState(true, "")
}
}
}
}
func (r *basePoolManager) Status() params.PoolManagerStatus {
r.mux.Lock()
defer r.mux.Unlock()
@ -671,11 +765,6 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param
log.Printf("runner name not found in workflow job, attempting to fetch from API")
runnerInfo, err = r.helper.GetRunnerInfoFromWorkflow(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrUnauthorized) {
failureReason := fmt.Sprintf("failed to fetch runner name from API: %q", err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
}
return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API")
}
}
@ -693,77 +782,6 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param
return runnerInfo, nil
}
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
// If we already have an idle runner around, that runner will pick up the job
// and trigger an "in_progress" update from github (see below), which in turn will set the
// runner state of the instance to "active". The ensureMinIdleRunners() function will
// exclude that runner from available runners and attempt to ensure
// the needed number of runners.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance: %s", err)
}
case "completed":
// ignore the error here. A completed job may not have a runner name set
// if it was never assigned to a runner, and was canceled.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerTerminated); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name))
if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without
// a runner set. In such cases, we attempt to fetch it from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// This is most likely a runner we're not managing. If we define a repo from within an org
// and also define that same org, we will get a hook from github from both the repo and the org
// regarding the same workflow. We look for the runner in the database, and make sure it exists and is
// part of a pool that this manager is responsible for. A not found error here will most likely mean
// that we are not responsible for that runner, and we should ignore it.
return nil
}
return errors.Wrap(err, "determining runner name")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerActive); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name))
return errors.Wrap(err, "updating runner")
}
}
return nil
}
func (r *basePoolManager) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
@ -887,7 +905,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) {
return
}
wg := sync.WaitGroup{}
g, _ := errgroup.WithContext(r.ctx)
for _, instance := range existingInstances {
if instance.Status != providerCommon.InstanceError {
continue
@ -895,14 +913,13 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) {
if instance.CreateAttempt >= maxCreateAttempts {
continue
}
wg.Add(1)
go func(inst params.Instance) {
defer wg.Done()
instance := instance
g.Go(func() error {
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
if err := r.deleteInstanceFromProvider(inst); err != nil {
log.Printf("failed to delete instance %s from provider: %s", inst.Name, err)
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance %s from provider: %s", instance.Name, err)
// Bail here, otherwise we risk creating multiple failing instances, and losing track
// of them. If Create instance failed to return a proper provider ID, we rely on the
// name to delete the instance. If we don't bail here, and end up with multiple
@ -910,7 +927,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) {
// on any subsequent call, unless the external or native provider takes into account
// non unique names and loops over all of them. Something which is extremely hacky and
// which we would rather avoid.
return
return err
}
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
@ -918,18 +935,21 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) {
// an instance in this state.
var tokenFetched bool = false
updateParams := params.UpdateInstanceParams{
CreateAttempt: inst.CreateAttempt + 1,
CreateAttempt: instance.CreateAttempt + 1,
TokenFetched: &tokenFetched,
Status: providerCommon.InstancePendingCreate,
}
log.Printf("queueing previously failed instance %s for retry", inst.Name)
log.Printf("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if err := r.updateInstance(inst.Name, updateParams); err != nil {
log.Printf("failed to update runner %s status", inst.Name)
if err := r.updateInstance(instance.Name, updateParams); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
}(instance)
return nil
})
}
if err := g.Wait(); err != nil {
log.Printf("failed to retry failed instances for pool %s: %s", pool.ID, err)
}
wg.Wait()
}
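The NOTE/TODO above points out that retries fan out in parallel and that provider request throttling is not implemented yet. The vendored errgroup package added by this PR already exposes SetLimit, so a bounded variant could look roughly like the sketch below; the helper name and the limit of 5 are illustrative and not part of this change.

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// deleteFromProvider is a stand-in for the real provider API call.
func deleteFromProvider(name string) error {
	time.Sleep(100 * time.Millisecond) // simulate request latency
	fmt.Println("deleted", name)
	return nil
}

func main() {
	failed := []string{"inst-1", "inst-2", "inst-3", "inst-4", "inst-5", "inst-6"}

	var g errgroup.Group
	g.SetLimit(5) // at most 5 provider requests in flight; the value is arbitrary

	for _, name := range failed {
		name := name
		g.Go(func() error {
			return deleteFromProvider(name)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("retry pass failed:", err)
	}
}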
func (r *basePoolManager) retryFailedInstances() {
@ -1014,7 +1034,7 @@ func (r *basePoolManager) deletePendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
g, ctx := errgroup.WithContext(r.ctx)
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
@ -1026,7 +1046,8 @@ func (r *basePoolManager) deletePendingInstances() {
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
}
go func(instance params.Instance) (err error) {
instance := instance
g.Go(func() (err error) {
defer func(instance params.Instance) {
if err != nil {
// failed to remove from provider. Set the status back to pending_delete, which
@ -1042,11 +1063,14 @@ func (r *basePoolManager) deletePendingInstances() {
return errors.Wrap(err, "removing instance from provider")
}
if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
if deleteErr := r.store.DeleteInstance(ctx, instance.PoolID, instance.Name); deleteErr != nil {
return errors.Wrap(deleteErr, "deleting instance from database")
}
return
}(instance) //nolint
})
}
if err := g.Wait(); err != nil {
log.Printf("failed to delete pending instances: %s", err)
}
}
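The g.Go closure above relies on a named error result so that the deferred function can flip a failed instance back to pending_delete. A generic sketch of that idiom, with stand-in names (removeInstance, markPendingDelete) that are not garm API:

package main

import (
	"errors"
	"fmt"
)

// markPendingDelete is a stand-in for setting the instance back to
// pending_delete so a later pass retries the removal.
func markPendingDelete(name string) {
	fmt.Println("resetting", name, "to pending_delete")
}

// removeInstance uses a named error result so the deferred closure can tell
// whether the body failed and undo the optimistic "deleting" status.
func removeInstance(name string, fail bool) (err error) {
	defer func() {
		if err != nil {
			markPendingDelete(name)
		}
	}()

	if fail {
		return errors.New("provider rejected the delete request")
	}
	fmt.Println("removed", name)
	return nil
}

func main() {
	_ = removeInstance("inst-ok", false)
	_ = removeInstance("inst-bad", true)
}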
@ -1057,7 +1081,7 @@ func (r *basePoolManager) addPendingInstances() {
log.Printf("failed to fetch instances from store: %s", err)
return
}
g, _ := errgroup.WithContext(r.ctx)
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
@ -1071,7 +1095,8 @@ func (r *basePoolManager) addPendingInstances() {
// when the loop runs again and we end up with multiple instances.
continue
}
go func(instance params.Instance) {
instance := instance
g.Go(func() error {
log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to add instance to provider: %s", err)
@ -1081,7 +1106,11 @@ func (r *basePoolManager) addPendingInstances() {
}
log.Printf("failed to create instance in provider: %s", err)
}
}(instance)
return nil
})
}
if err := g.Wait(); err != nil {
log.Printf("failed to add pending instances: %s", err)
}
}
@ -1127,11 +1156,6 @@ func (r *basePoolManager) Wait() error {
func (r *basePoolManager) runnerCleanup() error {
runners, err := r.helper.GetGithubRunners()
if err != nil {
if errors.Is(err, runnerErrors.ErrUnauthorized) {
failureReason := fmt.Sprintf("failed to fetch runners: %q", err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
}
return errors.Wrap(err, "fetching github runners")
}
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {

View file

@ -17,7 +17,9 @@ package lxd
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/cloudbase/garm/config"
runnerErrors "github.com/cloudbase/garm/errors"
@ -358,6 +360,7 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error {
if err := l.setState(instance, "stop", true); err != nil {
if isNotFoundError(err) {
log.Printf("received not found error when stopping instance %s", instance)
return nil
}
// I am not proud of this, but the drivers.ErrInstanceIsStopped from LXD pulls in
@ -368,17 +371,39 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error {
}
}
op, err := cli.DeleteInstance(instance)
if err != nil {
if isNotFoundError(err) {
return nil
opResponse := make(chan struct {
op lxd.Operation
err error
})
var op lxd.Operation
go func() {
op, err := cli.DeleteInstance(instance)
opResponse <- struct {
op lxd.Operation
err error
}{op: op, err: err}
}()
select {
case resp := <-opResponse:
if resp.err != nil {
if isNotFoundError(resp.err) {
log.Printf("received not found error when deleting instance %s", instance)
return nil
}
return errors.Wrap(resp.err, "removing instance")
}
return errors.Wrap(err, "removing instance")
op = resp.op
case <-time.After(time.Second * 60):
return errors.Wrapf(runnerErrors.ErrTimeout, "removing instance %s", instance)
}
err = op.Wait()
opTimeout, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
err = op.WaitContext(opTimeout)
if err != nil {
if isNotFoundError(err) {
log.Printf("received not found error when waiting for instance deletion %s", instance)
return nil
}
return errors.Wrap(err, "waiting for instance deletion")
@ -386,6 +411,11 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error {
return nil
}
type listResponse struct {
instances []api.InstanceFull
err error
}
// ListInstances will list all instances for a provider.
func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) {
cli, err := l.getCLI()
@ -393,9 +423,30 @@ func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instan
return []params.Instance{}, errors.Wrap(err, "fetching client")
}
instances, err := cli.GetInstancesFull(api.InstanceTypeAny)
if err != nil {
return []params.Instance{}, errors.Wrap(err, "fetching instances")
result := make(chan listResponse, 1)
go func() {
// TODO(gabriel-samfira): if this blocks indefinitely, we will leak a goroutine.
// Convert the internal provider to an external one. Running the provider as an
// external process will allow us to not care if a goroutine leaks. Once a timeout
// is reached, the provider can just exit with an error. Something we can't do with
// internal providers.
instances, err := cli.GetInstancesFull(api.InstanceTypeAny)
result <- listResponse{
instances: instances,
err: err,
}
}()
var instances []api.InstanceFull
select {
case res := <-result:
if res.err != nil {
return []params.Instance{}, errors.Wrap(res.err, "fetching instances")
}
instances = res.instances
case <-time.After(time.Second * 60):
return []params.Instance{}, errors.Wrap(runnerErrors.ErrTimeout, "fetching instances from provider")
}
ret := []params.Instance{}
@ -449,7 +500,9 @@ func (l *LXD) setState(instance, state string, force bool) error {
if err != nil {
return errors.Wrapf(err, "setting state to %s", state)
}
err = op.Wait()
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
err = op.WaitContext(ctxTimeout)
if err != nil {
return errors.Wrapf(err, "waiting for instance to transition to state %s", state)
}
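DeleteInstance and ListInstances above both wrap a blocking LXD client call in a goroutine and race it against time.After, and the TODO in ListInstances notes that a call which never returns still leaks the goroutine. A generic shape of that pattern is sketched below, assuming Go 1.18+ generics; callWithTimeout and its names are illustrative and not part of the LXD provider.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out")

type result[T any] struct {
	val T
	err error
}

// callWithTimeout runs call in a goroutine and races it against a timer.
// The channel is buffered so the goroutine can always deliver its result and
// exit, but a call that never returns still leaks the goroutine, which is the
// limitation the TODO in ListInstances refers to.
func callWithTimeout[T any](timeout time.Duration, call func() (T, error)) (T, error) {
	ch := make(chan result[T], 1)
	go func() {
		v, err := call()
		ch <- result[T]{val: v, err: err}
	}()

	select {
	case res := <-ch:
		return res.val, res.err
	case <-time.After(timeout):
		var zero T
		return zero, errTimeout
	}
}

func main() {
	out, err := callWithTimeout(time.Second, func() (string, error) {
		time.Sleep(2 * time.Second) // stand-in for a blocking LXD client call
		return "done", nil
	})
	fmt.Printf("result=%q err=%v\n", out, err) // prints result="" err=timed out after ~1s
}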

27
vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View file

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View file

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

132
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View file

@ -0,0 +1,132 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func()
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
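A short usage example for the API documented above, separate from the vendored source: the first error returned to a WithContext group cancels the derived context, and SetLimit/TryGo bound the number of active goroutines.

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// The first non-nil error cancels the derived context; Wait returns it.
	g, ctx := errgroup.WithContext(context.Background())
	g.Go(func() error {
		return errors.New("first failure")
	})
	g.Go(func() error {
		<-ctx.Done()     // unblocks once the group context is canceled
		return ctx.Err() // later errors are dropped in favor of the first one
	})
	fmt.Println(g.Wait()) // prints "first failure"

	// SetLimit bounds concurrency; TryGo declines instead of blocking when the
	// limit is reached.
	var limited errgroup.Group
	limited.SetLimit(1)
	release := make(chan struct{})
	limited.Go(func() error {
		<-release
		return nil
	})
	fmt.Println(limited.TryGo(func() error { return nil })) // false: the only slot is taken
	close(release)
	_ = limited.Wait()
}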

3
vendor/modules.txt vendored
View file

@ -248,6 +248,9 @@ golang.org/x/net/publicsuffix
## explicit; go 1.17
golang.org/x/oauth2
golang.org/x/oauth2/internal
# golang.org/x/sync v0.1.0
## explicit
golang.org/x/sync/errgroup
# golang.org/x/sys v0.6.0
## explicit; go 1.17
golang.org/x/sys/cpu