Scale set workers properly come online

This adds the workers needed to start listening for scale set messages.
There is no handling of messages yet.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
Gabriel Adrian Samfira 2025-04-16 16:39:16 +00:00
parent 6a5c309399
commit a2aeac731c
15 changed files with 668 additions and 57 deletions

cache/cache.go (new file, 65 lines)

@ -0,0 +1,65 @@
package cache
import (
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/params"
)
var githubToolsCache *GithubToolsCache
func init() {
ghToolsCache := &GithubToolsCache{
entities: make(map[string]GithubEntityTools),
}
githubToolsCache = ghToolsCache
}
type GithubEntityTools struct {
updatedAt time.Time
entity params.GithubEntity
tools []commonParams.RunnerApplicationDownload
}
type GithubToolsCache struct {
mux sync.Mutex
// entity IDs are UUID4s. It is highly unlikely they will collide (🤞).
entities map[string]GithubEntityTools
}
func (g *GithubToolsCache) Get(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) {
g.mux.Lock()
defer g.mux.Unlock()
if cache, ok := g.entities[entity.String()]; ok {
if time.Since(cache.updatedAt) > 1*time.Hour {
// Stale cache, remove it.
delete(g.entities, entity.String())
return nil, false
}
return cache.tools, true
}
return nil, false
}
func (g *GithubToolsCache) Set(entity params.GithubEntity, tools []commonParams.RunnerApplicationDownload) {
g.mux.Lock()
defer g.mux.Unlock()
g.entities[entity.String()] = GithubEntityTools{
updatedAt: time.Now(),
entity: entity,
tools: tools,
}
}
func SetGithubToolsCache(entity params.GithubEntity, tools []commonParams.RunnerApplicationDownload) {
githubToolsCache.Set(entity, tools)
}
func GetGithubToolsCache(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) {
return githubToolsCache.Get(entity)
}
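For orientation, a minimal usage sketch of the new package-level cache helpers (not part of this commit; the entity values below are illustrative):
package main
import (
	"fmt"
	commonParams "github.com/cloudbase/garm-provider-common/params"
	"github.com/cloudbase/garm/cache"
	"github.com/cloudbase/garm/params"
)
func main() {
	entity := params.GithubEntity{
		ID:         "3a7b6d3e-0000-4000-8000-000000000000", // illustrative entity UUID
		EntityType: params.GithubEntityTypeRepository,
	}
	// Writers refresh the cache after fetching tools from GitHub.
	cache.SetGithubToolsCache(entity, []commonParams.RunnerApplicationDownload{})
	// Readers get a hit until the one hour staleness check in Get() expires the entry.
	if tools, ok := cache.GetGithubToolsCache(entity); ok {
		fmt.Printf("cached %d tool downloads for %s\n", len(tools), entity.String())
	}
}
The cache is process-local and keyed by entity ID; stale entries are dropped on read rather than by a background sweeper.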


@ -163,4 +163,5 @@ type Store interface {
ControllerInfo() (params.ControllerInfo, error)
InitController() (params.ControllerInfo, error)
GetGithubEntity(_ context.Context, entityType params.GithubEntityType, entityID string) (params.GithubEntity, error)
AddEntityEvent(ctx context.Context, entity params.GithubEntity, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error
}


@ -136,6 +136,17 @@ type ScaleSet struct {
Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
}
type RepositoryEvent struct {
gorm.Model
EventType params.EventType
EventLevel params.EventLevel
Message string `gorm:"type:text"`
RepoID uuid.UUID `gorm:"index:idx_repo_event"`
Repo Repository `gorm:"foreignKey:RepoID"`
}
type Repository struct {
Base
@ -154,8 +165,20 @@ type Repository struct {
EndpointName *string `gorm:"index:idx_owner_nocase,unique,collate:nocase"`
Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"`
Events []RepositoryEvent `gorm:"foreignKey:RepoID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type OrganizationEvent struct {
gorm.Model
EventType params.EventType
EventLevel params.EventLevel
Message string `gorm:"type:text"`
OrgID uuid.UUID `gorm:"index:idx_org_event"`
Org Organization `gorm:"foreignKey:OrgID"`
}
type Organization struct {
Base
@ -173,6 +196,19 @@ type Organization struct {
EndpointName *string `gorm:"index:idx_org_name_nocase,collate:nocase"`
Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"`
Events []OrganizationEvent `gorm:"foreignKey:OrgID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type EnterpriseEvent struct {
gorm.Model
EventType params.EventType
EventLevel params.EventLevel
Message string `gorm:"type:text"`
EnterpriseID uuid.UUID `gorm:"index:idx_enterprise_event"`
Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"`
}
type Enterprise struct {
@ -192,6 +228,8 @@ type Enterprise struct {
EndpointName *string `gorm:"index:idx_ent_name_nocase,collate:nocase"`
Endpoint GithubEndpoint `gorm:"foreignKey:EndpointName;constraint:OnDelete:SET NULL"`
Events []EnterpriseEvent `gorm:"foreignKey:EnterpriseID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type Address struct {


@ -423,6 +423,9 @@ func (s *sqlDatabase) migrateDB() error {
&Repository{},
&Organization{},
&Enterprise{},
&EnterpriseEvent{},
&OrganizationEvent{},
&RepositoryEvent{},
&Address{},
&InstanceStatusUpdate{},
&Instance{},


@ -631,3 +631,136 @@ func (s *sqlDatabase) GetGithubEntity(_ context.Context, entityType params.Githu
}
return entity, nil
}
func (s *sqlDatabase) addRepositoryEvent(ctx context.Context, repoID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
repo, err := s.GetRepositoryByID(ctx, repoID)
if err != nil {
return errors.Wrap(err, "updating instance")
}
msg := RepositoryEvent{
Message: statusMessage,
EventType: event,
EventLevel: eventLevel,
}
if err := s.conn.Model(&repo).Association("Events").Append(&msg); err != nil {
return errors.Wrap(err, "adding status message")
}
if maxEvents > 0 {
repoID, err := uuid.Parse(repo.ID)
if err != nil {
return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
var latestEvents []RepositoryEvent
q := s.conn.Model(&RepositoryEvent{}).
Limit(maxEvents).Order("id desc").
Where("repo_id = ?", repoID).Find(&latestEvents)
if q.Error != nil {
return errors.Wrap(q.Error, "fetching latest events")
}
if len(latestEvents) == maxEvents {
lastInList := latestEvents[len(latestEvents)-1]
if err := s.conn.Where("repo_id = ? and id < ?", repoID, lastInList.ID).Unscoped().Delete(&OrganizationEvent{}).Error; err != nil {
return errors.Wrap(err, "deleting old events")
}
}
}
return nil
}
func (s *sqlDatabase) addOrgEvent(ctx context.Context, orgID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
org, err := s.GetOrganizationByID(ctx, orgID)
if err != nil {
return errors.Wrap(err, "updating instance")
}
msg := OrganizationEvent{
Message: statusMessage,
EventType: event,
EventLevel: eventLevel,
}
if err := s.conn.Model(&org).Association("Events").Append(&msg); err != nil {
return errors.Wrap(err, "adding status message")
}
if maxEvents > 0 {
orgID, err := uuid.Parse(org.ID)
if err != nil {
return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
var latestEvents []OrganizationEvent
q := s.conn.Model(&OrganizationEvent{}).
Limit(maxEvents).Order("id desc").
Where("org_id = ?", orgID).Find(&latestEvents)
if q.Error != nil {
return errors.Wrap(q.Error, "fetching latest events")
}
if len(latestEvents) == maxEvents {
lastInList := latestEvents[len(latestEvents)-1]
if err := s.conn.Where("org_id = ? and id < ?", orgID, lastInList.ID).Unscoped().Delete(&OrganizationEvent{}).Error; err != nil {
return errors.Wrap(err, "deleting old events")
}
}
}
return nil
}
func (s *sqlDatabase) addEnterpriseEvent(ctx context.Context, entID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
ent, err := s.GetEnterpriseByID(ctx, entID)
if err != nil {
return errors.Wrap(err, "updating instance")
}
msg := EnterpriseEvent{
Message: statusMessage,
EventType: event,
EventLevel: eventLevel,
}
if err := s.conn.Model(&ent).Association("Events").Append(&msg); err != nil {
return errors.Wrap(err, "adding status message")
}
if maxEvents > 0 {
entID, err := uuid.Parse(ent.ID)
if err != nil {
return errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
var latestEvents []EnterpriseEvent
q := s.conn.Model(&EnterpriseEvent{}).
Limit(maxEvents).Order("id desc").
Where("enterprise_id = ?", entID).Find(&latestEvents)
if q.Error != nil {
return errors.Wrap(q.Error, "fetching latest events")
}
if len(latestEvents) == maxEvents {
lastInList := latestEvents[len(latestEvents)-1]
if err := s.conn.Where("enterprise_id = ? and id < ?", entID, lastInList.ID).Unscoped().Delete(&EnterpriseEvent{}).Error; err != nil {
return errors.Wrap(err, "deleting old events")
}
}
}
return nil
}
func (s *sqlDatabase) AddEntityEvent(ctx context.Context, entity params.GithubEntity, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
if maxEvents == 0 {
return errors.Wrap(runnerErrors.ErrBadRequest, "max events cannot be 0")
}
// TODO(gabriel-samfira): Should we send watcher notifications for events?
// Not sure it's of any value.
switch entity.EntityType {
case params.GithubEntityTypeRepository:
return s.addRepositoryEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents)
case params.GithubEntityTypeOrganization:
return s.addOrgEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents)
case params.GithubEntityTypeEnterprise:
return s.addEnterpriseEvent(ctx, entity.ID, event, eventLevel, statusMessage, maxEvents)
default:
return errors.Wrap(runnerErrors.ErrBadRequest, "invalid entity type")
}
}
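As a rough usage sketch (not from this commit), a caller could record an entity event through the new Store method. The params.StatusEvent and params.EventInfo names are assumed to be the existing event type/level constants GARM already uses for instance status updates:
package example
import (
	"context"
	"fmt"
	dbCommon "github.com/cloudbase/garm/database/common"
	"github.com/cloudbase/garm/params"
)
// recordWorkerStarted is a sketch only: it logs an informational event against a
// repository entity and keeps at most the 20 most recent events for that entity.
func recordWorkerStarted(ctx context.Context, db dbCommon.Store, repoID string) error {
	entity := params.GithubEntity{
		ID:         repoID,
		EntityType: params.GithubEntityTypeRepository,
	}
	// params.StatusEvent and params.EventInfo are assumed constant names.
	if err := db.AddEntityEvent(ctx, entity, params.StatusEvent, params.EventInfo, "scale set worker started", 20); err != nil {
		return fmt.Errorf("adding entity event: %w", err)
	}
	return nil
}
Passing maxEvents > 0 prunes events older than the newest maxEvents rows, so the per-entity event list stays bounded.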


@ -402,6 +402,18 @@ type RunnerScaleSetMessage struct {
Statistics *RunnerScaleSetStatistic `json:"statistics"`
}
func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
var body []ScaleSetJobMessage
if r.Body == "" {
return nil, fmt.Errorf("no body specified")
}
if err := json.Unmarshal([]byte(r.Body), &body); err != nil {
return nil, fmt.Errorf("failed to unmarshal body: %w", err)
}
return body, nil
}
type RunnerReference struct {
ID int `json:"id"`
Name string `json:"name"`
@ -469,3 +481,23 @@ type RunnerGroupList struct {
Count int `json:"count"`
RunnerGroups []RunnerGroup `json:"value"`
}
type ScaleSetJobMessage struct {
MessageType string `json:"messageType,omitempty"`
RunnerRequestId int64 `json:"runnerRequestId,omitempty"`
RepositoryName string `json:"repositoryName,omitempty"`
OwnerName string `json:"ownerName,omitempty"`
JobWorkflowRef string `json:"jobWorkflowRef,omitempty"`
JobDisplayName string `json:"jobDisplayName,omitempty"`
WorkflowRunId int64 `json:"workflowRunId,omitempty"`
EventName string `json:"eventName,omitempty"`
RequestLabels []string `json:"requestLabels,omitempty"`
QueueTime time.Time `json:"queueTime,omitempty"`
ScaleSetAssignTime time.Time `json:"scaleSetAssignTime,omitempty"`
RunnerAssignTime time.Time `json:"runnerAssignTime,omitempty"`
FinishTime time.Time `json:"finishTime,omitempty"`
Result string `json:"result,omitempty"`
RunnerId int `json:"runnerId,omitempty"`
RunnerName string `json:"runnerName,omitempty"`
AcquireJobUrl string `json:"acquireJobUrl,omitempty"`
}
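To show how the new helper is meant to be used, a small decoding sketch (the message body below is fabricated):
package main
import (
	"fmt"
	"log"
	"github.com/cloudbase/garm/params"
)
func main() {
	// A made-up RunnerScaleSetMessage whose Body holds a JSON array of jobs,
	// which is what GetJobsFromBody() expects to unmarshal.
	msg := params.RunnerScaleSetMessage{
		Body: `[{"messageType": "JobAvailable", "runnerRequestId": 42, "repositoryName": "garm", "ownerName": "cloudbase"}]`,
	}
	jobs, err := msg.GetJobsFromBody()
	if err != nil {
		log.Fatalf("decoding message body: %v", err)
	}
	for _, job := range jobs {
		fmt.Printf("%s: request %d for %s/%s\n", job.MessageType, job.RunnerRequestId, job.OwnerName, job.RepositoryName)
	}
}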


@ -94,6 +94,7 @@ func (r *Runner) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) error
return errors.Wrap(err, "getting scaleset client")
}
slog.DebugContext(ctx, "deleting scale set", "scale_set_id", scaleSet.ScaleSetID)
if err := scalesetCli.DeleteRunnerScaleSet(ctx, scaleSet.ScaleSetID); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.InfoContext(ctx, "scale set not found", "scale_set_id", scaleSet.ScaleSetID)


@ -74,7 +74,6 @@ func (c *Controller) loadAllRepositories() error {
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
@ -99,7 +98,6 @@ func (c *Controller) loadAllOrganizations() error {
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
@ -124,7 +122,6 @@ func (c *Controller) loadAllEnterprises() error {
if err != nil {
return fmt.Errorf("creating worker: %w", err)
}
slog.DebugContext(c.ctx, "starting entity worker", "entity_id", entity.ID, "entity_type", entity.EntityType)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting worker: %w", err)
}
@ -172,6 +169,7 @@ func (c *Controller) Start() error {
}
func (c *Controller) Stop() error {
slog.DebugContext(c.ctx, "stopping entity controller", "entity", c.consumerID)
c.mux.Lock()
defer c.mux.Unlock()
if !c.running {


@ -50,6 +50,7 @@ type Worker struct {
}
func (w *Worker) Stop() error {
slog.DebugContext(w.ctx, "stopping entity worker", "entity", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
@ -69,7 +70,8 @@ func (w *Worker) Stop() error {
return nil
}
func (w *Worker) Start() error {
func (w *Worker) Start() (err error) {
slog.DebugContext(w.ctx, "starting entity worker", "entity", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
@ -77,8 +79,19 @@ func (w *Worker) Start() error {
if err != nil {
return fmt.Errorf("creating scale set controller: %w", err)
}
if err := scaleSetController.Start(); err != nil {
return fmt.Errorf("starting scale set controller: %w", err)
}
w.scaleSetController = scaleSetController
defer func() {
if err != nil {
w.scaleSetController.Stop()
w.scaleSetController = nil
}
}()
consumer, err := watcher.RegisterConsumer(
w.ctx, w.consumerID,
composeWorkerWatcherFilters(w.Entity),
@ -90,6 +103,7 @@ func (w *Worker) Start() error {
w.running = true
w.quit = make(chan struct{})
go w.loop()
return nil
}


@ -2,20 +2,21 @@ package scaleset
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/util/github"
"github.com/cloudbase/garm/util/github/scalesets"
)
func NewController(ctx context.Context, store dbCommon.Store, entity params.GithubEntity, providers map[string]common.Provider) (*Controller, error) {
@ -76,9 +77,7 @@ type Controller struct {
store dbCommon.Store
providers map[string]common.Provider
tools []commonParams.RunnerApplicationDownload
ghCli common.GithubClient
scaleSetCli *scalesets.ScaleSetClient
forgeCredsAreValid bool
statusUpdates chan scaleSetStatus
@ -88,14 +87,15 @@ type Controller struct {
quit chan struct{}
}
func (c *Controller) loadAllScaleSets() error {
func (c *Controller) loadAllScaleSets(cli common.GithubClient) error {
scaleSets, err := c.store.ListEntityScaleSets(c.ctx, c.Entity)
if err != nil {
return fmt.Errorf("listing scale sets: %w", err)
}
for _, sSet := range scaleSets {
if err := c.handleScaleSetCreateOperation(sSet); err != nil {
slog.DebugContext(c.ctx, "loading scale set", "scale_set", sSet.ID)
if err := c.handleScaleSetCreateOperation(sSet, cli); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
continue
}
@ -104,6 +104,7 @@ func (c *Controller) loadAllScaleSets() error {
}
func (c *Controller) Start() (err error) {
slog.DebugContext(c.ctx, "starting scale set controller", "scale_set", c.consumerID)
c.mux.Lock()
if c.running {
c.mux.Unlock()
@ -111,15 +112,16 @@ func (c *Controller) Start() (err error) {
}
c.mux.Unlock()
if err := c.loadAllScaleSets(); err != nil {
return fmt.Errorf("loading all scale sets: %w", err)
}
ghCli, err := github.Client(c.ctx, c.Entity)
if err != nil {
return fmt.Errorf("creating github client: %w", err)
}
slog.DebugContext(c.ctx, "loaging scale sets", "entity", c.Entity.String())
if err := c.loadAllScaleSets(ghCli); err != nil {
return fmt.Errorf("loading all scale sets: %w", err)
}
consumer, err := watcher.RegisterConsumer(
c.ctx, c.consumerID,
composeControllerWatcherFilters(c.Entity),
@ -140,6 +142,7 @@ func (c *Controller) Start() (err error) {
}
func (c *Controller) Stop() error {
slog.DebugContext(c.ctx, "stopping scale set controller", "scale_set", c.consumerID)
c.mux.Lock()
defer c.mux.Unlock()
@ -170,15 +173,21 @@ func (c *Controller) updateTools() error {
c.mux.Lock()
defer c.mux.Unlock()
slog.DebugContext(c.ctx, "updating tools for entity", "entity", c.Entity.String())
tools, err := garmUtil.FetchTools(c.ctx, c.ghCli)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
c.forgeCredsAreValid = false
if errors.Is(err, runnerErrors.ErrUnauthorized) {
// TODO: block all scale sets
c.forgeCredsAreValid = false
}
return fmt.Errorf("failed to update tools for entity %s: %w", c.Entity.String(), err)
}
slog.DebugContext(c.ctx, "tools successfully updated for entity", "entity", c.Entity.String())
c.forgeCredsAreValid = true
c.tools = tools
cache.SetGithubToolsCache(c.Entity, tools)
return nil
}
@ -202,7 +211,7 @@ func (c *Controller) loop() {
updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
initialToolUpdate := make(chan struct{}, 1)
go func() {
slog.Info("running initial tool update")
slog.InfoContext(c.ctx, "running initial tool update")
if err := c.updateTools(); err != nil {
slog.With(slog.Any("error", err)).Error("failed to update tools")
}
@ -211,7 +220,11 @@ func (c *Controller) loop() {
for {
select {
case payload := <-c.consumer.Watch():
case payload, ok := <-c.consumer.Watch():
if !ok {
slog.InfoContext(c.ctx, "consumer channel closed")
return
}
slog.InfoContext(c.ctx, "received payload", slog.Any("payload", payload))
go c.handleWatcherEvent(payload)
case <-c.ctx.Done():


@ -6,8 +6,8 @@ import (
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
"github.com/cloudbase/garm/util/github"
scaleSetCli "github.com/cloudbase/garm/util/github/scalesets"
)
func (c *Controller) handleWatcherEvent(event dbCommon.ChangePayload) {
@ -38,7 +38,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(c.ctx, "got create operation for scale set", "scale_set_id", scaleSet.ID, "scale_set_name", scaleSet.Name)
if err := c.handleScaleSetCreateOperation(scaleSet); err != nil {
if err := c.handleScaleSetCreateOperation(scaleSet, c.ghCli); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
}
case dbCommon.UpdateOperation:
@ -57,7 +57,7 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {
}
}
func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error {
func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli common.GithubClient) error {
c.mux.Lock()
defer c.mux.Unlock()
@ -74,7 +74,7 @@ func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet) error {
return fmt.Errorf("provider %s not found for scale set %s", sSet.ProviderName, sSet.Name)
}
worker, err := NewWorker(c.ctx, c.store, sSet, provider)
worker, err := NewWorker(c.ctx, c.store, sSet, provider, ghCli)
if err != nil {
return fmt.Errorf("creating scale set worker: %w", err)
}
@ -120,7 +120,7 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
// Some error may have occurred when the scale set was first created, so we
// attempt to create it after the user updated the scale set, hopefully
// fixing the reason for the failure.
return c.handleScaleSetCreateOperation(sSet)
return c.handleScaleSetCreateOperation(sSet, c.ghCli)
}
// We let the watcher in the scale set worker handle the update operation.
return nil
@ -140,8 +140,7 @@ func (c *Controller) handleCredentialsEvent(event dbCommon.ChangePayload) {
defer c.mux.Unlock()
if c.Entity.Credentials.ID != credentials.ID {
// credentials were swapped on the entity. We need to recompose the watcher
// filters.
// stale update event.
return
}
c.Entity.Credentials = credentials
@ -190,15 +189,10 @@ func (c *Controller) updateAndBroadcastCredentials(entity params.GithubEntity) e
return fmt.Errorf("creating github client: %w", err)
}
setCli, err := scaleSetCli.NewClient(ghCli)
if err != nil {
return fmt.Errorf("creating scaleset client: %w", err)
}
c.ghCli = ghCli
c.scaleSetCli = setCli
for _, scaleSet := range c.ScaleSets {
if err := scaleSet.worker.SetGithubClient(ghCli, setCli); err != nil {
if err := scaleSet.worker.SetGithubClient(ghCli); err != nil {
slog.ErrorContext(c.ctx, "setting github client on worker", "error", err)
continue
}


@ -0,0 +1,12 @@
package scaleset
import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github/scalesets"
)
type scaleSetHelper interface {
ScaleSetCLI() *scalesets.ScaleSetClient
GetScaleSet() params.ScaleSet
Owner() string
}


@ -2,34 +2,54 @@ package scaleset
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
"github.com/cloudbase/garm/util/github/scalesets"
)
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider) (*Worker, error) {
func NewWorker(ctx context.Context, store dbCommon.Store, scaleSet params.ScaleSet, provider common.Provider, ghCli common.GithubClient) (*Worker, error) {
consumerID := fmt.Sprintf("scaleset-worker-%s-%d", scaleSet.Name, scaleSet.ID)
controllerInfo, err := store.ControllerInfo()
if err != nil {
return nil, fmt.Errorf("getting controller info: %w", err)
}
scaleSetCli, err := scalesets.NewClient(ghCli)
if err != nil {
return nil, fmt.Errorf("creating scale set client: %w", err)
}
return &Worker{
ctx: ctx,
store: store,
provider: provider,
Entity: scaleSet,
ctx: ctx,
controllerInfo: controllerInfo,
consumerID: consumerID,
store: store,
provider: provider,
Entity: scaleSet,
ghCli: ghCli,
scaleSetCli: scaleSetCli,
}, nil
}
type Worker struct {
ctx context.Context
ctx context.Context
consumerID string
controllerInfo params.ControllerInfo
provider common.Provider
store dbCommon.Store
Entity params.ScaleSet
tools []commonParams.RunnerApplicationDownload
ghCli common.GithubClient
scaleSetCli *scalesets.ScaleSetClient
consumer dbCommon.Consumer
listener *scaleSetListener
mux sync.Mutex
running bool
@ -37,40 +57,173 @@ type Worker struct {
}
func (w *Worker) Stop() error {
return nil
}
func (w *Worker) Start() error {
slog.DebugContext(w.ctx, "stopping scale set worker", "scale_set", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
if !w.running {
return nil
}
w.consumer.Close()
w.running = false
if w.quit != nil {
close(w.quit)
w.quit = nil
}
w.listener.Stop()
w.listener = nil
return nil
}
func (w *Worker) Start() (err error) {
slog.DebugContext(w.ctx, "starting scale set worker", "scale_set", w.consumerID)
w.mux.Lock()
defer w.mux.Unlock()
if w.running {
return nil
}
consumer, err := watcher.RegisterConsumer(
w.ctx, w.consumerID,
watcher.WithAll(
watcher.WithScaleSetFilter(w.Entity),
watcher.WithOperationTypeFilter(dbCommon.UpdateOperation),
),
)
if err != nil {
return fmt.Errorf("error registering consumer: %w", err)
}
defer func() {
if err != nil {
consumer.Close()
w.consumer = nil
}
}()
slog.DebugContext(w.ctx, "creating scale set listener")
listener := newListener(w.ctx, w)
slog.DebugContext(w.ctx, "starting scale set listener")
if err := listener.Start(); err != nil {
return fmt.Errorf("error starting listener: %w", err)
}
w.listener = listener
w.consumer = consumer
w.running = true
w.quit = make(chan struct{})
slog.DebugContext(w.ctx, "starting scale set worker loops", "scale_set", w.consumerID)
go w.loop()
go w.keepListenerAlive()
return nil
}
func (w *Worker) SetTools(tools []commonParams.RunnerApplicationDownload) {
func (w *Worker) SetGithubClient(client common.GithubClient) error {
w.mux.Lock()
defer w.mux.Unlock()
w.tools = tools
}
func (w *Worker) SetGithubClient(client common.GithubClient, scaleSetCli *scalesets.ScaleSetClient) error {
w.mux.Lock()
defer w.mux.Unlock()
// TODO:
// * stop current listener if any
if err := w.listener.Stop(); err != nil {
slog.ErrorContext(w.ctx, "error stopping listener", "error", err)
}
w.ghCli = client
scaleSetCli, err := scalesets.NewClient(client)
if err != nil {
return fmt.Errorf("error creating scale set client: %w", err)
}
w.scaleSetCli = scaleSetCli
// TODO:
// * start new listener
return nil
}
func (w *Worker) handleEvent(event dbCommon.ChangePayload) {
scaleSet, ok := event.Payload.(params.ScaleSet)
if !ok {
slog.ErrorContext(w.ctx, "invalid payload for scale set type", "scale_set_type", event.EntityType, "payload", event.Payload)
return
}
switch event.Operation {
case dbCommon.UpdateOperation:
slog.DebugContext(w.ctx, "got update operation")
w.mux.Lock()
w.Entity = scaleSet
w.mux.Unlock()
default:
slog.DebugContext(w.ctx, "invalid operation type; ignoring", "operation_type", event.Operation)
}
}
func (w *Worker) loop() {
defer w.Stop()
for {
select {
case <-w.quit:
return
case event, ok := <-w.consumer.Watch():
if !ok {
slog.InfoContext(w.ctx, "consumer channel closed")
return
}
go w.handleEvent(event)
case <-w.ctx.Done():
slog.DebugContext(w.ctx, "context done")
return
}
}
}
func (w *Worker) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
ticker := time.NewTicker(sleepTime)
defer ticker.Stop()
select {
case <-ticker.C:
return false
case <-w.quit:
return true
case <-w.ctx.Done():
return true
}
}
func (w *Worker) keepListenerAlive() {
var backoff time.Duration
for {
select {
case <-w.quit:
return
case <-w.ctx.Done():
return
case <-w.listener.Wait():
slog.DebugContext(w.ctx, "listener is stopped; attempting to restart")
for {
w.mux.Lock()
w.listener.Stop() //cleanup
slog.DebugContext(w.ctx, "attempting to restart")
if err := w.listener.Start(); err != nil {
w.mux.Unlock()
slog.ErrorContext(w.ctx, "error restarting listener", "error", err)
if backoff > 60*time.Second {
backoff = 60 * time.Second
} else if backoff == 0 {
backoff = 5 * time.Second
slog.InfoContext(w.ctx, "backing off restart attempt", "backoff", backoff)
} else {
backoff *= 2
}
slog.ErrorContext(w.ctx, "error restarting listener", "error", err, "backoff", backoff)
if canceled := w.sleepWithCancel(backoff); canceled {
slog.DebugContext(w.ctx, "listener restart canceled")
return
}
continue
}
w.mux.Unlock()
break
}
}
}
}


@ -0,0 +1,20 @@
package scaleset
import (
"fmt"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github/scalesets"
)
func (w *Worker) ScaleSetCLI() *scalesets.ScaleSetClient {
return w.scaleSetCli
}
func (w *Worker) GetScaleSet() params.ScaleSet {
return w.Entity
}
func (w *Worker) Owner() string {
return fmt.Sprintf("garm-%s", w.controllerInfo.ControllerID)
}


@ -0,0 +1,134 @@
package scaleset
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util/github/scalesets"
)
func newListener(ctx context.Context, scaleSetHelper scaleSetHelper) *scaleSetListener {
return &scaleSetListener{
ctx: ctx,
scaleSetHelper: scaleSetHelper,
}
}
type scaleSetListener struct {
// ctx is the global context for the worker
ctx context.Context
// listenerCtx is the context for the listener. We pass this
// context to GetMessages() which blocks until a message is
// available. We need to be able to cancel that longpoll request
// independent of the worker context, in case we need to restart
// the listener without restarting the worker.
listenerCtx context.Context
cancelFunc context.CancelFunc
lastMessageID int64
scaleSetHelper scaleSetHelper
messageSession *scalesets.MessageSession
mux sync.Mutex
running bool
quit chan struct{}
}
func (l *scaleSetListener) Start() error {
slog.DebugContext(l.ctx, "starting scale set listener", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
l.mux.Lock()
defer l.mux.Unlock()
l.listenerCtx, l.cancelFunc = context.WithCancel(context.Background())
scaleSet := l.scaleSetHelper.GetScaleSet()
slog.DebugContext(l.ctx, "creating new message session", "scale_set", scaleSet.ScaleSetID)
session, err := l.scaleSetHelper.ScaleSetCLI().CreateMessageSession(
l.listenerCtx, scaleSet.ScaleSetID,
l.scaleSetHelper.Owner(),
)
if err != nil {
return fmt.Errorf("creating message session: %w", err)
}
l.messageSession = session
l.quit = make(chan struct{})
l.running = true
go l.loop()
return nil
}
func (l *scaleSetListener) Stop() error {
l.mux.Lock()
defer l.mux.Unlock()
if !l.running {
return nil
}
if l.messageSession != nil {
slog.DebugContext(l.ctx, "closing message session", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
if err := l.messageSession.Close(); err != nil {
slog.ErrorContext(l.ctx, "closing message session", "error", err)
}
if err := l.scaleSetHelper.ScaleSetCLI().DeleteMessageSession(context.Background(), l.messageSession); err != nil {
slog.ErrorContext(l.ctx, "error deleting message session", "error", err)
}
}
l.cancelFunc()
l.messageSession.Close()
l.running = false
close(l.quit)
return nil
}
func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage) {
l.mux.Lock()
defer l.mux.Unlock()
body, err := msg.GetJobsFromBody()
if err != nil {
slog.ErrorContext(l.ctx, "getting jobs from body", "error", err)
return
}
slog.InfoContext(l.ctx, "handling message", "message", msg, "body", body)
l.lastMessageID = msg.MessageID
}
func (l *scaleSetListener) loop() {
defer l.Stop()
slog.DebugContext(l.ctx, "starting scale set listener loop", "scale_set", l.scaleSetHelper.GetScaleSet().ScaleSetID)
for {
select {
case <-l.quit:
return
case <-l.listenerCtx.Done():
slog.DebugContext(l.ctx, "stopping scale set listener")
return
case <-l.ctx.Done():
slog.DebugContext(l.ctx, "scaleset worker has stopped")
return
default:
slog.DebugContext(l.ctx, "getting message")
msg, err := l.messageSession.GetMessage(
l.listenerCtx, l.lastMessageID, l.scaleSetHelper.GetScaleSet().MaxRunners)
if err != nil {
if !errors.Is(err, context.Canceled) {
slog.ErrorContext(l.ctx, "getting message", "error", err)
}
return
}
l.handleSessionMessage(msg)
}
}
}
func (l *scaleSetListener) Wait() <-chan struct{} {
if !l.running {
return nil
}
return l.listenerCtx.Done()
}