diff --git a/cache/entity_cache.go b/cache/entity_cache.go
index d69d7099..08e218df 100644
--- a/cache/entity_cache.go
+++ b/cache/entity_cache.go
@@ -1,6 +1,7 @@
 package cache

 import (
+    "log/slog"
     "sync"

     "github.com/cloudbase/garm/params"
@@ -64,14 +65,21 @@ func (e *EntityCache) SetEntity(entity params.GithubEntity) {
     }
 }

-func (e *EntityCache) ReplaceEntityPools(entityID string, pools map[string]params.Pool) {
+func (e *EntityCache) ReplaceEntityPools(entityID string, pools []params.Pool) {
     e.mux.Lock()
     defer e.mux.Unlock()

-    if cache, ok := e.entities[entityID]; ok {
-        cache.Pools = pools
-        e.entities[entityID] = cache
+    cache, ok := e.entities[entityID]
+    if !ok {
+        return
     }
+
+    poolsByID := map[string]params.Pool{}
+    for _, pool := range pools {
+        poolsByID[pool.ID] = pool
+    }
+    cache.Pools = poolsByID
+    e.entities[entityID] = cache
 }

 func (e *EntityCache) ReplaceEntityScaleSets(entityID string, scaleSets map[uint]params.ScaleSet) {
@@ -154,6 +162,37 @@ func (e *EntityCache) GetEntityScaleSet(entityID string, scaleSetID uint) (param
     return params.ScaleSet{}, false
 }

+func (e *EntityCache) FindPoolsMatchingAllTags(entityID string, tags []string) []params.Pool {
+    e.mux.Lock()
+    defer e.mux.Unlock()
+
+    if cache, ok := e.entities[entityID]; ok {
+        var pools []params.Pool
+        slog.Debug("Finding pools matching all tags", "entityID", entityID, "tags", tags, "pools", cache.Pools)
+        for _, pool := range cache.Pools {
+            if pool.HasRequiredLabels(tags) {
+                pools = append(pools, pool)
+            }
+        }
+        return pools
+    }
+    return nil
+}
+
+func (e *EntityCache) GetEntityPools(entityID string) []params.Pool {
+    e.mux.Lock()
+    defer e.mux.Unlock()
+
+    if cache, ok := e.entities[entityID]; ok {
+        var pools []params.Pool
+        for _, pool := range cache.Pools {
+            pools = append(pools, pool)
+        }
+        return pools
+    }
+    return nil
+}
+
 func GetEntity(entity params.GithubEntity) (EntityItem, bool) {
     return entityCache.GetEntity(entity)
 }
@@ -162,7 +201,7 @@ func SetEntity(entity params.GithubEntity) {
     entityCache.SetEntity(entity)
 }

-func ReplaceEntityPools(entityID string, pools map[string]params.Pool) {
+func ReplaceEntityPools(entityID string, pools []params.Pool) {
     entityCache.ReplaceEntityPools(entityID, pools)
 }

@@ -197,3 +236,11 @@ func GetEntityPool(entityID string, poolID string) (params.Pool, bool) {
     return entityCache.GetEntityPool(entityID, poolID)
 }

 func GetEntityScaleSet(entityID string, scaleSetID uint) (params.ScaleSet, bool) {
     return entityCache.GetEntityScaleSet(entityID, scaleSetID)
 }
+
+func FindPoolsMatchingAllTags(entityID string, tags []string) []params.Pool {
+    return entityCache.FindPoolsMatchingAllTags(entityID, tags)
+}
+
+func GetEntityPools(entityID string) []params.Pool {
+    return entityCache.GetEntityPools(entityID)
+}
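The pool cache above is seeded once at startup and then kept fresh by the watcher (see runner/pool/pool.go and runner/pool/watcher.go below). A minimal sketch of the intended call pattern, using the package-level helpers added in this file; the `store`, `ctx` and `entity` values are assumed to be in scope, as they are in `basePoolManager.Start`:

    // Seed the cache from the database once, at startup.
    pools, err := store.ListEntityPools(ctx, entity)
    if err != nil {
        return fmt.Errorf("failed to list pools: %w", err)
    }
    cache.ReplaceEntityPools(entity.ID, pools)

    // Later lookups are served from memory, without a database round trip.
    matching := cache.FindPoolsMatchingAllTags(entity.ID, []string{"self-hosted", "linux"})
    for _, pool := range matching {
        fmt.Println(pool.ID)
    }
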
diff --git a/cache/instance_cache.go b/cache/instance_cache.go
index 88074765..44f95ec2 100644
--- a/cache/instance_cache.go
+++ b/cache/instance_cache.go
@@ -25,24 +25,24 @@ func (i *InstanceCache) SetInstance(instance params.Instance) {
     i.mux.Lock()
     defer i.mux.Unlock()

-    i.cache[instance.ID] = instance
+    i.cache[instance.Name] = instance
 }

-func (i *InstanceCache) GetInstance(id string) (params.Instance, bool) {
+func (i *InstanceCache) GetInstance(name string) (params.Instance, bool) {
     i.mux.Lock()
     defer i.mux.Unlock()

-    if instance, ok := i.cache[id]; ok {
+    if instance, ok := i.cache[name]; ok {
         return instance, true
     }
     return params.Instance{}, false
 }

-func (i *InstanceCache) DeleteInstance(id string) {
+func (i *InstanceCache) DeleteInstance(name string) {
     i.mux.Lock()
     defer i.mux.Unlock()

-    delete(i.cache, id)
+    delete(i.cache, name)
 }

 func (i *InstanceCache) GetAllInstances() []params.Instance {
@@ -86,12 +86,12 @@ func SetInstanceCache(instance params.Instance) {
     instanceCache.SetInstance(instance)
 }

-func GetInstanceCache(id string) (params.Instance, bool) {
-    return instanceCache.GetInstance(id)
+func GetInstanceCache(name string) (params.Instance, bool) {
+    return instanceCache.GetInstance(name)
 }

-func DeleteInstanceCache(id string) {
-    instanceCache.DeleteInstance(id)
+func DeleteInstanceCache(name string) {
+    instanceCache.DeleteInstance(name)
 }

 func GetAllInstancesCache() []params.Instance {
diff --git a/cmd/garm-cli/cmd/organization.go b/cmd/garm-cli/cmd/organization.go
index c7be1f19..c7b80fec 100644
--- a/cmd/garm-cli/cmd/organization.go
+++ b/cmd/garm-cli/cmd/organization.go
@@ -379,8 +379,6 @@ func formatOneOrganization(org params.Organization) {
     t.AppendRow(table.Row{"Endpoint", org.Endpoint.Name})
     t.AppendRow(table.Row{"Pool balancer type", org.GetBalancerType()})
     t.AppendRow(table.Row{"Credentials", org.CredentialsName})
-    t.AppendRow(table.Row{"Created at", org.CreatedAt})
-    t.AppendRow(table.Row{"Updated at", org.UpdatedAt})
     t.AppendRow(table.Row{"Pool manager running", org.PoolManagerStatus.IsRunning})
     if !org.PoolManagerStatus.IsRunning {
         t.AppendRow(table.Row{"Failure reason", org.PoolManagerStatus.FailureReason})
diff --git a/params/github.go b/params/github.go
index 888288fc..0f963090 100644
--- a/params/github.go
+++ b/params/github.go
@@ -242,7 +242,7 @@ type RunnerScaleSetStatistic struct {
 type RunnerScaleSet struct {
     ID              int           `json:"id,omitempty"`
     Name            string        `json:"name,omitempty"`
-    RunnerGroupID   int           `json:"runnerGroupId,omitempty"`
+    RunnerGroupID   int64         `json:"runnerGroupId,omitempty"`
     RunnerGroupName string        `json:"runnerGroupName,omitempty"`
     Labels          []Label       `json:"labels,omitempty"`
     RunnerSetting   RunnerSetting `json:"RunnerSetting,omitempty"`
@@ -523,7 +523,38 @@ type ScaleSetJobMessage struct {
     RunnerAssignTime time.Time `json:"runnerAssignTime,omitempty"`
     FinishTime       time.Time `json:"finishTime,omitempty"`
     Result           string    `json:"result,omitempty"`
-    RunnerID         int       `json:"runnerId,omitempty"`
+    RunnerID         int64     `json:"runnerId,omitempty"`
     RunnerName       string    `json:"runnerName,omitempty"`
     AcquireJobURL    string    `json:"acquireJobUrl,omitempty"`
 }
+
+func (s ScaleSetJobMessage) MessageTypeToStatus() JobStatus {
+    switch s.MessageType {
+    case MessageTypeJobAssigned:
+        return JobStatusQueued
+    case MessageTypeJobStarted:
+        return JobStatusInProgress
+    case MessageTypeJobCompleted:
+        return JobStatusCompleted
+    default:
+        return JobStatusQueued
+    }
+}
+
+func (s ScaleSetJobMessage) ToJob() Job {
+    return Job{
+        ID:              s.RunnerRequestID,
+        Action:          s.EventName,
+        RunID:           s.WorkflowRunID,
+        Status:          string(s.MessageTypeToStatus()),
+        Conclusion:      s.Result,
+        CompletedAt:     s.FinishTime,
+        StartedAt:       s.RunnerAssignTime,
+        Name:            s.JobDisplayName,
+        GithubRunnerID:  s.RunnerID,
+        RunnerName:      s.RunnerName,
+        RepositoryName:  s.RepositoryName,
+        RepositoryOwner: s.OwnerName,
+        Labels:          s.RequestLabels,
+    }
+}
diff --git a/params/params.go b/params/params.go
index 2c1ed042..a15d2446 100644
--- a/params/params.go
+++ b/params/params.go
@@ -1039,6 +1039,17 @@ func (g GithubEntity) String() string {
     return ""
 }

+func (g GithubEntity) GetIDAsUUID() (uuid.UUID, error) {
+    if g.ID == "" {
+        return uuid.Nil, nil
+    }
+    id, err := uuid.Parse(g.ID)
+    if err != nil {
+        return uuid.Nil, fmt.Errorf("failed to parse entity ID: %w", err)
+    }
+    return id, nil
+}
+
 // used by swagger client generated code
 type GithubEndpoints []GithubEndpoint
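`MessageTypeToStatus` and `ToJob` let scale set message handlers translate the GitHub message envelope into GARM's own `Job` record. A hedged sketch of the mapping; the field values are invented, and the `MessageTypeJobCompleted` constant is assumed from the existing message-type definitions in this package:

    msg := params.ScaleSetJobMessage{
        MessageType:     params.MessageTypeJobCompleted,
        RunnerRequestID: 42,
        JobDisplayName:  "build",
        Result:          "succeeded",
        RunnerID:        7,
        RunnerName:      "garm-abc123",
    }
    job := msg.ToJob()
    // job.Status is string(params.JobStatusCompleted); job.Conclusion is "succeeded".
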
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index f1134de8..73a0b0fa 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -36,6 +36,7 @@ import (
     commonParams "github.com/cloudbase/garm-provider-common/params"
     "github.com/cloudbase/garm-provider-common/util"
     "github.com/cloudbase/garm/auth"
+    "github.com/cloudbase/garm/cache"
     dbCommon "github.com/cloudbase/garm/database/common"
     "github.com/cloudbase/garm/database/watcher"
     "github.com/cloudbase/garm/locking"
@@ -165,14 +166,13 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
     var err error
     var triggeredBy int64
     defer func() {
+        if jobParams.ID == 0 {
+            return
+        }
         // we're updating the job in the database, regardless of whether it was successful or not.
         // or if it was meant for this pool or not. Github will send the same job data to all hierarchies
         // that have been configured to work with garm. Updating the job at all levels should yield the same
         // outcome in the db.
-        if jobParams.ID == 0 {
-            return
-        }
-
         _, err := r.store.GetJobByID(r.ctx, jobParams.ID)
         if err != nil {
             if !errors.Is(err, runnerErrors.ErrNotFound) {
@@ -182,13 +182,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
                 return
             }
             // This job is new to us. Check if we have a pool that can handle it.
-            potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, jobParams.Labels)
-            if err != nil {
-                slog.With(slog.Any("error", err)).WarnContext(
-                    r.ctx, "failed to find pools matching tags; not recording job",
-                    "requested_tags", strings.Join(jobParams.Labels, ", "))
-                return
-            }
+            potentialPools := cache.FindPoolsMatchingAllTags(r.entity.ID, jobParams.Labels)
             if len(potentialPools) == 0 {
                 slog.WarnContext(
                     r.ctx, "no pools matching tags; not recording job",
@@ -236,6 +230,16 @@
             return nil
         }

+        fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
+        if !ok {
+            return nil
+        }
+
+        if _, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID); !ok {
+            slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
+            return nil
+        }
+
         // update instance workload state.
         if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
             if errors.Is(err, runnerErrors.ErrNotFound) {
@@ -261,17 +265,20 @@
     case "in_progress":
         jobParams, err = r.paramsWorkflowJobToParamsJob(job)
         if err != nil {
-            if errors.Is(err, runnerErrors.ErrNotFound) {
-                // This is most likely a runner we're not managing. If we define a repo from within an org
-                // and also define that same org, we will get a hook from github from both the repo and the org
-                // regarding the same workflow. We look for the runner in the database, and make sure it exists and is
-                // part of a pool that this manager is responsible for. A not found error here will most likely mean
-                // that we are not responsible for that runner, and we should ignore it.
-                return nil
-            }
             return errors.Wrap(err, "converting job to params")
         }

+        fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
+        if !ok {
+            slog.DebugContext(r.ctx, "instance not found in cache", "runner_name", jobParams.RunnerName)
+            return nil
+        }
+
+        pool, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID)
+        if !ok {
+            slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
+            return nil
+        }
         // update instance workload state.
         instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
         if err != nil {
@@ -288,10 +295,6 @@

         // A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
         // a minimum number of idle runners configured.
-        pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
-        if err != nil {
-            return errors.Wrap(err, "getting pool")
-        }
         if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
             slog.With(slog.Any("error", err)).ErrorContext(
                 r.ctx, "error ensuring idle runners for pool",
@@ -1286,10 +1289,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
 }

 func (r *basePoolManager) retryFailedInstances() error {
-    pools, err := r.store.ListEntityPools(r.ctx, r.entity)
-    if err != nil {
-        return fmt.Errorf("error listing pools: %w", err)
-    }
+    pools := cache.GetEntityPools(r.entity.ID)
     g, ctx := errgroup.WithContext(r.ctx)
     for _, pool := range pools {
         pool := pool
@@ -1309,10 +1309,7 @@ func (r *basePoolManager) retryFailedInstances() error {
 }

 func (r *basePoolManager) scaleDown() error {
-    pools, err := r.store.ListEntityPools(r.ctx, r.entity)
-    if err != nil {
-        return fmt.Errorf("error listing pools: %w", err)
-    }
+    pools := cache.GetEntityPools(r.entity.ID)
     g, ctx := errgroup.WithContext(r.ctx)
     for _, pool := range pools {
         pool := pool
@@ -1330,11 +1327,7 @@
 }

 func (r *basePoolManager) ensureMinIdleRunners() error {
-    pools, err := r.store.ListEntityPools(r.ctx, r.entity)
-    if err != nil {
-        return fmt.Errorf("error listing pools: %w", err)
-    }
-
+    pools := cache.GetEntityPools(r.entity.ID)
     g, _ := errgroup.WithContext(r.ctx)
     for _, pool := range pools {
         pool := pool
@@ -1613,6 +1606,13 @@ func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error
 }

 func (r *basePoolManager) Start() error {
+    // load pools in cache
+    pools, err := r.store.ListEntityPools(r.ctx, r.entity)
+    if err != nil {
+        return fmt.Errorf("failed to list pools: %w", err)
+    }
+    cache.ReplaceEntityPools(r.entity.ID, pools)
+
     initialToolUpdate := make(chan struct{}, 1)
     go func() {
         slog.Info("running initial tool update")
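Both webhook paths now decide ownership purely from the caches: an instance must exist in the instance cache, and its pool must belong to this entity. A compact, hypothetical helper capturing the rule the `completed` and `in_progress` branches above apply inline:

    // managesRunner reports whether this pool manager is responsible for a runner.
    // Hypothetical refactor of the inline checks in HandleWorkflowJob.
    func managesRunner(entityID, runnerName string) bool {
        instance, ok := cache.GetInstanceCache(runnerName)
        if !ok {
            // Not in the cache: most likely a runner managed by another hierarchy level.
            return false
        }
        _, ok = cache.GetEntityPool(entityID, instance.PoolID)
        return ok
    }
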
diff --git a/runner/pool/util.go b/runner/pool/util.go
index 9b7b7f14..d7b2c416 100644
--- a/runner/pool/util.go
+++ b/runner/pool/util.go
@@ -132,5 +132,7 @@ func composeWatcherFilters(entity params.GithubEntity) dbCommon.PayloadFilterFun
         watcher.WithEntityFilter(entity),
         // Watch for changes to the github credentials
         watcher.WithGithubCredentialsFilter(entity.Credentials),
+        // Watch for entity pool operations
+        watcher.WithEntityPoolFilter(entity),
     )
 }
diff --git a/runner/pool/watcher.go b/runner/pool/watcher.go
index 7f05d93b..61a1117c 100644
--- a/runner/pool/watcher.go
+++ b/runner/pool/watcher.go
@@ -6,6 +6,7 @@ import (
     "github.com/pkg/errors"

     runnerErrors "github.com/cloudbase/garm-provider-common/errors"
+    "github.com/cloudbase/garm/cache"
     "github.com/cloudbase/garm/database/common"
     "github.com/cloudbase/garm/params"
     runnerCommon "github.com/cloudbase/garm/runner/common"
@@ -121,6 +122,23 @@ func (r *basePoolManager) handleCredentialsUpdate(credentials params.GithubCrede
     r.mux.Unlock()
 }

+func (r *basePoolManager) handleEntityPoolEvent(event common.ChangePayload) {
+    pool, ok := event.Payload.(params.Pool)
+    if !ok {
+        slog.ErrorContext(r.ctx, "failed to cast payload to pool")
+        return
+    }
+
+    switch event.Operation {
+    case common.CreateOperation, common.UpdateOperation:
+        slog.DebugContext(r.ctx, "updating pool in cache", "pool_id", pool.ID)
+        cache.SetEntityPool(r.entity.ID, pool)
+    case common.DeleteOperation:
+        slog.DebugContext(r.ctx, "deleting pool from cache", "pool_id", pool.ID)
+        cache.DeleteEntityPool(r.entity.ID, pool.ID)
+    }
+}
+
 func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) {
     dbEntityType := common.DatabaseEntityType(r.entity.EntityType)
     switch event.EntityType {
@@ -150,6 +168,8 @@ func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) {
             return
         }
         r.handleEntityUpdate(entityInfo, event.Operation)
+    case common.PoolEntityType:
+        r.handleEntityPoolEvent(event)
     }
 }
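With `WithEntityPoolFilter` wired into the watcher filters, pool create, update, and delete events flow into `handleEntityPoolEvent`, which mirrors them into the cache. A sketch of one such event, with a hypothetical pool payload; `WithEntityPoolFilter` is assumed to exist in the watcher package, as the filter composition above implies:

    event := common.ChangePayload{
        EntityType: common.PoolEntityType,
        Operation:  common.UpdateOperation,
        Payload:    params.Pool{ID: "pool-id"}, // hypothetical pool
    }
    // Routed by handleWatcherEvent to handleEntityPoolEvent, which calls
    // cache.SetEntityPool(r.entity.ID, pool) for create/update operations.
    r.handleWatcherEvent(event)
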
"github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/params" runnerCommon "github.com/cloudbase/garm/runner/common" @@ -121,6 +122,23 @@ func (r *basePoolManager) handleCredentialsUpdate(credentials params.GithubCrede r.mux.Unlock() } +func (r *basePoolManager) handleEntityPoolEvent(event common.ChangePayload) { + pool, ok := event.Payload.(params.Pool) + if !ok { + slog.ErrorContext(r.ctx, "failed to cast payload to pool") + return + } + + switch event.Operation { + case common.CreateOperation, common.UpdateOperation: + slog.DebugContext(r.ctx, "updating pool in cache", "pool_id", pool.ID) + cache.SetEntityPool(r.entity.ID, pool) + case common.DeleteOperation: + slog.DebugContext(r.ctx, "deleting pool from cache", "pool_id", pool.ID) + cache.DeleteEntityPool(r.entity.ID, pool.ID) + } +} + func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) { dbEntityType := common.DatabaseEntityType(r.entity.EntityType) switch event.EntityType { @@ -150,6 +168,8 @@ func (r *basePoolManager) handleWatcherEvent(event common.ChangePayload) { return } r.handleEntityUpdate(entityInfo, event.Operation) + case common.PoolEntityType: + r.handleEntityPoolEvent(event) } } diff --git a/runner/scalesets.go b/runner/scalesets.go index f55b5dca..83432e63 100644 --- a/runner/scalesets.go +++ b/runner/scalesets.go @@ -153,12 +153,12 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param return params.ScaleSet{}, errors.Wrap(err, "creating github client") } - callback := func(old, newSet params.ScaleSet) error { - scalesetCli, err := scalesets.NewClient(ghCli) - if err != nil { - return errors.Wrap(err, "getting scaleset client") - } + scalesetCli, err := scalesets.NewClient(ghCli) + if err != nil { + return params.ScaleSet{}, errors.Wrap(err, "getting scaleset client") + } + callback := func(old, newSet params.ScaleSet) error { updateParams := params.RunnerScaleSet{} hasUpdates := false if old.Name != newSet.Name { @@ -171,7 +171,7 @@ func (r *Runner) UpdateScaleSetByID(ctx context.Context, scaleSetID uint, param if err != nil { return fmt.Errorf("error fetching runner group from github: %w", err) } - updateParams.RunnerGroupID = int(runnerGroup.ID) + updateParams.RunnerGroupID = runnerGroup.ID hasUpdates = true } @@ -225,13 +225,13 @@ func (r *Runner) CreateEntityScaleSet(ctx context.Context, entityType params.Git if err != nil { return params.ScaleSet{}, errors.Wrap(err, "getting scaleset client") } - runnerGroupID := 1 + var runnerGroupID int64 = 1 if param.GitHubRunnerGroup != "Default" { runnerGroup, err := scalesetCli.GetRunnerGroupByName(ctx, param.GitHubRunnerGroup) if err != nil { return params.ScaleSet{}, errors.Wrap(err, "getting runner group") } - runnerGroupID = int(runnerGroup.ID) + runnerGroupID = runnerGroup.ID } createParam := ¶ms.RunnerScaleSet{ diff --git a/workers/provider/instance_manager.go b/workers/provider/instance_manager.go index dfcd1cb5..dcb10257 100644 --- a/workers/provider/instance_manager.go +++ b/workers/provider/instance_manager.go @@ -350,10 +350,6 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error { return runnerErrors.NewBadRequestError("invalid payload type") } - switch instance.Status { - case commonParams.InstanceDeleting, commonParams.InstanceCreating: - return nil - } i.instance = instance return nil } diff --git a/workers/provider/provider.go b/workers/provider/provider.go index 3a7447f6..05a78c7e 100644 --- a/workers/provider/provider.go +++ b/workers/provider/provider.go @@ -239,6 
diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go
index b6d61f54..e1758550 100644
--- a/workers/scaleset/controller.go
+++ b/workers/scaleset/controller.go
@@ -91,6 +91,8 @@ func (c *Controller) loadAllScaleSets(cli common.GithubClient) error {
     }

     for _, sSet := range scaleSets {
+        cache.SetEntityScaleSet(c.Entity.ID, sSet)
+
         slog.DebugContext(c.ctx, "loading scale set", "scale_set", sSet.ID)
         if err := c.handleScaleSetCreateOperation(sSet, cli); err != nil {
             slog.With(slog.Any("error", err)).ErrorContext(c.ctx, "failed to handle scale set create operation")
diff --git a/workers/scaleset/controller_watcher.go b/workers/scaleset/controller_watcher.go
index 131cb56c..99fd4617 100644
--- a/workers/scaleset/controller_watcher.go
+++ b/workers/scaleset/controller_watcher.go
@@ -61,10 +61,10 @@ func (c *Controller) handleScaleSet(event dbCommon.ChangePayload) {

 func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli common.GithubClient) error {
     c.mux.Lock()
     defer c.mux.Unlock()
+    cache.SetEntityScaleSet(c.Entity.ID, sSet)

     if _, ok := c.ScaleSets[sSet.ID]; ok {
         slog.DebugContext(c.ctx, "scale set already exists in worker list", "scale_set_id", sSet.ID)
-        cache.SetEntityScaleSet(c.Entity.ID, sSet)
         return nil
     }

@@ -92,7 +92,6 @@ func (c *Controller) handleScaleSetCreateOperation(sSet params.ScaleSet, ghCli c
         scaleSet: sSet,
         worker:   worker,
     }
-    cache.SetEntityScaleSet(c.Entity.ID, sSet)

     return nil
 }
@@ -119,6 +118,8 @@ func (c *Controller) handleScaleSetUpdateOperation(sSet params.ScaleSet) error {
     c.mux.Lock()
     defer c.mux.Unlock()

+    cache.SetEntityScaleSet(c.Entity.ID, sSet)
+
     set, ok := c.ScaleSets[sSet.ID]
     if !ok {
         // Some error may have occurred when the scale set was first created, so we
@@ -128,7 +129,6 @@
     }
     set.scaleSet = sSet
     c.ScaleSets[sSet.ID] = set
-    cache.SetEntityScaleSet(c.Entity.ID, sSet)

     // We let the watcher in the scale set worker handle the update operation.
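Hoisting `cache.SetEntityScaleSet` to the top of the create and update handlers makes the cache write unconditional: it happens even when the handler bails out early. A sketch of the property this buys, assuming a `sSet` already tracked by the controller:

    // Even if the worker is missing and the update handler returns early,
    // the cache already reflects the latest scale set definition.
    _ = c.handleScaleSetUpdateOperation(sSet)
    cached, ok := cache.GetEntityScaleSet(c.Entity.ID, sSet.ID)
    // ok is true and cached carries the updated sSet, regardless of the error path.
    _ = cached
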
     return nil
 }
diff --git a/workers/scaleset/interfaces.go b/workers/scaleset/interfaces.go
index 51d1d54b..ee089c25 100644
--- a/workers/scaleset/interfaces.go
+++ b/workers/scaleset/interfaces.go
@@ -13,4 +13,5 @@ type scaleSetHelper interface {
     Owner() string
     HandleJobsCompleted(jobs []params.ScaleSetJobMessage) error
     HandleJobsStarted(jobs []params.ScaleSetJobMessage) error
+    HandleJobsAvailable(jobs []params.ScaleSetJobMessage) error
 }
diff --git a/workers/scaleset/scaleset_helper.go b/workers/scaleset/scaleset_helper.go
index b83351f2..82a8a052 100644
--- a/workers/scaleset/scaleset_helper.go
+++ b/workers/scaleset/scaleset_helper.go
@@ -31,6 +31,37 @@ func (w *Worker) SetLastMessageID(id int64) error {
     return nil
 }

+func (w *Worker) recordOrUpdateJob(job params.ScaleSetJobMessage) error {
+    entity, err := w.scaleSet.GetEntity()
+    if err != nil {
+        return fmt.Errorf("getting entity: %w", err)
+    }
+    asUUID, err := entity.GetIDAsUUID()
+    if err != nil {
+        return fmt.Errorf("getting entity ID as UUID: %w", err)
+    }
+
+    jobParams := job.ToJob()
+    jobParams.RunnerGroupName = w.scaleSet.GitHubRunnerGroup
+
+    switch entity.EntityType {
+    case params.GithubEntityTypeEnterprise:
+        jobParams.EnterpriseID = &asUUID
+    case params.GithubEntityTypeRepository:
+        jobParams.RepoID = &asUUID
+    case params.GithubEntityTypeOrganization:
+        jobParams.OrgID = &asUUID
+    default:
+        return fmt.Errorf("unknown entity type: %s", entity.EntityType)
+    }
+
+    if _, jobErr := w.store.CreateOrUpdateJob(w.ctx, jobParams); jobErr != nil {
+        slog.With(slog.Any("error", jobErr)).ErrorContext(
+            w.ctx, "failed to update job", "job_id", jobParams.ID)
+    }
+    return nil
+}
+
 // HandleJobCompleted handles a job completed message. If a job had a runner
 // assigned and was not canceled before it had a chance to run, then we mark
 // that runner as pending_delete.
@@ -39,6 +70,11 @@ func (w *Worker) HandleJobsCompleted(jobs []params.ScaleSetJobMessage) (err erro
     defer slog.DebugContext(w.ctx, "finished handling job completed", "jobs", jobs, "error", err)

     for _, job := range jobs {
+        if err := w.recordOrUpdateJob(job); err != nil {
+            // recording scale set jobs is purely informational for now.
+            slog.ErrorContext(w.ctx, "recording job", "job", job, "error", err)
+        }
+
         if job.RunnerName == "" {
             // This job was not assigned to a runner, so we can skip it.
             continue
@@ -68,6 +104,11 @@ func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) (err error)
     slog.DebugContext(w.ctx, "handling job started", "jobs", jobs)
     defer slog.DebugContext(w.ctx, "finished handling job started", "jobs", jobs, "error", err)
     for _, job := range jobs {
+        if err := w.recordOrUpdateJob(job); err != nil {
+            // recording scale set jobs is purely informational for now.
+            slog.ErrorContext(w.ctx, "recording job", "job", job, "error", err)
+        }
+
         if job.RunnerName == "" {
             // This should not happen, but just in case.
             continue
@@ -93,6 +134,16 @@ func (w *Worker) HandleJobsStarted(jobs []params.ScaleSetJobMessage) (err error)
     return nil
 }

+func (w *Worker) HandleJobsAvailable(jobs []params.ScaleSetJobMessage) error {
+    for _, job := range jobs {
+        if err := w.recordOrUpdateJob(job); err != nil {
+            // recording scale set jobs is purely informational for now.
+            slog.ErrorContext(w.ctx, "recording job", "job", job, "error", err)
+        }
+    }
+    return nil
+}
+
 func (w *Worker) SetDesiredRunnerCount(count int) error {
     if err := w.store.SetScaleSetDesiredRunnerCount(w.ctx, w.scaleSet.ID, count); err != nil {
         return fmt.Errorf("setting desired runner count: %w", err)
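`recordOrUpdateJob` anchors each job to its owner through the entity-type switch, so the stored record carries exactly one of `RepoID`, `OrgID`, or `EnterpriseID`. A hedged sketch of what the helper produces for an organization-level scale set; `msg` is a hypothetical message and `w` a worker in scope:

    entity, _ := w.scaleSet.GetEntity()
    id, _ := entity.GetIDAsUUID()

    job := msg.ToJob()
    job.RunnerGroupName = w.scaleSet.GitHubRunnerGroup
    job.OrgID = &id // what recordOrUpdateJob sets for params.GithubEntityTypeOrganization

Note that a `CreateOrUpdateJob` failure is only logged and the helper still returns nil, which is why all three callers treat recording as best-effort.
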
diff --git a/workers/scaleset/scaleset_listener.go b/workers/scaleset/scaleset_listener.go
index 07b3bf96..df4ab0bc 100644
--- a/workers/scaleset/scaleset_listener.go
+++ b/workers/scaleset/scaleset_listener.go
@@ -150,6 +150,11 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
             slog.ErrorContext(l.ctx, "acquiring jobs", "error", err)
             return
         }
+        // HandleJobsAvailable only records jobs in the database for now. The jobs are purely
+        // informational, so an error here won't break anything.
+        if err := l.scaleSetHelper.HandleJobsAvailable(availableJobs); err != nil {
+            slog.ErrorContext(l.ctx, "error handling available jobs", "error", err)
+        }
         slog.DebugContext(l.ctx, "acquired jobs", "job_ids", idsAcquired)
     }