From 9efefc0d6a7de3b38654fe14123a757b8e26cbce Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 16:07:07 +0300 Subject: [PATCH 1/6] Parallelization and LXD timeouts * cleanupOrphanedGithubRunners() now uses errgroup to parallelize and report errors when removing runners from the provider. * retryFailedInstancesForOnePool() now uses errgroup * Removed some setPoolRunningState which should be treated in the loop where those errors eventually bubble up and can be handled. * Added a number of timeouts in the LXD provider for delete and list instances. This provider should be converted into an external provider. Signed-off-by: Gabriel Adrian Samfira --- go.mod | 1 + go.sum | 2 + runner/pool/pool.go | 156 +++++++++--------- runner/providers/lxd/lxd.go | 73 ++++++-- vendor/golang.org/x/sync/LICENSE | 27 +++ vendor/golang.org/x/sync/PATENTS | 22 +++ vendor/golang.org/x/sync/errgroup/errgroup.go | 132 +++++++++++++++ vendor/modules.txt | 3 + 8 files changed, 329 insertions(+), 87 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go diff --git a/go.mod b/go.mod index d5e3ca47..055b69bc 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 golang.org/x/crypto v0.7.0 golang.org/x/oauth2 v0.6.0 + golang.org/x/sync v0.1.0 golang.org/x/sys v0.6.0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index a1b53e64..4e838a1e 100644 --- a/go.sum +++ b/go.sum @@ -238,6 +238,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/runner/pool/pool.go b/runner/pool/pool.go index c800797f..fc46b187 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -34,6 +34,7 @@ import ( "github.com/google/go-github/v48/github" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) var ( @@ -194,6 +195,7 @@ func instanceInList(instanceName string, instances []params.Instance) (params.In // first remove the instance from github, and then from our database. 
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error { poolInstanceCache := map[string][]params.Instance{} + g, ctx := errgroup.WithContext(r.ctx) for _, runner := range runners { if !r.isManagedRunner(labelsFromRunner(runner)) { log.Printf("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String()) @@ -220,20 +222,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) if resp != nil && resp.StatusCode == http.StatusNotFound { continue } - - if errors.Is(err, runnerErrors.ErrUnauthorized) { - failureReason := fmt.Sprintf("failed to remove github runner: %q", err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - } - return errors.Wrap(err, "removing runner") } continue } - if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete { - // already marked for deleting, which means the github workflow finished. + switch providerCommon.InstanceStatus(dbInstance.Status) { + case providerCommon.InstancePendingDelete, providerCommon.InstanceDeleting: + // already marked for deleting or is in the process of being deleted. // Let consolidate take care of it. continue } @@ -259,48 +255,49 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) } poolInstanceCache[pool.ID] = poolInstances } - - providerInstance, ok := instanceInList(dbInstance.Name, poolInstances) - if !ok { - // The runner instance is no longer on the provider, and it appears offline in github. - // It should be safe to force remove it. - log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name) - resp, err := r.helper.RemoveGithubRunner(*runner.ID) - if err != nil { - // Removed in the meantime? - if resp != nil && resp.StatusCode == http.StatusNotFound { - log.Printf("runner dissapeared from github") - } else { - if errors.Is(err, runnerErrors.ErrUnauthorized) { - failureReason := fmt.Sprintf("failed to remove github runner: %q", err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) + // See: https://golang.org/doc/faq#closures_and_goroutines + runner := runner + g.Go(func() error { + providerInstance, ok := instanceInList(dbInstance.Name, poolInstances) + if !ok { + // The runner instance is no longer on the provider, and it appears offline in github. + // It should be safe to force remove it. + log.Printf("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name) + resp, err := r.helper.RemoveGithubRunner(*runner.ID) + if err != nil { + // Removed in the meantime? + if resp != nil && resp.StatusCode == http.StatusNotFound { + log.Printf("runner dissapeared from github") + } else { + return errors.Wrap(err, "removing runner from github") } + } + // Remove the database entry for the runner. + log.Printf("Removing %s from database", dbInstance.Name) + if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil { + return errors.Wrap(err, "removing runner from database") + } + return nil + } - return errors.Wrap(err, "removing runner from github") + if providerInstance.Status == providerCommon.InstanceRunning { + // instance is running, but github reports runner as offline. Log the event. + // This scenario requires manual intervention. + // Perhaps it just came online and github did not yet change it's status? 
+ log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) + return nil + } else { + log.Printf("instance %s was found in stopped state; starting", dbInstance.Name) + //start the instance + if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil { + return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID) } } - // Remove the database entry for the runner. - log.Printf("Removing %s from database", dbInstance.Name) - if err := r.store.DeleteInstance(r.ctx, dbInstance.PoolID, dbInstance.Name); err != nil { - return errors.Wrap(err, "removing runner from database") - } - continue - } - - if providerInstance.Status == providerCommon.InstanceRunning { - // instance is running, but github reports runner as offline. Log the event. - // This scenario requires manual intervention. - // Perhaps it just came online and github did not yet change it's status? - log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) - continue - } else { - log.Printf("instance %s was found in stopped state; starting", dbInstance.Name) - //start the instance - if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil { - return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID) - } - } + return nil + }) + } + if err := g.Wait(); err != nil { + return errors.Wrap(err, "removing orphaned github runners") } return nil } @@ -485,8 +482,12 @@ func (r *basePoolManager) loop() { log.Printf("failed to reap timed out runners: %q", err) } - if err := r.cleanupOrphanedGithubRunners(runners); err != nil { - log.Printf("failed to clean orphaned github runners: %q", err) + if err := r.runnerCleanup(); err != nil { + failureReason := fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.setPoolRunningState(false, failureReason) + } } case <-consolidateTimer.C: // consolidate. @@ -671,11 +672,6 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param log.Printf("runner name not found in workflow job, attempting to fetch from API") runnerInfo, err = r.helper.GetRunnerInfoFromWorkflow(job) if err != nil { - if errors.Is(err, runnerErrors.ErrUnauthorized) { - failureReason := fmt.Sprintf("failed to fetch runner name from API: %q", err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - } return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API") } } @@ -693,11 +689,17 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param return runnerInfo, nil } -func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { +func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) { if err := r.helper.ValidateOwner(job); err != nil { return errors.Wrap(err, "validating owner") } + defer func() { + if err != nil && errors.Is(err, runnerErrors.ErrUnauthorized) { + r.setPoolRunningState(false, fmt.Sprintf("failed to handle job: %q", err)) + } + }() + switch job.Action { case "queued": // Create instance in database and set it to pending create. @@ -714,10 +716,13 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { // if it was never assigned to a runner, and was canceled. runnerInfo, err := r.getRunnerDetailsFromJob(job) if err != nil { - // Unassigned jobs will have an empty runner_name. 
- // We also need to ignore not found errors, as we may get a webhook regarding - // a workflow that is handled by a runner at a different hierarchy level. - return nil + if !errors.Is(err, runnerErrors.ErrUnauthorized) { + // Unassigned jobs will have an empty runner_name. + // We also need to ignore not found errors, as we may get a webhook regarding + // a workflow that is handled by a runner at a different hierarchy level. + return nil + } + return errors.Wrap(err, "updating runner") } // update instance workload state. @@ -887,7 +892,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) { return } - wg := sync.WaitGroup{} + g, _ := errgroup.WithContext(r.ctx) for _, instance := range existingInstances { if instance.Status != providerCommon.InstanceError { continue @@ -895,14 +900,13 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) { if instance.CreateAttempt >= maxCreateAttempts { continue } - wg.Add(1) - go func(inst params.Instance) { - defer wg.Done() + instance := instance + g.Go(func() error { // NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances // this has the potential to create many API requests to the target provider. // TODO(gabriel-samfira): implement request throttling. - if err := r.deleteInstanceFromProvider(inst); err != nil { - log.Printf("failed to delete instance %s from provider: %s", inst.Name, err) + if err := r.deleteInstanceFromProvider(instance); err != nil { + log.Printf("failed to delete instance %s from provider: %s", instance.Name, err) // Bail here, otherwise we risk creating multiple failing instances, and losing track // of them. If Create instance failed to return a proper provider ID, we rely on the // name to delete the instance. If we don't bail here, and end up with multiple @@ -910,7 +914,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) { // on any subsequent call, unless the external or native provider takes into account // non unique names and loops over all of them. Something which is extremely hacky and // which we would rather avoid. - return + return err } // TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction. @@ -918,18 +922,21 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) { // an instance in this state. var tokenFetched bool = false updateParams := params.UpdateInstanceParams{ - CreateAttempt: inst.CreateAttempt + 1, + CreateAttempt: instance.CreateAttempt + 1, TokenFetched: &tokenFetched, Status: providerCommon.InstancePendingCreate, } - log.Printf("queueing previously failed instance %s for retry", inst.Name) + log.Printf("queueing previously failed instance %s for retry", instance.Name) // Set instance to pending create and wait for retry. 
- if err := r.updateInstance(inst.Name, updateParams); err != nil { - log.Printf("failed to update runner %s status", inst.Name) + if err := r.updateInstance(instance.Name, updateParams); err != nil { + log.Printf("failed to update runner %s status", instance.Name) } - }(instance) + return nil + }) + } + if err := g.Wait(); err != nil { + log.Printf("failed to retry failed instances for pool %s: %s", pool.ID, err) } - wg.Wait() } func (r *basePoolManager) retryFailedInstances() { @@ -1127,11 +1134,6 @@ func (r *basePoolManager) Wait() error { func (r *basePoolManager) runnerCleanup() error { runners, err := r.helper.GetGithubRunners() if err != nil { - if errors.Is(err, runnerErrors.ErrUnauthorized) { - failureReason := fmt.Sprintf("failed to fetch runners: %q", err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - } return errors.Wrap(err, "fetching github runners") } if err := r.cleanupOrphanedProviderRunners(runners); err != nil { diff --git a/runner/providers/lxd/lxd.go b/runner/providers/lxd/lxd.go index 8b56098d..dec6361f 100644 --- a/runner/providers/lxd/lxd.go +++ b/runner/providers/lxd/lxd.go @@ -17,7 +17,9 @@ package lxd import ( "context" "fmt" + "log" "sync" + "time" "github.com/cloudbase/garm/config" runnerErrors "github.com/cloudbase/garm/errors" @@ -358,6 +360,7 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error { if err := l.setState(instance, "stop", true); err != nil { if isNotFoundError(err) { + log.Printf("received not found error when stopping instance %s", instance) return nil } // I am not proud of this, but the drivers.ErrInstanceIsStopped from LXD pulls in @@ -368,17 +371,39 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error { } } - op, err := cli.DeleteInstance(instance) - if err != nil { - if isNotFoundError(err) { - return nil + opResponse := make(chan struct { + op lxd.Operation + err error + }) + var op lxd.Operation + go func() { + op, err := cli.DeleteInstance(instance) + opResponse <- struct { + op lxd.Operation + err error + }{op: op, err: err} + }() + + select { + case resp := <-opResponse: + if resp.err != nil { + if isNotFoundError(resp.err) { + log.Printf("received not found error when deleting instance %s", instance) + return nil + } + return errors.Wrap(resp.err, "removing instance") } - return errors.Wrap(err, "removing instance") + op = resp.op + case <-time.After(time.Second * 60): + return errors.Wrapf(runnerErrors.ErrTimeout, "removing instance %s", instance) } - err = op.Wait() + opTimeout, cancel := context.WithTimeout(context.Background(), time.Second*60) + defer cancel() + err = op.WaitContext(opTimeout) if err != nil { if isNotFoundError(err) { + log.Printf("received not found error when waiting for instance deletion %s", instance) return nil } return errors.Wrap(err, "waiting for instance deletion") @@ -386,6 +411,11 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error { return nil } +type listResponse struct { + instances []api.InstanceFull + err error +} + // ListInstances will list all instances for a provider. 
func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) { cli, err := l.getCLI() @@ -393,9 +423,30 @@ func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instan return []params.Instance{}, errors.Wrap(err, "fetching client") } - instances, err := cli.GetInstancesFull(api.InstanceTypeAny) - if err != nil { - return []params.Instance{}, errors.Wrap(err, "fetching instances") + result := make(chan listResponse, 1) + + go func() { + // TODO(gabriel-samfira): if this blocks indefinitely, we will leak a goroutine. + // Convert the internal provider to an external one. Running the provider as an + // external process will allow us to not care if a goroutine leaks. Once a timeout + // is reached, the provider can just exit with an error. Something we can't do with + // internal providers. + instances, err := cli.GetInstancesFull(api.InstanceTypeAny) + result <- listResponse{ + instances: instances, + err: err, + } + }() + + var instances []api.InstanceFull + select { + case res := <-result: + if res.err != nil { + return []params.Instance{}, errors.Wrap(res.err, "fetching instances") + } + instances = res.instances + case <-time.After(time.Second * 60): + return []params.Instance{}, errors.Wrap(runnerErrors.ErrTimeout, "fetching instances from provider") } ret := []params.Instance{} @@ -449,7 +500,9 @@ func (l *LXD) setState(instance, state string, force bool) error { if err != nil { return errors.Wrapf(err, "setting state to %s", state) } - err = op.Wait() + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*60) + defer cancel() + err = op.WaitContext(ctxTimeout) if err != nil { return errors.Wrapf(err, "waiting for instance to transition to state %s", state) } diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
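The DeleteInstance and ListInstances changes above share one pattern: run the blocking LXD client call in its own goroutine, hand the result back over a channel, and select between that channel and a timer. Below is a minimal, self-contained sketch of that pattern, not code from this patch — withTimeout, the stand-in slow call and the durations are hypothetical. As the TODO in ListInstances notes, the goroutine is not cancelled on timeout, so a call that never returns still leaks it; the buffered channel here only lets a late result be delivered so the goroutine can exit.

    package main

    import (
            "errors"
            "fmt"
            "time"
    )

    var errTimeout = errors.New("operation timed out")

    // result carries both the value and the error of the blocking call so they
    // can travel over a single channel.
    type result struct {
            val string
            err error
    }

    // withTimeout runs fn in its own goroutine and waits at most d for it to
    // finish. The channel is buffered so the goroutine can still deliver its
    // result and exit after a timeout; the call itself is never interrupted.
    func withTimeout(fn func() (string, error), d time.Duration) (string, error) {
            ch := make(chan result, 1)
            go func() {
                    v, err := fn()
                    ch <- result{val: v, err: err}
            }()

            select {
            case res := <-ch:
                    return res.val, res.err
            case <-time.After(d):
                    return "", errTimeout
            }
    }

    func main() {
            // Stand-in for a slow provider call such as listing instances.
            slowCall := func() (string, error) {
                    time.Sleep(2 * time.Second)
                    return "instances", nil
            }

            if _, err := withTimeout(slowCall, time.Second); err != nil {
                    fmt.Println("provider call failed:", err) // fires after ~1s
            }
    }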
diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000..73309904 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000..cbee7a4e --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,132 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. 
The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index bd55d435..07a96f26 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -248,6 +248,9 @@ golang.org/x/net/publicsuffix ## explicit; go 1.17 golang.org/x/oauth2 golang.org/x/oauth2/internal +# golang.org/x/sync v0.1.0 +## explicit +golang.org/x/sync/errgroup # golang.org/x/sys v0.6.0 ## explicit; go 1.17 golang.org/x/sys/cpu From 05a79d298c906f9f077a12c0fe9791708473630b Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 16:31:30 +0300 Subject: [PATCH 2/6] Move some code around Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 400 ++++++++++++++++++++++---------------------- 1 file changed, 200 insertions(+), 200 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index fc46b187..5687f0e1 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -69,6 +69,206 @@ type basePoolManager struct { mux sync.Mutex } +func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) { + if err := r.helper.ValidateOwner(job); err != nil { + return errors.Wrap(err, "validating owner") + } + + defer func() { + if err != nil && errors.Is(err, runnerErrors.ErrUnauthorized) { + r.setPoolRunningState(false, fmt.Sprintf("failed to handle job: %q", err)) + } + }() + + switch job.Action { + case "queued": + // Create instance in database and set it to pending create. + // If we already have an idle runner around, that runner will pick up the job + // and trigger an "in_progress" update from github (see bellow), which in turn will set the + // runner state of the instance to "active". The ensureMinIdleRunners() function will + // exclude that runner from available runners and attempt to ensure + // the needed number of runners. + if err := r.acquireNewInstance(job); err != nil { + log.Printf("failed to add instance: %s", err) + } + case "completed": + // ignore the error here. A completed job may not have a runner name set + // if it was never assigned to a runner, and was canceled. 
+ runnerInfo, err := r.getRunnerDetailsFromJob(job) + if err != nil { + if !errors.Is(err, runnerErrors.ErrUnauthorized) { + // Unassigned jobs will have an empty runner_name. + // We also need to ignore not found errors, as we may get a webhook regarding + // a workflow that is handled by a runner at a different hierarchy level. + return nil + } + return errors.Wrap(err, "updating runner") + } + + // update instance workload state. + if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerTerminated); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + return errors.Wrap(err, "updating runner") + } + log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name)) + if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + return errors.Wrap(err, "updating runner") + } + case "in_progress": + // in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without + // a runner set. In such cases, we attemt to fetch it from the API. + runnerInfo, err := r.getRunnerDetailsFromJob(job) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + // This is most likely a runner we're not managing. If we define a repo from within an org + // and also define that same org, we will get a hook from github from both the repo and the org + // regarding the same workflow. We look for the runner in the database, and make sure it exists and is + // part of a pool that this manager is responsible for. A not found error here will most likely mean + // that we are not responsible for that runner, and we should ignore it. + return nil + } + return errors.Wrap(err, "determining runner name") + } + + // update instance workload state. + if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerActive); err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) + return errors.Wrap(err, "updating runner") + } + } + return nil +} + +func (r *basePoolManager) loop() { + scaleDownTimer := time.NewTicker(common.PoolScaleDownInterval) + consolidateTimer := time.NewTicker(common.PoolConsilitationInterval) + reapTimer := time.NewTicker(common.PoolReapTimeoutInterval) + toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval) + defer func() { + log.Printf("%s loop exited", r.helper.String()) + scaleDownTimer.Stop() + consolidateTimer.Stop() + reapTimer.Stop() + toolUpdateTimer.Stop() + close(r.done) + }() + log.Printf("starting loop for %s", r.helper.String()) + + // Consolidate runners on loop start. Provider runners must match runners + // in github and DB. When a Workflow job is received, we will first create/update + // an entity in the database, before sending the request to the provider to create/delete + // an instance. If a "queued" job is received, we create an entity in the db with + // a state of "pending_create". Once that instance is up and calls home, it is marked + // as "active". If a "completed" job is received from github, we mark the instance + // as "pending_delete". Once the provider deletes the instance, we mark it as "deleted" + // in the database. 
+ // We also ensure we have runners created based on pool characteristics. This is where + // we spin up "MinWorkers" for each runner type. + for { + switch r.managerIsRunning { + case true: + select { + case <-reapTimer.C: + runners, err := r.helper.GetGithubRunners() + if err != nil { + failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + break + } + continue + } + if err := r.reapTimedOutRunners(runners); err != nil { + log.Printf("failed to reap timed out runners: %q", err) + } + + if err := r.runnerCleanup(); err != nil { + failureReason := fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.setPoolRunningState(false, failureReason) + } + } + case <-consolidateTimer.C: + // consolidate. + r.consolidate() + case <-scaleDownTimer.C: + r.scaleDown() + case <-toolUpdateTimer.C: + // Update tools cache. + tools, err := r.helper.FetchTools() + if err != nil { + failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + break + } + continue + } + r.mux.Lock() + r.tools = tools + r.mux.Unlock() + case <-r.ctx.Done(): + // daemon is shutting down. + return + case <-r.quit: + // this worker was stopped. + return + } + default: + select { + case <-r.ctx.Done(): + // daemon is shutting down. + return + case <-r.quit: + // this worker was stopped. + return + default: + log.Printf("attempting to start pool manager for %s", r.helper.String()) + tools, err := r.helper.FetchTools() + var failureReason string + if err != nil { + failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } else { + r.waitForTimeoutOrCanceled(60 * time.Second) + } + continue + } + r.mux.Lock() + r.tools = tools + r.mux.Unlock() + + if err := r.runnerCleanup(); err != nil { + failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.setPoolRunningState(false, failureReason) + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } + continue + } + r.setPoolRunningState(true, "") + } + } + } +} + func controllerIDFromLabels(labels []string) string { for _, lbl := range labels { if strings.HasPrefix(lbl, controllerLabelPrefix) { @@ -438,126 +638,6 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error { return nil } -func (r *basePoolManager) loop() { - scaleDownTimer := time.NewTicker(common.PoolScaleDownInterval) - consolidateTimer := time.NewTicker(common.PoolConsilitationInterval) - reapTimer := time.NewTicker(common.PoolReapTimeoutInterval) - toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval) - defer func() { - log.Printf("%s loop exited", r.helper.String()) - scaleDownTimer.Stop() - consolidateTimer.Stop() - reapTimer.Stop() - toolUpdateTimer.Stop() - close(r.done) - }() - log.Printf("starting loop for %s", r.helper.String()) - - // Consolidate runners on loop start. 
Provider runners must match runners - // in github and DB. When a Workflow job is received, we will first create/update - // an entity in the database, before sending the request to the provider to create/delete - // an instance. If a "queued" job is received, we create an entity in the db with - // a state of "pending_create". Once that instance is up and calls home, it is marked - // as "active". If a "completed" job is received from github, we mark the instance - // as "pending_delete". Once the provider deletes the instance, we mark it as "deleted" - // in the database. - // We also ensure we have runners created based on pool characteristics. This is where - // we spin up "MinWorkers" for each runner type. - for { - switch r.managerIsRunning { - case true: - select { - case <-reapTimer.C: - runners, err := r.helper.GetGithubRunners() - if err != nil { - failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - if errors.Is(err, runnerErrors.ErrUnauthorized) { - break - } - continue - } - if err := r.reapTimedOutRunners(runners); err != nil { - log.Printf("failed to reap timed out runners: %q", err) - } - - if err := r.runnerCleanup(); err != nil { - failureReason := fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) - log.Print(failureReason) - if errors.Is(err, runnerErrors.ErrUnauthorized) { - r.setPoolRunningState(false, failureReason) - } - } - case <-consolidateTimer.C: - // consolidate. - r.consolidate() - case <-scaleDownTimer.C: - r.scaleDown() - case <-toolUpdateTimer.C: - // Update tools cache. - tools, err := r.helper.FetchTools() - if err != nil { - failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - if errors.Is(err, runnerErrors.ErrUnauthorized) { - break - } - continue - } - r.mux.Lock() - r.tools = tools - r.mux.Unlock() - case <-r.ctx.Done(): - // daemon is shutting down. - return - case <-r.quit: - // this worker was stopped. - return - } - default: - select { - case <-r.ctx.Done(): - // daemon is shutting down. - return - case <-r.quit: - // this worker was stopped. 
- return - default: - log.Printf("attempting to start pool manager for %s", r.helper.String()) - tools, err := r.helper.FetchTools() - var failureReason string - if err != nil { - failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err) - r.setPoolRunningState(false, failureReason) - log.Print(failureReason) - if errors.Is(err, runnerErrors.ErrUnauthorized) { - r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) - } else { - r.waitForTimeoutOrCanceled(60 * time.Second) - } - continue - } - r.mux.Lock() - r.tools = tools - r.mux.Unlock() - - if err := r.runnerCleanup(); err != nil { - failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) - log.Print(failureReason) - if errors.Is(err, runnerErrors.ErrUnauthorized) { - r.setPoolRunningState(false, failureReason) - r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) - } - continue - } - r.setPoolRunningState(true, "") - } - } - } -} - func (r *basePoolManager) Status() params.PoolManagerStatus { r.mux.Lock() defer r.mux.Unlock() @@ -689,86 +769,6 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param return runnerInfo, nil } -func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) { - if err := r.helper.ValidateOwner(job); err != nil { - return errors.Wrap(err, "validating owner") - } - - defer func() { - if err != nil && errors.Is(err, runnerErrors.ErrUnauthorized) { - r.setPoolRunningState(false, fmt.Sprintf("failed to handle job: %q", err)) - } - }() - - switch job.Action { - case "queued": - // Create instance in database and set it to pending create. - // If we already have an idle runner around, that runner will pick up the job - // and trigger an "in_progress" update from github (see bellow), which in turn will set the - // runner state of the instance to "active". The ensureMinIdleRunners() function will - // exclude that runner from available runners and attempt to ensure - // the needed number of runners. - if err := r.acquireNewInstance(job); err != nil { - log.Printf("failed to add instance: %s", err) - } - case "completed": - // ignore the error here. A completed job may not have a runner name set - // if it was never assigned to a runner, and was canceled. - runnerInfo, err := r.getRunnerDetailsFromJob(job) - if err != nil { - if !errors.Is(err, runnerErrors.ErrUnauthorized) { - // Unassigned jobs will have an empty runner_name. - // We also need to ignore not found errors, as we may get a webhook regarding - // a workflow that is handled by a runner at a different hierarchy level. - return nil - } - return errors.Wrap(err, "updating runner") - } - - // update instance workload state. - if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerTerminated); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) - return errors.Wrap(err, "updating runner") - } - log.Printf("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name)) - if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) - return errors.Wrap(err, "updating runner") - } - case "in_progress": - // in_progress jobs must have a runner name/ID assigned. 
Sometimes github will send a hook without - // a runner set. In such cases, we attemt to fetch it from the API. - runnerInfo, err := r.getRunnerDetailsFromJob(job) - if err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - // This is most likely a runner we're not managing. If we define a repo from within an org - // and also define that same org, we will get a hook from github from both the repo and the org - // regarding the same workflow. We look for the runner in the database, and make sure it exists and is - // part of a pool that this manager is responsible for. A not found error here will most likely mean - // that we are not responsible for that runner, and we should ignore it. - return nil - } - return errors.Wrap(err, "determining runner name") - } - - // update instance workload state. - if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerActive); err != nil { - if errors.Is(err, runnerErrors.ErrNotFound) { - return nil - } - log.Printf("failed to update runner %s status", util.SanitizeLogEntry(runnerInfo.Name)) - return errors.Wrap(err, "updating runner") - } - } - return nil -} - func (r *basePoolManager) poolLabel(poolID string) string { return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID) } From e9f66c20353c9da85bf46af59cef3aff66c6f96c Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 16:36:08 +0300 Subject: [PATCH 3/6] Wait for deletePendingInstances() to finish Use an errgroup to wait for all instance deletion operations before returning. Log any failure. Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 5687f0e1..4764e0fe 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1021,7 +1021,7 @@ func (r *basePoolManager) deletePendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } - + g, ctx := errgroup.WithContext(r.ctx) for _, instance := range instances { if instance.Status != providerCommon.InstancePendingDelete { // not in pending_delete status. Skip. @@ -1033,7 +1033,8 @@ func (r *basePoolManager) deletePendingInstances() { if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil { log.Printf("failed to update runner %s status", instance.Name) } - go func(instance params.Instance) (err error) { + instance := instance + g.Go(func() (err error) { defer func(instance params.Instance) { if err != nil { // failed to remove from provider. 
Set the status back to pending_delete, which @@ -1049,11 +1050,14 @@ func (r *basePoolManager) deletePendingInstances() { return errors.Wrap(err, "removing instance from provider") } - if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil { + if deleteErr := r.store.DeleteInstance(ctx, instance.PoolID, instance.Name); deleteErr != nil { return errors.Wrap(deleteErr, "deleting instance from database") } return - }(instance) //nolint + }) + } + if err := g.Wait(); err != nil { + log.Printf("failed to delete pending instances: %s", err) } } From bd2f103743c92900bb2ff46f6e6b28038f78afc0 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 16:40:27 +0300 Subject: [PATCH 4/6] Wait for addPendingInstances to finish Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 4764e0fe..082ae3a7 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1068,7 +1068,7 @@ func (r *basePoolManager) addPendingInstances() { log.Printf("failed to fetch instances from store: %s", err) return } - + g, _ := errgroup.WithContext(r.ctx) for _, instance := range instances { if instance.Status != providerCommon.InstancePendingCreate { // not in pending_create status. Skip. @@ -1082,7 +1082,8 @@ func (r *basePoolManager) addPendingInstances() { // when the loop runs again and we end up with multiple instances. continue } - go func(instance params.Instance) { + instance := instance + g.Go(func() error { log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID) if err := r.addInstanceToProvider(instance); err != nil { log.Printf("failed to add instance to provider: %s", err) @@ -1092,7 +1093,11 @@ func (r *basePoolManager) addPendingInstances() { } log.Printf("failed to create instance in provider: %s", err) } - }(instance) + return nil + }) + } + if err := g.Wait(); err != nil { + log.Printf("failed to add pending instances: %s", err) } } From 829933559d12c1405b72fdda8f3f0a1a82de988d Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 14:49:28 +0000 Subject: [PATCH 5/6] Allow installing runners to finish Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 082ae3a7..45ad49d6 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -332,6 +332,19 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne continue } + switch instance.RunnerStatus { + case providerCommon.RunnerPending, providerCommon.RunnerInstalling: + // runner is still installing. We give it a chance to finish. + log.Printf("runner %s is still installing, give it a chance to finish", instance.Name) + continue + } + + if time.Since(instance.UpdatedAt).Minutes() < 5 { + // instance was updated recently. We give it a chance to register itself in github. + log.Printf("instance %s was updated recently, give it a chance to connect to github", instance.Name) + continue + } + if ok := runnerNames[instance.Name]; !ok { // Set pending_delete on DB field. Allow consolidate() to remove it. 
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil { @@ -482,7 +495,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) if providerInstance.Status == providerCommon.InstanceRunning { // instance is running, but github reports runner as offline. Log the event. - // This scenario requires manual intervention. + // This scenario may require manual intervention. // Perhaps it just came online and github did not yet change it's status? log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) return nil From d8ed55288f7a7ecf3ddd1a3efa3aa8ca453c4b18 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 6 Jun 2023 14:51:28 +0000 Subject: [PATCH 6/6] Small comment change Signed-off-by: Gabriel Adrian Samfira --- runner/pool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 45ad49d6..dce4e741 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -341,7 +341,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne if time.Since(instance.UpdatedAt).Minutes() < 5 { // instance was updated recently. We give it a chance to register itself in github. - log.Printf("instance %s was updated recently, give it a chance to connect to github", instance.Name) + log.Printf("instance %s was updated recently, skipping check", instance.Name) continue } @@ -442,7 +442,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) switch providerCommon.InstanceStatus(dbInstance.Status) { case providerCommon.InstancePendingDelete, providerCommon.InstanceDeleting: - // already marked for deleting or is in the process of being deleted. + // already marked for deletion or is in the process of being deleted. // Let consolidate take care of it. continue }
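A thread running through patches 1, 3, and 4 is replacing fire-and-forget goroutines (or a bare sync.WaitGroup) with an errgroup.Group, so the per-runner work still runs in parallel but the caller can Wait for everything to finish and receive the first error, which then bubbles up to loop() where unauthorized errors pause the pool. A minimal, self-contained sketch of that pattern follows; the runner names and the per-runner "work" are hypothetical stand-ins, not code from the series.

    package main

    import (
            "context"
            "fmt"

            "golang.org/x/sync/errgroup"
    )

    func main() {
            runners := []string{"runner-1", "runner-2", "runner-3"}

            // WithContext returns a group plus a context that is cancelled as soon
            // as any task returns a non-nil error, or when Wait returns.
            g, ctx := errgroup.WithContext(context.Background())

            for _, name := range runners {
                    name := name // capture the loop variable (see the closures FAQ linked in patch 1)
                    g.Go(func() error {
                            // Bail out early if another task has already failed.
                            select {
                            case <-ctx.Done():
                                    return ctx.Err()
                            default:
                            }
                            // Stand-in for the real per-runner work, e.g. removing
                            // the runner from the provider and then the database.
                            fmt.Println("cleaning up", name)
                            return nil
                    })
            }

            // Wait blocks until every task has returned and yields the first
            // non-nil error, which the caller can wrap and handle.
            if err := g.Wait(); err != nil {
                    fmt.Println("cleanup failed:", err)
            }
    }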