From d05df3686834794ac9c5e6adbf064e2ee8a8419a Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sun, 24 Aug 2025 22:36:44 +0000 Subject: [PATCH] Attempt to use the scalset API and caching On github, attempt to use the scaleset API to list all runners without pagination. This will avoid missing runners and accidentally removing them. Fall back to paginated API if we can't use the scaleset API. Add ability to retrieve all instances from cache, for an entity. Signed-off-by: Gabriel Adrian Samfira --- cache/instance_cache.go | 20 ++++ params/github.go | 30 ++--- runner/pool/cache.go | 75 ++++++++++++ runner/pool/common.go | 82 ++----------- runner/pool/pool.go | 71 +++++------- runner/pool/util.go | 250 ++++++++++++++++++++++++++++------------ util/github/client.go | 11 +- workers/entity/util.go | 3 +- 8 files changed, 333 insertions(+), 209 deletions(-) create mode 100644 runner/pool/cache.go diff --git a/cache/instance_cache.go b/cache/instance_cache.go index baf09945..ae2c1cec 100644 --- a/cache/instance_cache.go +++ b/cache/instance_cache.go @@ -98,6 +98,22 @@ func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instan return filteredInstances } +func (i *InstanceCache) GetEntityInstances(entityID string) []params.Instance { + pools := GetEntityPools(entityID) + poolsAsMap := map[string]bool{} + for _, pool := range pools { + poolsAsMap[pool.ID] = true + } + + ret := []params.Instance{} + for _, val := range i.GetAllInstances() { + if _, ok := poolsAsMap[val.PoolID]; ok { + ret = append(ret, val) + } + } + return ret +} + func SetInstanceCache(instance params.Instance) { instanceCache.SetInstance(instance) } @@ -121,3 +137,7 @@ func GetInstancesForPool(poolID string) []params.Instance { func GetInstancesForScaleSet(scaleSetID uint) []params.Instance { return instanceCache.GetInstancesForScaleSet(scaleSetID) } + +func GetEntityInstances(entityID string) []params.Instance { + return instanceCache.GetEntityInstances(entityID) +} diff --git a/params/github.go b/params/github.go index cb68d880..08f7b409 100644 --- a/params/github.go +++ b/params/github.go @@ -427,21 +427,21 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) { } type RunnerReference struct { - ID int64 `json:"id"` - Name string `json:"name"` - OS string `json:"os"` - RunnerScaleSetID int `json:"runnerScaleSetId"` - CreatedOn interface{} `json:"createdOn"` - RunnerGroupID uint64 `json:"runnerGroupId"` - RunnerGroupName string `json:"runnerGroupName"` - Version string `json:"version"` - Enabled bool `json:"enabled"` - Ephemeral bool `json:"ephemeral"` - Status interface{} `json:"status"` - DisableUpdate bool `json:"disableUpdate"` - ProvisioningState string `json:"provisioningState"` - Busy bool `json:"busy"` - Labels []Label `json:"labels,omitempty"` + ID int64 `json:"id"` + Name string `json:"name"` + OS string `json:"os"` + RunnerScaleSetID int `json:"runnerScaleSetId"` + CreatedOn any `json:"createdOn"` + RunnerGroupID uint64 `json:"runnerGroupId"` + RunnerGroupName string `json:"runnerGroupName"` + Version string `json:"version"` + Enabled bool `json:"enabled"` + Ephemeral bool `json:"ephemeral"` + Status any `json:"status"` + DisableUpdate bool `json:"disableUpdate"` + ProvisioningState string `json:"provisioningState"` + Busy bool `json:"busy"` + Labels []Label `json:"labels,omitempty"` } func (r RunnerReference) GetStatus() RunnerStatus { diff --git a/runner/pool/cache.go b/runner/pool/cache.go new file mode 100644 index 00000000..5a3a3c8c --- /dev/null +++ b/runner/pool/cache.go @@ -0,0 +1,75 @@ +package pool + +import ( + "sort" + "strings" + "sync" + "sync/atomic" + + runnerErrors "github.com/cloudbase/garm-provider-common/errors" + "github.com/cloudbase/garm/params" +) + +type poolCacheStore interface { + Next() (params.Pool, error) + Reset() + Len() int +} + +type poolRoundRobin struct { + pools []params.Pool + next uint32 +} + +func (p *poolRoundRobin) Next() (params.Pool, error) { + if len(p.pools) == 0 { + return params.Pool{}, runnerErrors.ErrNoPoolsAvailable + } + + n := atomic.AddUint32(&p.next, 1) + return p.pools[(int(n)-1)%len(p.pools)], nil +} + +func (p *poolRoundRobin) Len() int { + return len(p.pools) +} + +func (p *poolRoundRobin) Reset() { + atomic.StoreUint32(&p.next, 0) +} + +type poolsForTags struct { + pools sync.Map + poolCacheType params.PoolBalancerType +} + +func (p *poolsForTags) Get(tags []string) (poolCacheStore, bool) { + sort.Strings(tags) + key := strings.Join(tags, "^") + + v, ok := p.pools.Load(key) + if !ok { + return nil, false + } + poolCache := v.(*poolRoundRobin) + if p.poolCacheType == params.PoolBalancerTypePack { + // When we service a list of jobs, we want to try each pool in turn + // for each job. Pools are sorted by priority so we always start from the + // highest priority pool and move on to the next if the first one is full. + poolCache.Reset() + } + return poolCache, true +} + +func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore { + sort.Slice(pools, func(i, j int) bool { + return pools[i].Priority > pools[j].Priority + }) + + sort.Strings(tags) + key := strings.Join(tags, "^") + + poolRR := &poolRoundRobin{pools: pools} + v, _ := p.pools.LoadOrStore(key, poolRR) + return v.(*poolRoundRobin) +} diff --git a/runner/pool/common.go b/runner/pool/common.go index fa2f7e5a..a41e034d 100644 --- a/runner/pool/common.go +++ b/runner/pool/common.go @@ -14,79 +14,15 @@ package pool -import ( - "context" - "fmt" - "net/http" - "net/url" - "strings" - - "github.com/google/go-github/v72/github" - - runnerErrors "github.com/cloudbase/garm-provider-common/errors" - "github.com/cloudbase/garm/params" -) - -func validateHookRequest(controllerID, baseURL string, allHooks []*github.Hook, req *github.Hook) error { - parsed, err := url.Parse(baseURL) - if err != nil { - return fmt.Errorf("error parsing webhook url: %w", err) - } - - partialMatches := []string{} - for _, hook := range allHooks { - hookURL := strings.ToLower(hook.Config.GetURL()) - if hookURL == "" { - continue - } - - if hook.Config.GetURL() == req.Config.GetURL() { - return runnerErrors.NewConflictError("hook already installed") - } else if strings.Contains(hookURL, controllerID) || strings.Contains(hookURL, parsed.Hostname()) { - partialMatches = append(partialMatches, hook.Config.GetURL()) - } - } - - if len(partialMatches) > 0 { - return runnerErrors.NewConflictError("a webhook containing the controller ID or hostname of this contreoller is already installed on this repository") - } - - return nil +type RunnerLabels struct { + ID int64 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Type string `json:"type,omitempty"` } -func hookToParamsHookInfo(hook *github.Hook) params.HookInfo { - hookURL := hook.Config.GetURL() - - insecureSSLConfig := hook.Config.GetInsecureSSL() - insecureSSL := insecureSSLConfig == "1" - - return params.HookInfo{ - ID: *hook.ID, - URL: hookURL, - Events: hook.Events, - Active: *hook.Active, - InsecureSSL: insecureSSL, - } -} - -func (r *basePoolManager) listHooks(ctx context.Context) ([]*github.Hook, error) { - opts := github.ListOptions{ - PerPage: 100, - } - var allHooks []*github.Hook - for { - hooks, ghResp, err := r.ghcli.ListEntityHooks(ctx, &opts) - if err != nil { - if ghResp != nil && ghResp.StatusCode == http.StatusNotFound { - return nil, runnerErrors.NewBadRequestError("repository not found or your PAT does not have access to manage webhooks") - } - return nil, fmt.Errorf("error fetching hooks: %w", err) - } - allHooks = append(allHooks, hooks...) - if ghResp.NextPage == 0 { - break - } - opts.Page = ghResp.NextPage - } - return allHooks, nil +type forgeRunner struct { + ID int64 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` + Labels []RunnerLabels `json:"labels,omitempty"` } diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 97ffa992..eecb500a 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -44,6 +44,7 @@ import ( "github.com/cloudbase/garm/runner/common" garmUtil "github.com/cloudbase/garm/util" ghClient "github.com/cloudbase/garm/util/github" + "github.com/cloudbase/garm/util/github/scalesets" ) var ( @@ -104,11 +105,19 @@ func NewEntityPoolManager(ctx context.Context, entity params.ForgeEntity, instan return nil, fmt.Errorf("error creating backoff: %w", err) } + var scaleSetCli *scalesets.ScaleSetClient + if entity.Credentials.ForgeType == params.GithubEndpointType { + scaleSetCli, err = scalesets.NewClient(ghc) + if err != nil { + return nil, fmt.Errorf("failed to get scalesets client: %w", err) + } + } repo := &basePoolManager{ ctx: ctx, consumerID: consumerID, entity: entity, ghcli: ghc, + scaleSetClient: scaleSetCli, controllerInfo: controllerInfo, instanceTokenGetter: instanceTokenGetter, @@ -127,6 +136,7 @@ type basePoolManager struct { consumerID string entity params.ForgeEntity ghcli common.GithubClient + scaleSetClient *scalesets.ScaleSetClient controllerInfo params.ControllerInfo instanceTokenGetter auth.InstanceTokenGetter consumer dbCommon.Consumer @@ -393,7 +403,7 @@ func (r *basePoolManager) updateTools() error { // happens, github will remove the ephemeral worker and send a webhook our way. // If we were offline and did not process the webhook, the instance will linger. // We need to remove it from the provider and database. -func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error { +func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []forgeRunner) error { dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return fmt.Errorf("error fetching instances from db: %w", err) @@ -404,10 +414,10 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", - "runner_name", run.GetName()) + "runner_name", run.Name) continue } - runnerNames[*run.Name] = true + runnerNames[run.Name] = true } for _, instance := range dbInstances { @@ -473,21 +483,21 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne // reapTimedOutRunners will mark as pending_delete any runner that has a status // of "running" in the provider, but that has not registered with Github, and has // received no new updates in the configured timeout interval. -func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { +func (r *basePoolManager) reapTimedOutRunners(runners []forgeRunner) error { dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity) if err != nil { return fmt.Errorf("error fetching instances from db: %w", err) } - runnersByName := map[string]*github.Runner{} + runnersByName := map[string]forgeRunner{} for _, run := range runners { if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", - "runner_name", run.GetName()) + "runner_name", run.Name) continue } - runnersByName[*run.Name] = run + runnersByName[run.Name] = run } for _, instance := range dbInstances { @@ -521,7 +531,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { // * The runner managed to join github, but the setup process failed later and the runner // never started on the instance. // * A JIT config was created, but the runner never joined github. - if runner, ok := runnersByName[instance.Name]; !ok || runner.GetStatus() == "offline" { + if runner, ok := runnersByName[instance.Name]; !ok || runner.Status == "offline" { slog.InfoContext( r.ctx, "reaping timed-out/failed runner", "runner_name", instance.Name) @@ -540,24 +550,24 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { // as offline and for which we no longer have a local instance. // This may happen if someone manually deletes the instance in the provider. We need to // first remove the instance from github, and then from our database. -func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error { +func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []forgeRunner) error { poolInstanceCache := map[string][]commonParams.ProviderInstance{} g, ctx := errgroup.WithContext(r.ctx) for _, runner := range runners { if !isManagedRunner(labelsFromRunner(runner), r.controllerInfo.ControllerID.String()) { slog.DebugContext( r.ctx, "runner is not managed by a pool we manage", - "runner_name", runner.GetName()) + "runner_name", runner.Name) continue } - status := runner.GetStatus() + status := runner.Status if status != "offline" { // Runner is online. Ignore it. continue } - dbInstance, err := r.store.GetInstance(r.ctx, *runner.Name) + dbInstance, err := r.store.GetInstance(r.ctx, runner.Name) if err != nil { if !errors.Is(err, runnerErrors.ErrNotFound) { return fmt.Errorf("error fetching instance from DB: %w", err) @@ -566,8 +576,8 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) // Previous forceful removal may have failed? slog.InfoContext( r.ctx, "Runner has no database entry in garm, removing from github", - "runner_name", runner.GetName()) - if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil { + "runner_name", runner.Name) + if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.ID); err != nil { // Removed in the meantime? if errors.Is(err, runnerErrors.ErrNotFound) { continue @@ -655,7 +665,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) slog.InfoContext( r.ctx, "Runner instance is no longer on the provider, removing from github", "runner_name", dbInstance.Name) - if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil { + if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.ID); err != nil { // Removed in the meantime? if errors.Is(err, runnerErrors.ErrNotFound) { slog.DebugContext( @@ -767,8 +777,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona jitConfig := make(map[string]string) var runner *github.Runner - if !provider.DisableJITConfig() { - // Attempt to create JIT config + if !provider.DisableJITConfig() && r.entity.Credentials.ForgeType != params.GiteaEndpointType { jitConfig, runner, err = r.ghcli.GetEntityJITConfig(ctx, name, pool, labels) if err != nil { return fmt.Errorf("failed to generate JIT config: %w", err) @@ -1606,7 +1615,7 @@ func (r *basePoolManager) runnerCleanup() (err error) { return nil } -func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error { +func (r *basePoolManager) cleanupOrphanedRunners(runners []forgeRunner) error { if err := r.cleanupOrphanedProviderRunners(runners); err != nil { return fmt.Errorf("error cleaning orphaned instances: %w", err) } @@ -2014,32 +2023,6 @@ func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload return ret, nil } -func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) { - opts := github.ListRunnersOptions{ - ListOptions: github.ListOptions{ - PerPage: 100, - }, - } - var allRunners []*github.Runner - - for { - runners, ghResp, err := r.ghcli.ListEntityRunners(r.ctx, &opts) - if err != nil { - if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { - return nil, runnerErrors.NewUnauthorizedError("error fetching runners") - } - return nil, fmt.Errorf("error fetching runners: %w", err) - } - allRunners = append(allRunners, runners.Runners...) - if ghResp.NextPage == 0 { - break - } - opts.Page = ghResp.NextPage - } - - return allRunners, nil -} - func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) { allHooks, err := r.listHooks(ctx) if err != nil { diff --git a/runner/pool/util.go b/runner/pool/util.go index c6c311c8..d58f90a3 100644 --- a/runner/pool/util.go +++ b/runner/pool/util.go @@ -15,10 +15,12 @@ package pool import ( - "sort" + "context" + "fmt" + "log/slog" + "net/http" + "net/url" "strings" - "sync" - "sync/atomic" "time" "github.com/google/go-github/v72/github" @@ -31,70 +33,6 @@ import ( "github.com/cloudbase/garm/params" ) -type poolCacheStore interface { - Next() (params.Pool, error) - Reset() - Len() int -} - -type poolRoundRobin struct { - pools []params.Pool - next uint32 -} - -func (p *poolRoundRobin) Next() (params.Pool, error) { - if len(p.pools) == 0 { - return params.Pool{}, runnerErrors.ErrNoPoolsAvailable - } - - n := atomic.AddUint32(&p.next, 1) - return p.pools[(int(n)-1)%len(p.pools)], nil -} - -func (p *poolRoundRobin) Len() int { - return len(p.pools) -} - -func (p *poolRoundRobin) Reset() { - atomic.StoreUint32(&p.next, 0) -} - -type poolsForTags struct { - pools sync.Map - poolCacheType params.PoolBalancerType -} - -func (p *poolsForTags) Get(tags []string) (poolCacheStore, bool) { - sort.Strings(tags) - key := strings.Join(tags, "^") - - v, ok := p.pools.Load(key) - if !ok { - return nil, false - } - poolCache := v.(*poolRoundRobin) - if p.poolCacheType == params.PoolBalancerTypePack { - // When we service a list of jobs, we want to try each pool in turn - // for each job. Pools are sorted by priority so we always start from the - // highest priority pool and move on to the next if the first one is full. - poolCache.Reset() - } - return poolCache, true -} - -func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore { - sort.Slice(pools, func(i, j int) bool { - return pools[i].Priority > pools[j].Priority - }) - - sort.Strings(tags) - key := strings.Join(tags, "^") - - poolRR := &poolRoundRobin{pools: pools} - v, _ := p.pools.LoadOrStore(key, poolRR) - return v.(*poolRoundRobin) -} - func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) { for _, val := range instances { if val.Name == instanceName { @@ -114,17 +52,14 @@ func controllerIDFromLabels(labels []string) string { return "" } -func labelsFromRunner(runner *github.Runner) []string { - if runner == nil || runner.Labels == nil { +func labelsFromRunner(runner forgeRunner) []string { + if runner.Labels == nil { return []string{} } var labels []string for _, val := range runner.Labels { - if val == nil { - continue - } - labels = append(labels, val.GetName()) + labels = append(labels, val.Name) } return labels } @@ -167,3 +102,172 @@ func (r *basePoolManager) waitForToolsOrCancel() (hasTools, stopped bool) { return false, true } } + +func validateHookRequest(controllerID, baseURL string, allHooks []*github.Hook, req *github.Hook) error { + parsed, err := url.Parse(baseURL) + if err != nil { + return fmt.Errorf("error parsing webhook url: %w", err) + } + + partialMatches := []string{} + for _, hook := range allHooks { + hookURL := strings.ToLower(hook.Config.GetURL()) + if hookURL == "" { + continue + } + + if hook.Config.GetURL() == req.Config.GetURL() { + return runnerErrors.NewConflictError("hook already installed") + } else if strings.Contains(hookURL, controllerID) || strings.Contains(hookURL, parsed.Hostname()) { + partialMatches = append(partialMatches, hook.Config.GetURL()) + } + } + + if len(partialMatches) > 0 { + return runnerErrors.NewConflictError("a webhook containing the controller ID or hostname of this contreoller is already installed on this repository") + } + + return nil +} + +func hookToParamsHookInfo(hook *github.Hook) params.HookInfo { + hookURL := hook.Config.GetURL() + + insecureSSLConfig := hook.Config.GetInsecureSSL() + insecureSSL := insecureSSLConfig == "1" + + return params.HookInfo{ + ID: *hook.ID, + URL: hookURL, + Events: hook.Events, + Active: *hook.Active, + InsecureSSL: insecureSSL, + } +} + +func (r *basePoolManager) listHooks(ctx context.Context) ([]*github.Hook, error) { + opts := github.ListOptions{ + PerPage: 100, + } + var allHooks []*github.Hook + for { + hooks, ghResp, err := r.ghcli.ListEntityHooks(ctx, &opts) + if err != nil { + if ghResp != nil && ghResp.StatusCode == http.StatusNotFound { + return nil, runnerErrors.NewBadRequestError("repository not found or your PAT does not have access to manage webhooks") + } + return nil, fmt.Errorf("error fetching hooks: %w", err) + } + allHooks = append(allHooks, hooks...) + if ghResp.NextPage == 0 { + break + } + opts.Page = ghResp.NextPage + } + return allHooks, nil +} + +func (r *basePoolManager) listRunnersWithPagination() ([]forgeRunner, error) { + opts := github.ListRunnersOptions{ + ListOptions: github.ListOptions{ + PerPage: 100, + }, + } + var allRunners []*github.Runner + + // Paginating like this can lead to a situation where if we have many pages of runners, + // while we paginate, a particular runner can move from page n to page n-1 while we move + // from page n-1 to page n. In situations such as that, we end up with a list of runners + // that does not contain the runner that swapped pages while we were paginating. + // Sadly, the GitHub API does not allow listing more than 100 runners per page. + for { + runners, ghResp, err := r.ghcli.ListEntityRunners(r.ctx, &opts) + if err != nil { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { + return nil, runnerErrors.NewUnauthorizedError("error fetching runners") + } + return nil, fmt.Errorf("error fetching runners: %w", err) + } + allRunners = append(allRunners, runners.Runners...) + if ghResp.NextPage == 0 { + break + } + opts.Page = ghResp.NextPage + } + + ret := make([]forgeRunner, len(allRunners)) + for idx, val := range allRunners { + ret[idx] = forgeRunner{ + ID: val.GetID(), + Name: val.GetName(), + Status: val.GetStatus(), + Labels: make([]RunnerLabels, len(val.Labels)), + } + for labelIdx, label := range val.Labels { + ret[idx].Labels[labelIdx] = RunnerLabels{ + Name: label.GetName(), + Type: label.GetType(), + ID: label.GetID(), + } + } + } + + return ret, nil +} + +func (r *basePoolManager) listRunnersWithScaleSetAPI() ([]forgeRunner, error) { + if r.scaleSetClient == nil { + return nil, fmt.Errorf("scaleset client not initialized") + } + + runners, err := r.scaleSetClient.ListAllRunners(r.ctx) + if err != nil { + return nil, fmt.Errorf("failed to list runners through scaleset API: %w", err) + } + + ret := []forgeRunner{} + for _, runner := range runners.RunnerReferences { + if runner.RunnerScaleSetID != 0 { + // skip scale set runners. + continue + } + run := forgeRunner{ + Name: runner.Name, + ID: runner.ID, + Status: string(runner.GetStatus()), + Labels: make([]RunnerLabels, len(runner.Labels)), + } + for labelIDX, label := range runner.Labels { + run.Labels[labelIDX] = RunnerLabels{ + Name: label.Name, + Type: label.Type, + } + } + ret = append(ret, run) + } + return ret, nil +} + +func (r *basePoolManager) GetGithubRunners() ([]forgeRunner, error) { + // Gitea has no scale sets API + if r.scaleSetClient == nil { + return r.listRunnersWithPagination() + } + + // try the scale sets API for github + runners, err := r.listRunnersWithScaleSetAPI() + if err != nil { + slog.WarnContext(r.ctx, "failed to list runners via scaleset API; falling back to pagination", "error", err) + return r.listRunnersWithPagination() + } + + entityInstances := cache.GetEntityInstances(r.entity.ID) + if len(entityInstances) > 0 && len(runners) == 0 { + // I have trust issues in the undocumented API. We seem to have runners for this + // entity, but the scaleset API returned nothing and no error. Fall back to pagination. + slog.DebugContext(r.ctx, "the scaleset api returned nothing, but we seem to have runners in the db; falling back to paginated API runner list") + return r.listRunnersWithPagination() + } + slog.DebugContext(r.ctx, "Scaleset API runner list succeeded", "runners", runners) + return runners, nil +} diff --git a/util/github/client.go b/util/github/client.go index 7c1dc11b..b4ca32e5 100644 --- a/util/github/client.go +++ b/util/github/client.go @@ -422,12 +422,19 @@ func (g *githubClient) getEnterpriseRunnerGroupIDByName(ctx context.Context, ent func (g *githubClient) GetEntityRunnerGroupIDByName(ctx context.Context, runnerGroupName string) (int64, error) { var rgID int64 = 1 + + if g.entity.EntityType == params.ForgeEntityTypeRepository { + // This is a repository. Runner groups are supported at the org and + // enterprise levels. Return the default runner group id, early. + return rgID, nil + } + var ok bool var err error // attempt to get the runner group ID from cache. Cache will invalidate after 1 hour. if runnerGroupName != "" && !strings.EqualFold(runnerGroupName, "default") { rgID, ok = cache.GetEntityRunnerGroup(g.entity.ID, runnerGroupName) - if !ok { + if !ok || rgID == 0 { switch g.entity.EntityType { case params.ForgeEntityTypeOrganization: rgID, err = g.getOrganizationRunnerGroupIDByName(ctx, g.entity, runnerGroupName) @@ -450,7 +457,7 @@ func (g *githubClient) GetEntityJITConfig(ctx context.Context, instance string, if err != nil { return nil, nil, fmt.Errorf("failed to get runner group: %w", err) } - + slog.DebugContext(ctx, "using runner group", "group_name", pool.GitHubRunnerGroup, "runner_group_id", rgID) req := github.GenerateJITConfigRequest{ Name: instance, RunnerGroupID: rgID, diff --git a/workers/entity/util.go b/workers/entity/util.go index 38e011b0..23dbc488 100644 --- a/workers/entity/util.go +++ b/workers/entity/util.go @@ -26,8 +26,7 @@ import ( const ( // These are duplicated until we decide if we move the pool manager to the new // worker flow. - poolIDLabelprefix = "runner-pool-id:" - controllerLabelPrefix = "runner-controller-id:" + poolIDLabelprefix = "runner-pool-id:" ) func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {