From 1e2e96ccb504203ec4a071259915b70d84ab9636 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Thu, 5 May 2022 07:54:52 +0000 Subject: [PATCH] Parallelize loading pools --- cmd/garm-cli/client/client.go | 2 +- runner/common/pool.go | 1 + runner/pool/common.go | 111 ++++++++++++++++++++-------------- runner/pool/interfaces.go | 1 + runner/pool/organization.go | 4 ++ runner/pool/repository.go | 4 ++ runner/runner.go | 86 +++++++++++++++++++------- 7 files changed, 142 insertions(+), 67 deletions(-) diff --git a/cmd/garm-cli/client/client.go b/cmd/garm-cli/client/client.go index 220cc20b..a7c5a3ef 100644 --- a/cmd/garm-cli/client/client.go +++ b/cmd/garm-cli/client/client.go @@ -36,7 +36,7 @@ type Client struct { func (c *Client) decodeAPIError(body []byte) (apiParams.APIErrorResponse, error) { var errDetails apiParams.APIErrorResponse if err := json.Unmarshal(body, &errDetails); err != nil { - return apiParams.APIErrorResponse{}, errors.Wrap(err, "decoding response") + return apiParams.APIErrorResponse{}, fmt.Errorf("invalid response from server, use --debug for more info") } return errDetails, fmt.Errorf("error in API call: %s", errDetails.Details) diff --git a/runner/common/pool.go b/runner/common/pool.go index a523dcd7..32dcec31 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -15,6 +15,7 @@ type PoolManager interface { WebhookSecret() string HandleWorkflowJob(job params.WorkflowJob) error RefreshState(param params.UpdatePoolStateParams) error + ID() string // AddPool(ctx context.Context, pool params.Pool) error // PoolManager lifecycle functions. Start/stop pool. 
diff --git a/runner/pool/common.go b/runner/pool/common.go index 3b16f9bd..08f8acf3 100644 --- a/runner/pool/common.go +++ b/runner/pool/common.go @@ -360,57 +360,65 @@ func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instan } } +func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) { + if !pool.Enabled { + log.Printf("pool %s is disabled, skipping", pool.ID) + return + } + existingInstances, err := r.store.ListInstances(r.ctx, pool.ID) + if err != nil { + log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err) + return + } + + if uint(len(existingInstances)) >= pool.MaxRunners { + log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) + return + } + + idleOrPendingWorkers := []params.Instance{} + for _, inst := range existingInstances { + if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive { + idleOrPendingWorkers = append(idleOrPendingWorkers, inst) + } + } + + var required int + if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) { + // get the needed delta. 
+ required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers) + + projectedInstanceCount := len(existingInstances) + required + if uint(projectedInstanceCount) > pool.MaxRunners { + // ensure we don't go above max workers + delta := projectedInstanceCount - int(pool.MaxRunners) + required = required - delta + } + } + + for i := 0; i < required; i++ { + log.Printf("addind new idle worker to pool %s", pool.ID) + if err := r.AddRunner(r.ctx, pool.ID); err != nil { + log.Printf("failed to add new instance for pool %s: %s", pool.ID, err) + } + } +} + func (r *basePool) ensureMinIdleRunners() { pools, err := r.helper.ListPools() if err != nil { log.Printf("error listing pools: %s", err) return } + wg := sync.WaitGroup{} + wg.Add(len(pools)) for _, pool := range pools { - if !pool.Enabled { - log.Printf("pool %s is disabled, skipping", pool.ID) - continue - } - existingInstances, err := r.store.ListInstances(r.ctx, pool.ID) - if err != nil { - log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err) - return - } - - // asJs, _ := json.MarshalIndent(existingInstances, "", " ") - // log.Printf(">>> %s", string(asJs)) - if uint(len(existingInstances)) >= pool.MaxRunners { - log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) - continue - } - - idleOrPendingWorkers := []params.Instance{} - for _, inst := range existingInstances { - if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive { - idleOrPendingWorkers = append(idleOrPendingWorkers, inst) - } - } - - var required int - if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) { - // get the needed delta. 
- required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers) - - projectedInstanceCount := len(existingInstances) + required - if uint(projectedInstanceCount) > pool.MaxRunners { - // ensure we don't go above max workers - delta := projectedInstanceCount - int(pool.MaxRunners) - required = required - delta - } - } - - for i := 0; i < required; i++ { - log.Printf("addind new idle worker to pool %s", pool.ID) - if err := r.AddRunner(r.ctx, pool.ID); err != nil { - log.Printf("failed to add new instance for pool %s: %s", pool.ID, err) - } - } + go func(pool params.Pool) { + defer wg.Done() + r.ensureIdleRunnersForOnePool(pool) + }(pool) } + wg.Wait() } // cleanupOrphanedGithubRunners will forcefully remove any github runners that appear @@ -562,8 +570,17 @@ func (r *basePool) consolidate() { r.mux.Lock() defer r.mux.Unlock() - r.deletePendingInstances() - r.addPendingInstances() + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + r.deletePendingInstances() + }() + go func() { + defer wg.Done() + r.addPendingInstances() + }() + wg.Wait() r.ensureMinIdleRunners() } @@ -612,3 +629,7 @@ func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error { func (r *basePool) WebhookSecret() string { return r.helper.WebhookSecret() } + +func (r *basePool) ID() string { + return r.helper.ID() +} diff --git a/runner/pool/interfaces.go b/runner/pool/interfaces.go index 1304e8f8..5517c636 100644 --- a/runner/pool/interfaces.go +++ b/runner/pool/interfaces.go @@ -23,4 +23,5 @@ type poolHelper interface { ValidateOwner(job params.WorkflowJob) error UpdateState(param params.UpdatePoolStateParams) error WebhookSecret() string + ID() string } diff --git a/runner/pool/organization.go b/runner/pool/organization.go index 0c2edbff..63bf01fc 100644 --- a/runner/pool/organization.go +++ b/runner/pool/organization.go @@ -162,3 +162,7 @@ func (r *organization) ValidateOwner(job params.WorkflowJob) error { } return nil } + +func (r *organization) ID() string { + 
return r.id +} diff --git a/runner/pool/repository.go b/runner/pool/repository.go index ad4f4ec3..0cf820b4 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -164,3 +164,7 @@ func (r *repository) ValidateOwner(job params.WorkflowJob) error { } return nil } + +func (r *repository) ID() string { + return r.id +} diff --git a/runner/runner.go b/runner/runner.go index 87152aa6..5f388a60 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -7,6 +7,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "fmt" "hash" "io/ioutil" "log" @@ -14,6 +15,7 @@ import ( "path/filepath" "strings" "sync" + "time" "garm/auth" "garm/config" @@ -130,27 +132,51 @@ func (r *Runner) loadReposAndOrgs() error { return errors.Wrap(err, "fetching repositories") } - for _, repo := range repos { - log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name) - poolManager, err := r.loadRepoPoolManager(repo) - if err != nil { - return errors.Wrap(err, "loading repo pool manager") - } - r.repositories[repo.ID] = poolManager - } - orgs, err := r.store.ListOrganizations(r.ctx) if err != nil { return errors.Wrap(err, "fetching repositories") } + expectedReplies := len(repos) + len(orgs) + repoPoolMgrChan := make(chan common.PoolManager, len(repos)) + orgPoolMgrChan := make(chan common.PoolManager, len(orgs)) + errChan := make(chan error, expectedReplies) + + for _, repo := range repos { + go func(repo params.Repository) { + log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name) + poolManager, err := r.loadRepoPoolManager(repo) + if err != nil { + errChan <- err + return + } + repoPoolMgrChan <- poolManager + }(repo) + } + for _, org := range orgs { - log.Printf("creating pool manager for organization %s", org.Name) - poolManager, err := r.loadOrgPoolManager(org) - if err != nil { - return errors.Wrap(err, "loading repo pool manager") + go func(org params.Organization) { + log.Printf("creating pool manager for organization %s", org.Name) + 
poolManager, err := r.loadOrgPoolManager(org) + if err != nil { + errChan <- err + return + } + orgPoolMgrChan <- poolManager + }(org) + } + + for i := 0; i < expectedReplies; i++ { + select { + case repoPool := <-repoPoolMgrChan: + r.repositories[repoPool.ID()] = repoPool + case orgPool := <-orgPoolMgrChan: + r.organizations[orgPool.ID()] = orgPool + case err := <-errChan: + return errors.Wrap(err, "failed to load repos and pools") + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for pool mamager load") } - r.organizations[org.ID] = poolManager } return nil @@ -160,15 +186,33 @@ func (r *Runner) Start() error { r.mux.Lock() defer r.mux.Unlock() + expectedReplies := len(r.repositories) + len(r.organizations) + errChan := make(chan error, expectedReplies) + for _, repo := range r.repositories { - if err := repo.Start(); err != nil { - return errors.Wrap(err, "starting repo pool manager") - } + go func(repo common.PoolManager) { + err := repo.Start() + errChan <- err + + }(repo) } for _, org := range r.organizations { - if err := org.Start(); err != nil { - return errors.Wrap(err, "starting org pool manager") + go func(org common.PoolManager) { + err := org.Start() + errChan <- err + }(org) + + } + + for i := 0; i < expectedReplies; i++ { + select { + case err := <-errChan: + if err != nil { + return errors.Wrap(err, "starting pool manager") + } + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for pool mamager start") } } return nil @@ -180,13 +224,13 @@ func (r *Runner) Stop() error { for _, repo := range r.repositories { if err := repo.Stop(); err != nil { - return errors.Wrap(err, "starting repo pool manager") + return errors.Wrap(err, "stopping repo pool manager") } } for _, org := range r.organizations { if err := org.Stop(); err != nil { - return errors.Wrap(err, "starting org pool manager") + return errors.Wrap(err, "stopping org pool manager") } } return nil