diff --git a/errors/errors.go b/errors/errors.go index 11ebce92..9f98c33a 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -29,8 +29,9 @@ var ( // ErrBadRequest is returned is a malformed request is sent ErrBadRequest = NewBadRequestError("invalid request") // ErrTimeout is returned when a timeout occurs. - ErrTimeout = fmt.Errorf("timed out") - ErrUnprocessable = fmt.Errorf("cannot process request") + ErrTimeout = fmt.Errorf("timed out") + ErrUnprocessable = fmt.Errorf("cannot process request") + ErrNoPoolsAvailable = fmt.Errorf("no pools available") ) type baseError struct { diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 7729f024..284b2983 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -1459,6 +1459,8 @@ func (r *basePoolManager) consumeQueuedJobs() error { return errors.Wrap(err, "listing queued jobs") } + poolsCache := poolsForTags{} + log.Printf("found %d queued jobs for %s", len(queued), r.helper.String()) for _, job := range queued { if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() { @@ -1467,9 +1469,9 @@ func (r *basePoolManager) consumeQueuedJobs() error { continue } - if time.Since(job.UpdatedAt) < time.Second*10 { + if time.Since(job.UpdatedAt) < time.Second*20 { // give the idle runners a chance to pick up the job. - log.Printf("job %d was updated less than 10 seconds ago. Skipping", job.ID) + log.Printf("job %d was updated less than 20 seconds ago. Skipping", job.ID) continue } @@ -1532,14 +1534,18 @@ func (r *basePoolManager) consumeQueuedJobs() error { continue } - potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels) - if err != nil { - log.Printf("[Pool mgr ID %s] error finding pools matching labels: %s", r.ID(), err) - continue + poolRR, ok := poolsCache.Get(job.Labels) + if !ok { + potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels) + if err != nil { + log.Printf("[Pool mgr ID %s] error finding pools matching labels: %s", r.ID(), err) + continue + } + poolRR = poolsCache.Add(job.Labels, potentialPools) } - if len(potentialPools) == 0 { - log.Printf("[Pool mgr ID %s] could not find pool with labels %s", r.ID(), strings.Join(job.Labels, ",")) + if poolRR.Len() == 0 { + log.Printf("[Pool mgr ID %s] could not find pools with labels %s", r.ID(), strings.Join(job.Labels, ",")) continue } @@ -1552,16 +1558,23 @@ func (r *basePoolManager) consumeQueuedJobs() error { jobLabels := []string{ fmt.Sprintf("%s%d", jobLabelPrefix, job.ID), } - for _, pool := range potentialPools { - log.Printf("attempting to create a runner in pool %s for job %d", pool.ID, job.ID) + for { + pool, err := poolRR.Next() + if err != nil { + log.Printf("[PoolRR] could not find a pool to create a runner for job %d: %s", job.ID, err) + break + } + + log.Printf("[PoolRR] attempting to create a runner in pool %s for job %d", pool.ID, job.ID) if err := r.addRunnerToPool(pool, jobLabels); err != nil { - log.Printf("could not add runner to pool %s: %s", pool.ID, err) + log.Printf("[PoolRR] could not add runner to pool %s: %s", pool.ID, err) continue } - log.Printf("a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID) + log.Printf("[PoolRR] a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID) runnerCreated = true break } + if !runnerCreated { log.Printf("could not create a runner for job %d; unlocking", job.ID) if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil { diff --git a/runner/pool/util.go b/runner/pool/util.go index c5f38473..98a52b8b 100644 --- a/runner/pool/util.go +++ b/runner/pool/util.go @@ -1,6 +1,66 @@ package pool -import "log" +import ( + "log" + "sort" + "strings" + "sync" + "sync/atomic" + + runnerErrors "github.com/cloudbase/garm/errors" + "github.com/cloudbase/garm/params" +) + +type poolRoundRobin struct { + pools []params.Pool + next uint32 +} + +func (p *poolRoundRobin) Next() (params.Pool, error) { + if len(p.pools) == 0 { + return params.Pool{}, runnerErrors.ErrNoPoolsAvailable + } + if p.next >= uint32(len(p.pools)) { + p.Reset() + return params.Pool{}, runnerErrors.ErrNoPoolsAvailable + } + + n := atomic.AddUint32(&p.next, 1) + return p.pools[(int(n)-1)%len(p.pools)], nil +} + +func (p *poolRoundRobin) Len() int { + return len(p.pools) +} + +func (p *poolRoundRobin) Reset() { + atomic.StoreUint32(&p.next, 0) +} + +type poolsForTags struct { + pools sync.Map +} + +func (p *poolsForTags) Get(tags []string) (*poolRoundRobin, bool) { + sort.Strings(tags) + key := strings.Join(tags, "^") + + v, ok := p.pools.Load(key) + if !ok { + return nil, false + } + + return v.(*poolRoundRobin), true +} + +func (p *poolsForTags) Add(tags []string, pools []params.Pool) *poolRoundRobin { + sort.Strings(tags) + key := strings.Join(tags, "^") + + poolRR := &poolRoundRobin{pools: pools} + v, _ := p.pools.LoadOrStore(key, poolRR) + return v.(*poolRoundRobin) +} func (r *basePoolManager) log(msg string, args ...interface{}) { msgArgs := []interface{}{