From 5bca63eeb1654e9cb9870682d26af6aaea579de0 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Mon, 3 Jul 2023 07:35:30 +0000 Subject: [PATCH] Replace wait implementation with errgroup Signed-off-by: Gabriel Adrian Samfira --- runner/runner.go | 142 ++++++++++++++++++++++------------------------- 1 file changed, 67 insertions(+), 75 deletions(-) diff --git a/runner/runner.go b/runner/runner.go index fa28ae66..7db97fa9 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -40,6 +40,7 @@ import ( "github.com/cloudbase/garm/runner/providers" providerCommon "github.com/cloudbase/garm/runner/providers/common" "github.com/cloudbase/garm/util" + "golang.org/x/sync/errgroup" "github.com/juju/clock" "github.com/juju/retry" @@ -360,44 +361,37 @@ func (r *Runner) loadReposOrgsAndEnterprises() error { return errors.Wrap(err, "fetching enterprises") } - expectedReplies := len(repos) + len(orgs) + len(enterprises) - errChan := make(chan error, expectedReplies) - + g, _ := errgroup.WithContext(r.ctx) for _, repo := range repos { - go func(repo params.Repository) { + repo := repo + g.Go(func() error { log.Printf("creating pool manager for repo %s/%s", repo.Owner, repo.Name) _, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store) - errChan <- err - }(repo) + return err + }) } for _, org := range orgs { - go func(org params.Organization) { + org := org + g.Go(func() error { log.Printf("creating pool manager for organization %s", org.Name) _, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store) - errChan <- err - }(org) + return err + }) } for _, enterprise := range enterprises { - go func(enterprise params.Enterprise) { + enterprise := enterprise + g.Go(func() error { log.Printf("creating pool manager for enterprise %s", enterprise.Name) _, err := r.poolManagerCtrl.CreateEnterprisePoolManager(r.ctx, enterprise, r.providers, r.store) - errChan <- err - }(enterprise) + return err + }) } - for i := 0; i < expectedReplies; i++ { - select { - case err := <-errChan: - if err != nil { - return errors.Wrap(err, "failed to load pool managers for repos and orgs") - } - case <-time.After(60 * time.Second): - return fmt.Errorf("timed out waiting for pool manager load") - } + if err := r.waitForErrorGroupOrTimeout(g); err != nil { + return fmt.Errorf("failed to create pool managers: %w", err) } - return nil } @@ -420,43 +414,52 @@ func (r *Runner) Start() error { return errors.Wrap(err, "fetch enterprise pool managers") } - expectedReplies := len(repositories) + len(organizations) + len(enterprises) - errChan := make(chan error, expectedReplies) - + g, _ := errgroup.WithContext(r.ctx) for _, repo := range repositories { - go func(repo common.PoolManager) { - err := repo.Start() - errChan <- err - }(repo) + repo := repo + g.Go(func() error { + return repo.Start() + }) } for _, org := range organizations { - go func(org common.PoolManager) { - err := org.Start() - errChan <- err - }(org) + org := org + g.Go(func() error { + return org.Start() + }) } for _, enterprise := range enterprises { - go func(org common.PoolManager) { - err := org.Start() - errChan <- err - }(enterprise) + enterprise := enterprise + g.Go(func() error { + return enterprise.Start() + }) } - for i := 0; i < expectedReplies; i++ { - select { - case err := <-errChan: - if err != nil { - return errors.Wrap(err, "starting pool manager") - } - case <-time.After(60 * time.Second): - return fmt.Errorf("timed out waiting for pool mamager start") - } + if err := r.waitForErrorGroupOrTimeout(g); err != nil { + return fmt.Errorf("failed to start pool managers: %w", err) } return nil } +func (r *Runner) waitForErrorGroupOrTimeout(g *errgroup.Group) error { + if g == nil { + return nil + } + + done := make(chan error, 1) + go func() { + done <- g.Wait() + }() + + select { + case err := <-done: + return err + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for pool manager start") + } +} + func (r *Runner) Stop() error { r.mux.Lock() defer r.mux.Unlock() @@ -476,54 +479,43 @@ func (r *Runner) Stop() error { return errors.Wrap(err, "fetch enterprise pool managers") } - expectedReplies := len(repos) + len(orgs) + len(enterprises) - errChan := make(chan error, expectedReplies) + g, _ := errgroup.WithContext(r.ctx) for _, repo := range repos { - go func(poolMgr common.PoolManager) { + poolMgr := repo + g.Go(func() error { err := poolMgr.Stop() if err != nil { - errChan <- err - return + return fmt.Errorf("failed to stop repo pool manager: %w", err) } - err = poolMgr.Wait() - errChan <- err - }(repo) + return poolMgr.Wait() + }) } for _, org := range orgs { - go func(poolMgr common.PoolManager) { + poolMgr := org + g.Go(func() error { err := poolMgr.Stop() if err != nil { - errChan <- err - return + return fmt.Errorf("failed to stop org pool manager: %w", err) } - err = poolMgr.Wait() - errChan <- err - }(org) + return poolMgr.Wait() + }) } for _, enterprise := range enterprises { - go func(poolMgr common.PoolManager) { + poolMgr := enterprise + g.Go(func() error { err := poolMgr.Stop() if err != nil { - errChan <- err - return + return fmt.Errorf("failed to stop enterprise pool manager: %w", err) } - err = poolMgr.Wait() - errChan <- err - }(enterprise) + return poolMgr.Wait() + }) } - for i := 0; i < expectedReplies; i++ { - select { - case err := <-errChan: - if err != nil { - return errors.Wrap(err, "stopping pool manager") - } - case <-time.After(60 * time.Second): - return fmt.Errorf("timed out waiting for pool mamager stop") - } + if err := r.waitForErrorGroupOrTimeout(g); err != nil { + return fmt.Errorf("failed to stop pool managers: %w", err) } return nil }