Add basic round robin for pools

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2023-06-27 11:50:04 +00:00
parent 4b9c20e1be
commit c04a93dde9
3 changed files with 89 additions and 15 deletions

View file

@ -29,8 +29,9 @@ var (
// ErrBadRequest is returned is a malformed request is sent
ErrBadRequest = NewBadRequestError("invalid request")
// ErrTimeout is returned when a timeout occurs.
ErrTimeout = fmt.Errorf("timed out")
ErrUnprocessable = fmt.Errorf("cannot process request")
ErrTimeout = fmt.Errorf("timed out")
ErrUnprocessable = fmt.Errorf("cannot process request")
ErrNoPoolsAvailable = fmt.Errorf("no pools available")
)
type baseError struct {

View file

@ -1459,6 +1459,8 @@ func (r *basePoolManager) consumeQueuedJobs() error {
return errors.Wrap(err, "listing queued jobs")
}
poolsCache := poolsForTags{}
log.Printf("found %d queued jobs for %s", len(queued), r.helper.String())
for _, job := range queued {
if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() {
@ -1467,9 +1469,9 @@ func (r *basePoolManager) consumeQueuedJobs() error {
continue
}
if time.Since(job.UpdatedAt) < time.Second*10 {
if time.Since(job.UpdatedAt) < time.Second*20 {
// give the idle runners a chance to pick up the job.
log.Printf("job %d was updated less than 10 seconds ago. Skipping", job.ID)
log.Printf("job %d was updated less than 20 seconds ago. Skipping", job.ID)
continue
}
@ -1532,14 +1534,18 @@ func (r *basePoolManager) consumeQueuedJobs() error {
continue
}
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels)
if err != nil {
log.Printf("[Pool mgr ID %s] error finding pools matching labels: %s", r.ID(), err)
continue
poolRR, ok := poolsCache.Get(job.Labels)
if !ok {
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels)
if err != nil {
log.Printf("[Pool mgr ID %s] error finding pools matching labels: %s", r.ID(), err)
continue
}
poolRR = poolsCache.Add(job.Labels, potentialPools)
}
if len(potentialPools) == 0 {
log.Printf("[Pool mgr ID %s] could not find pool with labels %s", r.ID(), strings.Join(job.Labels, ","))
if poolRR.Len() == 0 {
log.Printf("[Pool mgr ID %s] could not find pools with labels %s", r.ID(), strings.Join(job.Labels, ","))
continue
}
@ -1552,16 +1558,23 @@ func (r *basePoolManager) consumeQueuedJobs() error {
jobLabels := []string{
fmt.Sprintf("%s%d", jobLabelPrefix, job.ID),
}
for _, pool := range potentialPools {
log.Printf("attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
for {
pool, err := poolRR.Next()
if err != nil {
log.Printf("[PoolRR] could not find a pool to create a runner for job %d: %s", job.ID, err)
break
}
log.Printf("[PoolRR] attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
log.Printf("could not add runner to pool %s: %s", pool.ID, err)
log.Printf("[PoolRR] could not add runner to pool %s: %s", pool.ID, err)
continue
}
log.Printf("a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID)
log.Printf("[PoolRR] a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID)
runnerCreated = true
break
}
if !runnerCreated {
log.Printf("could not create a runner for job %d; unlocking", job.ID)
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {

View file

@ -1,6 +1,66 @@
package pool
import "log"
import (
"log"
"sort"
"strings"
"sync"
"sync/atomic"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
)
type poolRoundRobin struct {
pools []params.Pool
next uint32
}
func (p *poolRoundRobin) Next() (params.Pool, error) {
if len(p.pools) == 0 {
return params.Pool{}, runnerErrors.ErrNoPoolsAvailable
}
if p.next >= uint32(len(p.pools)) {
p.Reset()
return params.Pool{}, runnerErrors.ErrNoPoolsAvailable
}
n := atomic.AddUint32(&p.next, 1)
return p.pools[(int(n)-1)%len(p.pools)], nil
}
func (p *poolRoundRobin) Len() int {
return len(p.pools)
}
func (p *poolRoundRobin) Reset() {
atomic.StoreUint32(&p.next, 0)
}
type poolsForTags struct {
pools sync.Map
}
func (p *poolsForTags) Get(tags []string) (*poolRoundRobin, bool) {
sort.Strings(tags)
key := strings.Join(tags, "^")
v, ok := p.pools.Load(key)
if !ok {
return nil, false
}
return v.(*poolRoundRobin), true
}
func (p *poolsForTags) Add(tags []string, pools []params.Pool) *poolRoundRobin {
sort.Strings(tags)
key := strings.Join(tags, "^")
poolRR := &poolRoundRobin{pools: pools}
v, _ := p.pools.LoadOrStore(key, poolRR)
return v.(*poolRoundRobin)
}
func (r *basePoolManager) log(msg string, args ...interface{}) {
msgArgs := []interface{}{