Paralelize loading pools

This commit is contained in:
Gabriel Adrian Samfira 2022-05-05 07:54:52 +00:00
parent 6b80009730
commit 1e2e96ccb5
7 changed files with 142 additions and 67 deletions

View file

@ -36,7 +36,7 @@ type Client struct {
func (c *Client) decodeAPIError(body []byte) (apiParams.APIErrorResponse, error) {
var errDetails apiParams.APIErrorResponse
if err := json.Unmarshal(body, &errDetails); err != nil {
return apiParams.APIErrorResponse{}, errors.Wrap(err, "decoding response")
return apiParams.APIErrorResponse{}, fmt.Errorf("invalid response from server, use --debug for more info")
}
return errDetails, fmt.Errorf("error in API call: %s", errDetails.Details)

View file

@ -15,6 +15,7 @@ type PoolManager interface {
WebhookSecret() string
HandleWorkflowJob(job params.WorkflowJob) error
RefreshState(param params.UpdatePoolStateParams) error
ID() string
// AddPool(ctx context.Context, pool params.Pool) error
// PoolManager lifecycle functions. Start/stop pool.

View file

@ -360,57 +360,65 @@ func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instan
}
}
func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) {
if !pool.Enabled {
log.Printf("pool %s is disabled, skipping", pool.ID)
return
}
existingInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
if uint(len(existingInstances)) >= pool.MaxRunners {
log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
return
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
delta := projectedInstanceCount - int(pool.MaxRunners)
required = required - delta
}
}
for i := 0; i < required; i++ {
log.Printf("addind new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add new instance for pool %s: %s", pool.ID, err)
}
}
}
func (r *basePool) ensureMinIdleRunners() {
pools, err := r.helper.ListPools()
if err != nil {
log.Printf("error listing pools: %s", err)
return
}
wg := sync.WaitGroup{}
wg.Add(len(pools))
for _, pool := range pools {
if !pool.Enabled {
log.Printf("pool %s is disabled, skipping", pool.ID)
continue
}
existingInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
// asJs, _ := json.MarshalIndent(existingInstances, "", " ")
// log.Printf(">>> %s", string(asJs))
if uint(len(existingInstances)) >= pool.MaxRunners {
log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
continue
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
delta := projectedInstanceCount - int(pool.MaxRunners)
required = required - delta
}
}
for i := 0; i < required; i++ {
log.Printf("addind new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add new instance for pool %s: %s", pool.ID, err)
}
}
go func(pool params.Pool) {
defer wg.Done()
r.ensureIdleRunnersForOnePool(pool)
}(pool)
}
wg.Wait()
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
@ -562,8 +570,17 @@ func (r *basePool) consolidate() {
r.mux.Lock()
defer r.mux.Unlock()
r.deletePendingInstances()
r.addPendingInstances()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
r.deletePendingInstances()
}()
go func() {
defer wg.Done()
r.addPendingInstances()
}()
wg.Wait()
r.ensureMinIdleRunners()
}
@ -612,3 +629,7 @@ func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error {
func (r *basePool) WebhookSecret() string {
return r.helper.WebhookSecret()
}
func (r *basePool) ID() string {
return r.helper.ID()
}

View file

@ -23,4 +23,5 @@ type poolHelper interface {
ValidateOwner(job params.WorkflowJob) error
UpdateState(param params.UpdatePoolStateParams) error
WebhookSecret() string
ID() string
}

View file

@ -162,3 +162,7 @@ func (r *organization) ValidateOwner(job params.WorkflowJob) error {
}
return nil
}
func (r *organization) ID() string {
return r.id
}

View file

@ -164,3 +164,7 @@ func (r *repository) ValidateOwner(job params.WorkflowJob) error {
}
return nil
}
func (r *repository) ID() string {
return r.id
}

View file

@ -7,6 +7,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io/ioutil"
"log"
@ -14,6 +15,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"
"garm/auth"
"garm/config"
@ -130,27 +132,51 @@ func (r *Runner) loadReposAndOrgs() error {
return errors.Wrap(err, "fetching repositories")
}
for _, repo := range repos {
log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name)
poolManager, err := r.loadRepoPoolManager(repo)
if err != nil {
return errors.Wrap(err, "loading repo pool manager")
}
r.repositories[repo.ID] = poolManager
}
orgs, err := r.store.ListOrganizations(r.ctx)
if err != nil {
return errors.Wrap(err, "fetching repositories")
}
expectedReplies := len(repos) + len(orgs)
repoPoolMgrChan := make(chan common.PoolManager, len(repos))
orgPoolMgrChan := make(chan common.PoolManager, len(orgs))
errChan := make(chan error, expectedReplies)
for _, repo := range repos {
go func(repo params.Repository) {
log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name)
poolManager, err := r.loadRepoPoolManager(repo)
if err != nil {
errChan <- err
return
}
repoPoolMgrChan <- poolManager
}(repo)
}
for _, org := range orgs {
log.Printf("creating pool manager for organization %s", org.Name)
poolManager, err := r.loadOrgPoolManager(org)
if err != nil {
return errors.Wrap(err, "loading repo pool manager")
go func(org params.Organization) {
log.Printf("creating pool manager for organization %s", org.Name)
poolManager, err := r.loadOrgPoolManager(org)
if err != nil {
errChan <- err
return
}
orgPoolMgrChan <- poolManager
}(org)
}
for i := 0; i < expectedReplies; i++ {
select {
case repoPool := <-repoPoolMgrChan:
r.repositories[repoPool.ID()] = repoPool
case orgPool := <-orgPoolMgrChan:
r.organizations[orgPool.ID()] = orgPool
case err := <-errChan:
return errors.Wrap(err, "failed to load repos and pools")
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager load")
}
r.organizations[org.ID] = poolManager
}
return nil
@ -160,15 +186,33 @@ func (r *Runner) Start() error {
r.mux.Lock()
defer r.mux.Unlock()
expectedReplies := len(r.repositories) + len(r.organizations)
errChan := make(chan error, expectedReplies)
for _, repo := range r.repositories {
if err := repo.Start(); err != nil {
return errors.Wrap(err, "starting repo pool manager")
}
go func(repo common.PoolManager) {
err := repo.Start()
errChan <- err
}(repo)
}
for _, org := range r.organizations {
if err := org.Start(); err != nil {
return errors.Wrap(err, "starting org pool manager")
go func(org common.PoolManager) {
err := org.Start()
errChan <- err
}(org)
}
for i := 0; i < expectedReplies; i++ {
select {
case err := <-errChan:
if err != nil {
return errors.Wrap(err, "starting pool manager")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager start")
}
}
return nil
@ -180,13 +224,13 @@ func (r *Runner) Stop() error {
for _, repo := range r.repositories {
if err := repo.Stop(); err != nil {
return errors.Wrap(err, "starting repo pool manager")
return errors.Wrap(err, "stopping repo pool manager")
}
}
for _, org := range r.organizations {
if err := org.Stop(); err != nil {
return errors.Wrap(err, "starting org pool manager")
return errors.Wrap(err, "stopping org pool manager")
}
}
return nil