Use cache for github client
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
parent fd6f01d2c0
commit ef676488b7
12 changed files with 277 additions and 192 deletions
cache/github_client.go (new file, +47)
@@ -0,0 +1,47 @@
package cache

import (
    "sync"

    "github.com/cloudbase/garm/runner/common"
)

var ghClientCache *GithubClientCache

type GithubClientCache struct {
    mux sync.Mutex

    cache map[string]common.GithubClient
}

func init() {
    clientCache := &GithubClientCache{
        cache: make(map[string]common.GithubClient),
    }
    ghClientCache = clientCache
}

func (g *GithubClientCache) SetClient(entityID string, client common.GithubClient) {
    g.mux.Lock()
    defer g.mux.Unlock()

    g.cache[entityID] = client
}

func (g *GithubClientCache) GetClient(entityID string) (common.GithubClient, bool) {
    g.mux.Lock()
    defer g.mux.Unlock()

    if client, ok := g.cache[entityID]; ok {
        return client, true
    }
    return nil, false
}

func SetGithubClient(entityID string, client common.GithubClient) {
    ghClientCache.SetClient(entityID, client)
}

func GetGithubClient(entityID string) (common.GithubClient, bool) {
    return ghClientCache.GetClient(entityID)
}
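For orientation only (not part of the commit): a minimal sketch of how the rest of the codebase is expected to use this package, based on the changes below. The entity worker publishes a client whenever it creates or rotates one, and other workers resolve it by entity ID on demand instead of holding their own copy. The helper names here are hypothetical.

package example

import (
    "fmt"

    "github.com/cloudbase/garm/cache"
    "github.com/cloudbase/garm/runner/common"
)

// publishClient is a hypothetical helper: the entity worker calls something like
// this right after (re)creating a GitHub client for an entity.
func publishClient(entityID string, cli common.GithubClient) {
    cache.SetGithubClient(entityID, cli)
}

// lookupClient is a hypothetical helper: scale set workers resolve the client on
// demand, so a credential rotation takes effect without restarting the worker.
func lookupClient(entityID string) (common.GithubClient, error) {
    cli, ok := cache.GetGithubClient(entityID)
    if !ok {
        return nil, fmt.Errorf("no github client cached for entity %s", entityID)
    }
    return cli, nil
}

Both calls are backed by a single process-global map guarded by a sync.Mutex, so they are safe to use from concurrent workers.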
@@ -1,6 +1,8 @@
package entity

import (
    "strings"

    "golang.org/x/sync/errgroup"

    dbCommon "github.com/cloudbase/garm/database/common"
@@ -8,6 +10,13 @@ import (
    "github.com/cloudbase/garm/params"
)

const (
    // These are duplicated until we decide if we move the pool manager to the new
    // worker flow.
    poolIDLabelprefix     = "runner-pool-id:"
    controllerLabelPrefix = "runner-controller-id:"
)

func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {
    return watcher.WithAll(
        watcher.WithAny(
@@ -56,3 +65,12 @@ func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) erro
        return nil
    }
}

func poolIDFromLabels(runner params.RunnerReference) string {
    for _, lbl := range runner.Labels {
        if strings.HasPrefix(lbl.Name, poolIDLabelprefix) {
            return lbl.Name[len(poolIDLabelprefix):]
        }
    }
    return ""
}
@@ -5,13 +5,18 @@ import (
    "fmt"
    "log/slog"
    "sync"
    "time"

    "github.com/cloudbase/garm/cache"
    dbCommon "github.com/cloudbase/garm/database/common"
    "github.com/cloudbase/garm/database/watcher"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/runner/common"
    garmUtil "github.com/cloudbase/garm/util"
    "github.com/cloudbase/garm/util/github"
    "github.com/cloudbase/garm/util/github/scalesets"
    "github.com/cloudbase/garm/workers/scaleset"
    "golang.org/x/sync/errgroup"
)

func NewWorker(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Worker, error) {
@@ -36,6 +41,7 @@ type Worker struct {

    consumer dbCommon.Consumer
    store    dbCommon.Store
    ghCli    common.GithubClient

    Entity    params.GithubEntity
    providers map[string]common.Provider
@@ -71,6 +77,13 @@ func (w *Worker) Start() (err error) {
    w.mux.Lock()
    defer w.mux.Unlock()

    ghCli, err := github.Client(w.ctx, w.Entity)
    if err != nil {
        return fmt.Errorf("creating github client: %w", err)
    }
    w.ghCli = ghCli
    cache.SetGithubClient(w.Entity.ID, ghCli)

    scaleSetController, err := scaleset.NewController(w.ctx, w.store, w.Entity, w.providers)
    if err != nil {
        return fmt.Errorf("creating scale set controller: %w", err)
@@ -100,16 +113,110 @@ func (w *Worker) Start() (err error) {
    w.quit = make(chan struct{})

    go w.loop()
    go w.consolidateRunnerLoop()
    return nil
}

// consolidateRunnerState will list all runners on GitHub for this entity, sort by
// pool or scale set and pass those runners to the appropriate controller (pools or scale sets).
// The controller will then pass along to their respective workers the list of runners
// they should be responsible for. The workers will then cross check the current state
// from github with their local state and reconcile any differences. This cleans up
// any runners that have been removed out of band in either the provider or github.
func (w *Worker) consolidateRunnerState() error {
    scaleSetCli, err := scalesets.NewClient(w.ghCli)
    if err != nil {
        return fmt.Errorf("creating scaleset client: %w", err)
    }
    // Client is scoped to the current entity. Only runners in a repo/org/enterprise
    // will be listed.
    runners, err := scaleSetCli.ListAllRunners(w.ctx)
    if err != nil {
        return fmt.Errorf("listing runners: %w", err)
    }

    byPoolID := make(map[string][]params.RunnerReference)
    byScaleSetID := make(map[int][]params.RunnerReference)
    for _, runner := range runners.RunnerReferences {
        if runner.RunnerScaleSetID != 0 {
            byScaleSetID[runner.RunnerScaleSetID] = append(byScaleSetID[runner.RunnerScaleSetID], runner)
        } else {
            poolID := poolIDFromLabels(runner)
            if poolID == "" {
                continue
            }
            byPoolID[poolID] = append(byPoolID[poolID], runner)
        }
    }

    g, ctx := errgroup.WithContext(w.ctx)
    g.Go(func() error {
        slog.DebugContext(ctx, "consolidating scale set runners", "entity", w.Entity.String(), "runners", runners)
        if err := w.scaleSetController.ConsolidateRunnerState(byScaleSetID); err != nil {
            return fmt.Errorf("consolidating runners for scale set: %w", err)
        }
        return nil
    })

    if err := w.waitForErrorGroupOrContextCancelled(g); err != nil {
        return fmt.Errorf("waiting for error group: %w", err)
    }
    return nil
}

func (w *Worker) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
    if g == nil {
        return nil
    }

    done := make(chan error, 1)
    go func() {
        waitErr := g.Wait()
        done <- waitErr
    }()

    select {
    case err := <-done:
        return err
    case <-w.ctx.Done():
        return w.ctx.Err()
    case <-w.quit:
        return nil
    }
}

func (w *Worker) consolidateRunnerLoop() {
    ticker := time.NewTicker(common.PoolReapTimeoutInterval)
    defer ticker.Stop()

    for {
        select {
        case _, ok := <-ticker.C:
            if !ok {
                slog.InfoContext(w.ctx, "consolidate ticker closed")
                return
            }
            if err := w.consolidateRunnerState(); err != nil {
                if err := w.store.AddEntityEvent(w.ctx, w.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil {
                    slog.With(slog.Any("error", err)).Error("failed to add entity event")
                }
                slog.With(slog.Any("error", err)).Error("failed to consolidate runner state")
            }
        case <-w.ctx.Done():
            return
        case <-w.quit:
            return
        }
    }
}

func (w *Worker) loop() {
    defer w.Stop()
    for {
        select {
        case payload := <-w.consumer.Watch():
            slog.InfoContext(w.ctx, "received payload")
            go w.handleWorkerWatcherEvent(payload)
            w.handleWorkerWatcherEvent(payload)
        case <-w.ctx.Done():
            return
        case <-w.quit:
@@ -3,8 +3,10 @@ package entity
import (
    "log/slog"

    "github.com/cloudbase/garm/cache"
    dbCommon "github.com/cloudbase/garm/database/common"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/util/github"
)

func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
@@ -46,6 +48,13 @@ func (w *Worker) handleEntityEventPayload(event dbCommon.ChangePayload) {
            // credentials were swapped on the entity. We need to recompose the watcher
            // filters.
            w.consumer.SetFilters(composeWorkerWatcherFilters(entity))
            ghCli, err := github.Client(w.ctx, entity)
            if err != nil {
                slog.ErrorContext(w.ctx, "creating github client", "entity_id", entity.ID, "error", err)
                return
            }
            w.ghCli = ghCli
            cache.SetGithubClient(entity.ID, ghCli)
        }
        w.Entity = entity
    default:
@@ -72,6 +81,13 @@ func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayloa
            return
        }
        w.Entity.Credentials = credentials
        ghCli, err := github.Client(w.ctx, w.Entity)
        if err != nil {
            slog.ErrorContext(w.ctx, "creating github client", "entity_id", w.Entity.ID, "error", err)
            return
        }
        w.ghCli = ghCli
        cache.SetGithubClient(w.Entity.ID, ghCli)
    default:
        slog.ErrorContext(w.ctx, "invalid operation type", "operation_type", event.Operation)
    }
workers/pools/controller.go (new file, +3)
@@ -0,0 +1,3 @@
package pools
@@ -5,7 +5,6 @@ import (
    "fmt"
    "log/slog"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"

@@ -14,15 +13,6 @@ import (
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/runner/common"
    garmUtil "github.com/cloudbase/garm/util"
    "github.com/cloudbase/garm/util/github"
    "github.com/cloudbase/garm/util/github/scalesets"
)

const (
    // These are duplicated until we decide if we move the pool manager to the new
    // worker flow.
    poolIDLabelprefix     = "runner-pool-id:"
    controllerLabelPrefix = "runner-controller-id:"
)

func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
@@ -73,14 +63,12 @@ type Controller struct {
    store     dbCommon.Store
    providers map[string]common.Provider

    ghCli common.GithubClient

    mux     sync.Mutex
    running bool
    quit    chan struct{}
}

func (c *Controller) loadAllScaleSets(cli common.GithubClient) error {
func (c *Controller) loadAllScaleSets() error {
    scaleSets, err := c.store.ListEntityScaleSets(c.ctx, c.Entity)
    if err != nil {
        return fmt.Errorf("listing scale sets: %w", err)
@@ -88,7 +76,7 @@ func (c *Controller) loadAllScaleSets(cli common.GithubClient) error {

    for _, sSet := range scaleSets {
        slog.DebugContext(c.ctx, "loading scale set", "scale_set", sSet.ID)
        if err := c.handleScaleSetCreateOperation(sSet, cli); err != nil {
        if err := c.handleScaleSetCreateOperation(sSet); err != nil {
            slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
            continue
        }
@@ -105,13 +93,8 @@ func (c *Controller) Start() (err error) {
    }
    c.mux.Unlock()

    ghCli, err := github.Client(c.ctx, c.Entity)
    if err != nil {
        return fmt.Errorf("creating github client: %w", err)
    }

    slog.DebugContext(c.ctx, "loading scale sets", "entity", c.Entity.String())
    if err := c.loadAllScaleSets(ghCli); err != nil {
    if err := c.loadAllScaleSets(); err != nil {
        return fmt.Errorf("loading all scale sets: %w", err)
    }

@@ -124,7 +107,6 @@ func (c *Controller) Start() (err error) {
    }

    c.mux.Lock()
    c.ghCli = ghCli
    c.consumer = consumer
    c.running = true
    c.quit = make(chan struct{})
@@ -159,39 +141,11 @@ func (c *Controller) Stop() error {
    return nil
}

// consolidateRunnerState will list all runners on GitHub for this entity, sort by
// pool or scale set and pass those runners to the appropriate worker. The worker will
// then have the responsibility to cross check the runners from github with what it
// knows should be true from the database. Any inconsistency needs to be handled.
// If we have an offline runner in github but no database entry for it, we remove the
// runner from github. If we have a runner that is active in the provider but does not
// exist in github, we remove it from the provider and the database.
func (c *Controller) consolidateRunnerState() error {
    scaleSetCli, err := scalesets.NewClient(c.ghCli)
    if err != nil {
        return fmt.Errorf("creating scaleset client: %w", err)
    }
    // Client is scoped to the current entity. Only runners in a repo/org/enterprise
    // will be listed.
    runners, err := scaleSetCli.ListAllRunners(c.ctx)
    if err != nil {
        return fmt.Errorf("listing runners: %w", err)
    }

    byPoolID := make(map[string][]params.RunnerReference)
    byScaleSetID := make(map[int][]params.RunnerReference)
    for _, runner := range runners.RunnerReferences {
        if runner.RunnerScaleSetID != 0 {
            byScaleSetID[runner.RunnerScaleSetID] = append(byScaleSetID[runner.RunnerScaleSetID], runner)
        } else {
            poolID := poolIDFromLabels(runner)
            if poolID == "" {
                continue
            }
            byPoolID[poolID] = append(byPoolID[poolID], runner)
        }
    }

// ConsolidateRunnerState will send a list of existing github runners to each scale set worker.
// The scale set worker will then need to cross check the existing runners in Github with the state
// in the database. Any inconsistencies will be reconciled. This cleans up any manually removed
// runners in either github or the providers.
func (c *Controller) ConsolidateRunnerState(byScaleSetID map[int][]params.RunnerReference) error {
    g, ctx := errgroup.WithContext(c.ctx)
    for _, scaleSet := range c.ScaleSets {
        runners := byScaleSetID[scaleSet.scaleSet.ScaleSetID]
@@ -233,9 +187,6 @@ func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) erro
func (c *Controller) loop() {
    defer c.Stop()

    consolidateTicker := time.NewTicker(common.PoolReapTimeoutInterval)
    defer consolidateTicker.Stop()

    for {
        select {
        case payload, ok := <-c.consumer.Watch():
@@ -247,17 +198,6 @@ func (c *Controller) loop() {
            c.handleWatcherEvent(payload)
        case <-c.ctx.Done():
            return
        case _, ok := <-consolidateTicker.C:
            if !ok {
                slog.InfoContext(c.ctx, "consolidate ticker closed")
                return
            }
            if err := c.consolidateRunnerState(); err != nil {
                if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to consolidate runner state: %q", err.Error()), 30); err != nil {
                    slog.With(slog.Any("error", err)).Error("failed to add entity event")
                }
                slog.With(slog.Any("error", err)).Error("failed to consolidate runner state")
            }
        case <-c.quit:
            return
        }
@@ -6,8 +6,6 @@ import (

    dbCommon "github.com/cloudbase/garm/database/common"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/runner/common"
    "github.com/cloudbase/garm/util/github"
)

func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
@@ -19,9 +17,6 @@ func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
    case entityType:
        slog.DebugContext(c.ctx, "got entity payload event")
        c.handleEntityEvent(event)
    case dbCommon.GithubCredentialsEntityType:
        slog.DebugContext(c.ctx, "got github credentials payload event")
        c.handleCredentialsEvent(event)
    default:
        slog.ErrorContext(c.ctx, "invalid entity type", "entity_type", event.EntityType)
        return
@@ -38,7 +33,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
    switch event.Operation {
    case dbCommon.CreateOperation:
        slog.DebugContext(c.ctx, "got create operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
        if err := c.handleScaleSetCreateOperation(scaleSet, c.ghCli); err != nil {
        if err := c.handleScaleSetCreateOperation(scaleSet); err != nil {
            slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
        }
    case dbCommon.UpdateOperation:
@@ -57,7 +52,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
    }
}

func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli common.GithubClient) error {
func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error {
    c.mux.Lock()
    defer c.mux.Unlock()

@@ -74,7 +69,7 @@ func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli c
        return fmt.Errorf("provider %s not found for scale set %s", sSet.ProviderName, sSet.Name)
    }

    worker, err := NewWorker(c.ctx, c.store, sSet, provider, ghCli)
    worker, err := NewWorker(c.ctx, c.store, sSet, provider)
    if err != nil {
        return fmt.Errorf("creating scale set worker: %w", err)
    }
@@ -120,7 +115,7 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
        // Some error may have occurred when the scale set was first created, so we
        // attempt to create it after the user updated the scale set, hopefully
        // fixing the reason for the failure.
        return c.handleScaleSetCreateOperation(sSet, c.ghCli)
        return c.handleScaleSetCreateOperation(sSet)
    }
    set.scaleSet = sSet
    c.ScaleSets[sSet.ID] = set
@@ -128,35 +123,6 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
    return nil
}

func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) {
    credentials, ok := event.Payload.(params.GithubCredentials)
    if !ok {
        slog.ErrorContext(c.ctx, "invalid credentials payload for entity type", "entity_type", event.EntityType, "payload", event)
        return
    }

    switch event.Operation {
    case dbCommon.UpdateOperation:
        slog.DebugContext(c.ctx, "got update operation")
        c.mux.Lock()
        defer c.mux.Unlock()

        if c.Entity.Credentials.ID != credentials.ID {
            // stale update event.
            return
        }
        c.Entity.Credentials = credentials

        if err := c.updateAndBroadcastCredentials(c.Entity); err != nil {
            slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
            return
        }
    default:
        slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
        return
    }
}

func (c *Controller) handleEntityEvent(event dbCommon.ChangePayload) {
    var entityGetter params.EntityGetter
    var ok bool
@@ -184,35 +150,9 @@ func (c *Controller) handleEntityEvent(event dbCommon.ChangePayload) {
        slog.DebugContext(c.ctx, "got update operation")
        c.mux.Lock()
        defer c.mux.Unlock()

        if c.Entity.Credentials.ID != entity.Credentials.ID {
            // credentials were swapped on the entity. We need to recompose the watcher
            // filters.
            c.consumer.SetFilters(composeControllerWatcherFilters(entity))
            if err := c.updateAndBroadcastCredentials(c.Entity); err != nil {
                slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to update credentials")
            }
        }
        c.Entity = entity
    default:
        slog.ErrorContext(c.ctx, "invalid operation type", "operation_type", event.Operation)
        return
    }
}

func (c *Controller) updateAndBroadcastCredentials(entity params.GithubEntity) error {
    ghCli, err := github.Client(c.ctx, entity)
    if err != nil {
        return fmt.Errorf("creating github client: %w", err)
    }

    c.ghCli = ghCli

    for _, scaleSet := range c.ScaleSets {
        if err := scaleSet.worker.SetGithubClient(ghCli); err != nil {
            slog.ErrorContext(c.ctx, "setting github client on worker", "error", err)
            continue
        }
    }
    return nil
}
@@ -6,8 +6,8 @@ import (
)

type scaleSetHelper interface {
    ScaleSetCLI() *scalesets.ScaleSetClient
    GetScaleSet() params.ScaleSet
    GetScaleSetClient() (*scalesets.ScaleSetClient, error)
    SetLastMessageID(id int64) error
    SetDesiredRunnerCount(count int) error
    Owner() string
@@ -17,19 +17,14 @@ import (
    "github.com/cloudbase/garm/locking"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/runner/common"
    "github.com/cloudbase/garm/util/github/scalesets"
)

func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider, ghCli common.GithubClient) (*Worker, error) {
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
    consumerID := fmt.Sprintf("scaleset-worker-%s-%d", scaleSet.Name, scaleSet.ID)
    controllerInfo, err := store.ControllerInfo()
    if err != nil {
        return nil, fmt.Errorf("getting controller info: %w", err)
    }
    scaleSetCli, err := scalesets.NewClient(ghCli)
    if err != nil {
        return nil, fmt.Errorf("creating scale set client: %w", err)
    }
    return &Worker{
        ctx:            ctx,
        controllerInfo: controllerInfo,
@@ -37,8 +32,6 @@ func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleS
        store:    store,
        provider: provider,
        scaleSet: scaleSet,
        ghCli:       ghCli,
        scaleSetCli: scaleSetCli,
        runners: make(map[string]params.Instance),
    }, nil
}
@@ -53,9 +46,7 @@ type Worker struct {
    scaleSet params.ScaleSet
    runners  map[string]params.Instance

    ghCli       common.GithubClient
    scaleSetCli *scalesets.ScaleSetClient
    consumer    dbCommon.Consumer
    consumer dbCommon.Consumer

    listener *scaleSetListener

@@ -110,7 +101,12 @@ func (w *Worker) Start() (err error) {
            instanceState := commonParams.InstancePendingDelete
            locking.Lock(instance.Name, w.consumerID)
            if instance.AgentID != 0 {
                if err := w.scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
                scaleSetCli, err := w.GetScaleSetClient()
                if err != nil {
                    slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
                    return fmt.Errorf("getting scale set client: %w", err)
                }
                if err := scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
                    // scale sets use JIT runners. This means that we create the runner in github
                    // before we create the actual instance that will use the credentials. We need
                    // to remove the runner from github if it exists.
@@ -128,7 +124,7 @@ func (w *Worker) Start() (err error) {
                }
                // The runner may have come up, registered and is currently running a
                // job, in which case, github will not allow us to remove it.
                runnerInstance, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
                runnerInstance, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
                if err != nil {
                    if !errors.Is(err, runnerErrors.ErrNotFound) {
                        // We could not get info about the runner and it wasn't not found
@@ -254,7 +250,11 @@ func (w *Worker) setRunnerDBStatus(runner string, status commonParams.InstanceSt
}

func (w *Worker) removeRunnerFromGithubAndSetPendingDelete(runnerName string, agentID int64) error {
    if err := w.scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
    scaleSetCli, err := w.GetScaleSetClient()
    if err != nil {
        return fmt.Errorf("getting scale set client: %w", err)
    }
    if err := scaleSetCli.RemoveRunner(w.ctx, agentID); err != nil {
        if !errors.Is(err, runnerErrors.ErrNotFound) {
            return fmt.Errorf("removing runner %s: %w", runnerName, err)
        }
@@ -321,6 +321,10 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
        ghRunnersByName[runner.Name] = runner
    }

    scaleSetCli, err := w.GetScaleSetClient()
    if err != nil {
        return fmt.Errorf("getting scale set client: %w", err)
    }
    dbRunnersByName := w.runnerByName()
    // Cross check what exists in github with what we have in the database.
    for name, runner := range ghRunnersByName {
@@ -329,7 +333,7 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
            // runner appears to be active. Is it not managed by GARM?
            if status != params.RunnerIdle && status != params.RunnerActive {
                slog.InfoContext(w.ctx, "runner does not exist in GARM; removing from github", "runner_name", name)
                if err := w.scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
                if err := scaleSetCli.RemoveRunner(w.ctx, runner.ID); err != nil {
                    if errors.Is(err, runnerErrors.ErrNotFound) {
                        continue
                    }
@@ -466,23 +470,6 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
    return nil
}

func (w *Worker) SetGithubClient(client common.GithubClient) error {
    w.mux.Lock()
    defer w.mux.Unlock()

    if err := w.listener.Stop(); err != nil {
        slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
    }

    w.ghCli = client
    scaleSetCli, err := scalesets.NewClient(client)
    if err != nil {
        return fmt.Errorf("error creating scale set client: %w", err)
    }
    w.scaleSetCli = scaleSetCli
    return nil
}

func (w *Worker) pseudoPoolID() (string, error) {
    // This is temporary. We need to extend providers to know about scale sets.
    entity, err := w.scaleSet.GetEntity()
@@ -563,8 +550,13 @@ func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
        w.mux.Unlock()
        return
    }
    scaleSetCli, err := w.GetScaleSetClient()
    if err != nil {
        slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
        return
    }
    if oldInstance.RunnerStatus != instance.RunnerStatus && instance.RunnerStatus == params.RunnerIdle {
        serviceRuner, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
        serviceRuner, err := scaleSetCli.GetRunner(w.ctx, instance.AgentID)
        if err != nil {
            slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
            w.mux.Unlock()
@@ -725,9 +717,14 @@ func (w *Worker) handleScaleUp(target, current uint) {
        return
    }

    scaleSetCli, err := w.GetScaleSetClient()
    if err != nil {
        slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
        return
    }
    for i := current; i < target; i++ {
        newRunnerName := fmt.Sprintf("%s-%s", w.scaleSet.GetRunnerPrefix(), util.NewID())
        jitConfig, err := w.scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID)
        jitConfig, err := scaleSetCli.GenerateJitRunnerConfig(w.ctx, newRunnerName, w.scaleSet.ScaleSetID)
        if err != nil {
            slog.ErrorContext(w.ctx, "error generating jit config", "error", err)
            continue
@@ -755,14 +752,14 @@ func (w *Worker) handleScaleUp(target, current uint) {
        dbInstance, err := w.store.CreateScaleSetInstance(w.ctx, w.scaleSet.ID, runnerParams)
        if err != nil {
            slog.ErrorContext(w.ctx, "error creating instance", "error", err)
            if err := w.scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
            if err := scaleSetCli.RemoveRunner(w.ctx, jitConfig.Runner.ID); err != nil {
                slog.ErrorContext(w.ctx, "error deleting runner", "error", err)
            }
            continue
        }
        w.runners[dbInstance.ID] = dbInstance

        _, err = w.scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
        _, err = scaleSetCli.GetRunner(w.ctx, jitConfig.Runner.ID)
        if err != nil {
            slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
            continue
@@ -854,8 +851,13 @@ func (w *Worker) handleScaleDown(target, current uint) {
            continue
        }

        scaleSetCli, err := w.GetScaleSetClient()
        if err != nil {
            slog.ErrorContext(w.ctx, "error getting scale set client", "error", err)
            return
        }
        slog.DebugContext(w.ctx, "removing runner", "runner_name", runner.Name)
        if err := w.scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
        if err := scaleSetCli.RemoveRunner(w.ctx, runner.AgentID); err != nil {
            if !errors.Is(err, runnerErrors.ErrNotFound) {
                slog.ErrorContext(w.ctx, "error removing runner", "runner_name", runner.Name, "error", err)
                locking.Unlock(runner.Name, false)
@@ -7,13 +7,28 @@ import (

    runnerErrors "github.com/cloudbase/garm-provider-common/errors"
    commonParams "github.com/cloudbase/garm-provider-common/params"
    "github.com/cloudbase/garm/cache"
    "github.com/cloudbase/garm/locking"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/util/github/scalesets"
)

func (w *Worker) ScaleSetCLI() *scalesets.ScaleSetClient {
    return w.scaleSetCli
func (w *Worker) GetScaleSetClient() (*scalesets.ScaleSetClient, error) {
    scaleSetEntity, err := w.scaleSet.GetEntity()
    if err != nil {
        return nil, fmt.Errorf("getting entity: %w", err)
    }

    ghCli, ok := cache.GetGithubClient(scaleSetEntity.ID)
    if !ok {
        return nil, fmt.Errorf("getting github client: %w", err)
    }
    scaleSetClient, err := scalesets.NewClient(ghCli)
    if err != nil {
        return nil, fmt.Errorf("creating scale set client: %w", err)
    }

    return scaleSetClient, nil
}

func (w *Worker) GetScaleSet() params.ScaleSet {
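For orientation only (not part of the commit): a minimal sketch of the call path the scale set worker now follows when it needs to talk to GitHub, using the GetScaleSetClient method from the hunk above. The removeAgent helper is hypothetical and assumes it lives in the same package as Worker.

package scaleset

import (
    "context"
    "fmt"
)

// removeAgent is a hypothetical helper: resolve the entity-scoped GitHub client
// from the cache via GetScaleSetClient (cache lookup plus scalesets.NewClient),
// then call the scale set API with it.
func removeAgent(ctx context.Context, w *Worker, agentID int64) error {
    scaleSetCli, err := w.GetScaleSetClient()
    if err != nil {
        return fmt.Errorf("getting scale set client: %w", err)
    }
    return scaleSetCli.RemoveRunner(ctx, agentID)
}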
@@ -48,8 +48,12 @@ func (l *scaleSetListener) Start() error {

    l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background())
    scaleSet := l.scaleSetHelper.GetScaleSet()
    scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient()
    if err != nil {
        return fmt.Errorf("getting scale set client: %w", err)
    }
    slog.DebugContext(l.ctx, "creating new message session", "scale_set", scaleSet.ScaleSetID)
    session, err := l.scaleSetHelper.ScaleSetCLI().CreateMessageSession(
    session, err := scaleSetClient.CreateMessageSession(
        l.listenerCtx, scaleSet.ScaleSetID,
        l.scaleSetHelper.Owner(),
    )
@@ -72,13 +76,16 @@ func (l *scaleSetListener) Stop() error {
    if !l.running {
        return nil
    }

    scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient()
    if err != nil {
        return fmt.Errorf("getting scale set client: %w", err)
    }
    if l.messageSession != nil {
        slog.DebugContext(l.ctx, "closing message session", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
        if err := l.messageSession.Close(); err != nil {
            slog.ErrorContext(l.ctx, "closing message session", "error", err)
        }
        if err := l.scaleSetHelper.ScaleSetCLI().DeleteMessageSession(context.Background(), l.messageSession); err != nil {
        if err := scaleSetClient.DeleteMessageSession(context.Background(), l.messageSession); err != nil {
            slog.ErrorContext(l.ctx, "error deleting message session", "error", err)
        }
    }
@@ -145,12 +152,17 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
        }
    }

    scaleSetClient, err := l.scaleSetHelper.GetScaleSetClient()
    if err != nil {
        slog.ErrorContext(l.ctx, "getting scale set client", "error", err)
        return
    }
    if len(availableJobs) > 0 {
        jobIDs := make([]int64, len(availableJobs))
        for idx, job := range availableJobs {
            jobIDs[idx] = job.RunnerRequestID
        }
        idsAcquired, err := l.scaleSetHelper.ScaleSetCLI().AcquireJobs(
        idsAcquired, err := scaleSetClient.AcquireJobs(
            l.listenerCtx, l.scaleSetHelper.GetScaleSet().ScaleSetID,
            l.messageSession.MessageQueueAccessToken(), jobIDs)
        if err != nil {
@@ -1,8 +1,6 @@
package scaleset

import (
    "strings"

    dbCommon "github.com/cloudbase/garm/database/common"
    "github.com/cloudbase/garm/database/watcher"
    "github.com/cloudbase/garm/params"
@@ -22,18 +20,5 @@ func composeControllerWatcherFilters(entity params.GithubEntity) dbCommon.Payloa
            watcher.WithEntityFilter(entity),
            watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
        ),
        watcher.WithAll(
            watcher.WithGithubCredentialsFilter(entity.Credentials),
            watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
        ),
    )
}

func poolIDFromLabels(runner params.RunnerReference) string {
    for _, lbl := range runner.Labels {
        if strings.HasPrefix(lbl.Name, poolIDLabelprefix) {
            return lbl.Name[len(poolIDLabelprefix):]
        }
    }
    return ""
}