Merge pull request #495 from gabriel-samfira/caching-and-fixes
Attempt to use the scaleset API and caching
This commit is contained in:
commit
1fceec374d
8 changed files with 333 additions and 209 deletions
20
cache/instance_cache.go
vendored
20
cache/instance_cache.go
vendored
|
|
@ -98,6 +98,22 @@ func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instan
|
|||
return filteredInstances
|
||||
}
|
||||
|
||||
func (i *InstanceCache) GetEntityInstances(entityID string) []params.Instance {
|
||||
pools := GetEntityPools(entityID)
|
||||
poolsAsMap := map[string]bool{}
|
||||
for _, pool := range pools {
|
||||
poolsAsMap[pool.ID] = true
|
||||
}
|
||||
|
||||
ret := []params.Instance{}
|
||||
for _, val := range i.GetAllInstances() {
|
||||
if _, ok := poolsAsMap[val.PoolID]; ok {
|
||||
ret = append(ret, val)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func SetInstanceCache(instance params.Instance) {
|
||||
instanceCache.SetInstance(instance)
|
||||
}
|
||||
|
|
@ -121,3 +137,7 @@ func GetInstancesForPool(poolID string) []params.Instance {
|
|||
func GetInstancesForScaleSet(scaleSetID uint) []params.Instance {
|
||||
return instanceCache.GetInstancesForScaleSet(scaleSetID)
|
||||
}
|
||||
|
||||
func GetEntityInstances(entityID string) []params.Instance {
|
||||
return instanceCache.GetEntityInstances(entityID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -427,21 +427,21 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
|
|||
}
|
||||
|
||||
type RunnerReference struct {
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
OS string `json:"os"`
|
||||
RunnerScaleSetID int `json:"runnerScaleSetId"`
|
||||
CreatedOn interface{} `json:"createdOn"`
|
||||
RunnerGroupID uint64 `json:"runnerGroupId"`
|
||||
RunnerGroupName string `json:"runnerGroupName"`
|
||||
Version string `json:"version"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Ephemeral bool `json:"ephemeral"`
|
||||
Status interface{} `json:"status"`
|
||||
DisableUpdate bool `json:"disableUpdate"`
|
||||
ProvisioningState string `json:"provisioningState"`
|
||||
Busy bool `json:"busy"`
|
||||
Labels []Label `json:"labels,omitempty"`
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
OS string `json:"os"`
|
||||
RunnerScaleSetID int `json:"runnerScaleSetId"`
|
||||
CreatedOn any `json:"createdOn"`
|
||||
RunnerGroupID uint64 `json:"runnerGroupId"`
|
||||
RunnerGroupName string `json:"runnerGroupName"`
|
||||
Version string `json:"version"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Ephemeral bool `json:"ephemeral"`
|
||||
Status any `json:"status"`
|
||||
DisableUpdate bool `json:"disableUpdate"`
|
||||
ProvisioningState string `json:"provisioningState"`
|
||||
Busy bool `json:"busy"`
|
||||
Labels []Label `json:"labels,omitempty"`
|
||||
}
|
||||
|
||||
func (r RunnerReference) GetStatus() RunnerStatus {
|
||||
|
|
|
|||
75
runner/pool/cache.go
Normal file
75
runner/pool/cache.go
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
|
||||
"github.com/cloudbase/garm/params"
|
||||
)
|
||||
|
||||
type poolCacheStore interface {
|
||||
Next() (params.Pool, error)
|
||||
Reset()
|
||||
Len() int
|
||||
}
|
||||
|
||||
type poolRoundRobin struct {
|
||||
pools []params.Pool
|
||||
next uint32
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Next() (params.Pool, error) {
|
||||
if len(p.pools) == 0 {
|
||||
return params.Pool{}, runnerErrors.ErrNoPoolsAvailable
|
||||
}
|
||||
|
||||
n := atomic.AddUint32(&p.next, 1)
|
||||
return p.pools[(int(n)-1)%len(p.pools)], nil
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Len() int {
|
||||
return len(p.pools)
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Reset() {
|
||||
atomic.StoreUint32(&p.next, 0)
|
||||
}
|
||||
|
||||
type poolsForTags struct {
|
||||
pools sync.Map
|
||||
poolCacheType params.PoolBalancerType
|
||||
}
|
||||
|
||||
func (p *poolsForTags) Get(tags []string) (poolCacheStore, bool) {
|
||||
sort.Strings(tags)
|
||||
key := strings.Join(tags, "^")
|
||||
|
||||
v, ok := p.pools.Load(key)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
poolCache := v.(*poolRoundRobin)
|
||||
if p.poolCacheType == params.PoolBalancerTypePack {
|
||||
// When we service a list of jobs, we want to try each pool in turn
|
||||
// for each job. Pools are sorted by priority so we always start from the
|
||||
// highest priority pool and move on to the next if the first one is full.
|
||||
poolCache.Reset()
|
||||
}
|
||||
return poolCache, true
|
||||
}
|
||||
|
||||
func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore {
|
||||
sort.Slice(pools, func(i, j int) bool {
|
||||
return pools[i].Priority > pools[j].Priority
|
||||
})
|
||||
|
||||
sort.Strings(tags)
|
||||
key := strings.Join(tags, "^")
|
||||
|
||||
poolRR := &poolRoundRobin{pools: pools}
|
||||
v, _ := p.pools.LoadOrStore(key, poolRR)
|
||||
return v.(*poolRoundRobin)
|
||||
}
|
||||
|
|
@ -14,79 +14,15 @@
|
|||
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/google/go-github/v72/github"
|
||||
|
||||
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
|
||||
"github.com/cloudbase/garm/params"
|
||||
)
|
||||
|
||||
func validateHookRequest(controllerID, baseURL string, allHooks []*github.Hook, req *github.Hook) error {
|
||||
parsed, err := url.Parse(baseURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing webhook url: %w", err)
|
||||
}
|
||||
|
||||
partialMatches := []string{}
|
||||
for _, hook := range allHooks {
|
||||
hookURL := strings.ToLower(hook.Config.GetURL())
|
||||
if hookURL == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if hook.Config.GetURL() == req.Config.GetURL() {
|
||||
return runnerErrors.NewConflictError("hook already installed")
|
||||
} else if strings.Contains(hookURL, controllerID) || strings.Contains(hookURL, parsed.Hostname()) {
|
||||
partialMatches = append(partialMatches, hook.Config.GetURL())
|
||||
}
|
||||
}
|
||||
|
||||
if len(partialMatches) > 0 {
|
||||
return runnerErrors.NewConflictError("a webhook containing the controller ID or hostname of this contreoller is already installed on this repository")
|
||||
}
|
||||
|
||||
return nil
|
||||
// RunnerLabels describes a single label attached to a runner, as carried by
// forgeRunner. All fields are omitted from JSON output when empty.
type RunnerLabels struct {
	// ID is the numeric identifier of the label.
	ID int64 `json:"id,omitempty"`
	// Name is the label text.
	Name string `json:"name,omitempty"`
	// Type is the label type string as reported by the forge.
	Type string `json:"type,omitempty"`
}
|
||||
|
||||
func hookToParamsHookInfo(hook *github.Hook) params.HookInfo {
|
||||
hookURL := hook.Config.GetURL()
|
||||
|
||||
insecureSSLConfig := hook.Config.GetInsecureSSL()
|
||||
insecureSSL := insecureSSLConfig == "1"
|
||||
|
||||
return params.HookInfo{
|
||||
ID: *hook.ID,
|
||||
URL: hookURL,
|
||||
Events: hook.Events,
|
||||
Active: *hook.Active,
|
||||
InsecureSSL: insecureSSL,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *basePoolManager) listHooks(ctx context.Context) ([]*github.Hook, error) {
|
||||
opts := github.ListOptions{
|
||||
PerPage: 100,
|
||||
}
|
||||
var allHooks []*github.Hook
|
||||
for {
|
||||
hooks, ghResp, err := r.ghcli.ListEntityHooks(ctx, &opts)
|
||||
if err != nil {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusNotFound {
|
||||
return nil, runnerErrors.NewBadRequestError("repository not found or your PAT does not have access to manage webhooks")
|
||||
}
|
||||
return nil, fmt.Errorf("error fetching hooks: %w", err)
|
||||
}
|
||||
allHooks = append(allHooks, hooks...)
|
||||
if ghResp.NextPage == 0 {
|
||||
break
|
||||
}
|
||||
opts.Page = ghResp.NextPage
|
||||
}
|
||||
return allHooks, nil
|
||||
type forgeRunner struct {
|
||||
ID int64 `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
Labels []RunnerLabels `json:"labels,omitempty"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ import (
|
|||
"github.com/cloudbase/garm/runner/common"
|
||||
garmUtil "github.com/cloudbase/garm/util"
|
||||
ghClient "github.com/cloudbase/garm/util/github"
|
||||
"github.com/cloudbase/garm/util/github/scalesets"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -104,11 +105,19 @@ func NewEntityPoolManager(ctx context.Context, entity params.ForgeEntity, instan
|
|||
return nil, fmt.Errorf("error creating backoff: %w", err)
|
||||
}
|
||||
|
||||
var scaleSetCli *scalesets.ScaleSetClient
|
||||
if entity.Credentials.ForgeType == params.GithubEndpointType {
|
||||
scaleSetCli, err = scalesets.NewClient(ghc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get scalesets client: %w", err)
|
||||
}
|
||||
}
|
||||
repo := &basePoolManager{
|
||||
ctx: ctx,
|
||||
consumerID: consumerID,
|
||||
entity: entity,
|
||||
ghcli: ghc,
|
||||
scaleSetClient: scaleSetCli,
|
||||
controllerInfo: controllerInfo,
|
||||
instanceTokenGetter: instanceTokenGetter,
|
||||
|
||||
|
|
@ -127,6 +136,7 @@ type basePoolManager struct {
|
|||
consumerID string
|
||||
entity params.ForgeEntity
|
||||
ghcli common.GithubClient
|
||||
scaleSetClient *scalesets.ScaleSetClient
|
||||
controllerInfo params.ControllerInfo
|
||||
instanceTokenGetter auth.InstanceTokenGetter
|
||||
consumer dbCommon.Consumer
|
||||
|
|
@ -393,7 +403,7 @@ func (r *basePoolManager) updateTools() error {
|
|||
// happens, github will remove the ephemeral worker and send a webhook our way.
|
||||
// If we were offline and did not process the webhook, the instance will linger.
|
||||
// We need to remove it from the provider and database.
|
||||
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
|
||||
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []forgeRunner) error {
|
||||
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error fetching instances from db: %w", err)
|
||||
|
|
@ -404,10 +414,10 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
|
|||
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
|
||||
slog.DebugContext(
|
||||
r.ctx, "runner is not managed by a pool we manage",
|
||||
"runner_name", run.GetName())
|
||||
"runner_name", run.Name)
|
||||
continue
|
||||
}
|
||||
runnerNames[*run.Name] = true
|
||||
runnerNames[run.Name] = true
|
||||
}
|
||||
|
||||
for _, instance := range dbInstances {
|
||||
|
|
@ -473,21 +483,21 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
|
|||
// reapTimedOutRunners will mark as pending_delete any runner that has a status
|
||||
// of "running" in the provider, but that has not registered with Github, and has
|
||||
// received no new updates in the configured timeout interval.
|
||||
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
||||
func (r *basePoolManager) reapTimedOutRunners(runners []forgeRunner) error {
|
||||
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error fetching instances from db: %w", err)
|
||||
}
|
||||
|
||||
runnersByName := map[string]*github.Runner{}
|
||||
runnersByName := map[string]forgeRunner{}
|
||||
for _, run := range runners {
|
||||
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
|
||||
slog.DebugContext(
|
||||
r.ctx, "runner is not managed by a pool we manage",
|
||||
"runner_name", run.GetName())
|
||||
"runner_name", run.Name)
|
||||
continue
|
||||
}
|
||||
runnersByName[*run.Name] = run
|
||||
runnersByName[run.Name] = run
|
||||
}
|
||||
|
||||
for _, instance := range dbInstances {
|
||||
|
|
@ -521,7 +531,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
|||
// * The runner managed to join github, but the setup process failed later and the runner
|
||||
// never started on the instance.
|
||||
// * A JIT config was created, but the runner never joined github.
|
||||
if runner, ok := runnersByName[instance.Name]; !ok || runner.GetStatus() == "offline" {
|
||||
if runner, ok := runnersByName[instance.Name]; !ok || runner.Status == "offline" {
|
||||
slog.InfoContext(
|
||||
r.ctx, "reaping timed-out/failed runner",
|
||||
"runner_name", instance.Name)
|
||||
|
|
@ -540,24 +550,24 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
|||
// as offline and for which we no longer have a local instance.
|
||||
// This may happen if someone manually deletes the instance in the provider. We need to
|
||||
// first remove the instance from github, and then from our database.
|
||||
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
|
||||
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []forgeRunner) error {
|
||||
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
|
||||
g, ctx := errgroup.WithContext(r.ctx)
|
||||
for _, runner := range runners {
|
||||
if !isManagedRunner(labelsFromRunner(runner), r.controllerInfo.ControllerID.String()) {
|
||||
slog.DebugContext(
|
||||
r.ctx, "runner is not managed by a pool we manage",
|
||||
"runner_name", runner.GetName())
|
||||
"runner_name", runner.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
status := runner.GetStatus()
|
||||
status := runner.Status
|
||||
if status != "offline" {
|
||||
// Runner is online. Ignore it.
|
||||
continue
|
||||
}
|
||||
|
||||
dbInstance, err := r.store.GetInstance(r.ctx, *runner.Name)
|
||||
dbInstance, err := r.store.GetInstance(r.ctx, runner.Name)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return fmt.Errorf("error fetching instance from DB: %w", err)
|
||||
|
|
@ -566,8 +576,8 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
|
|||
// Previous forceful removal may have failed?
|
||||
slog.InfoContext(
|
||||
r.ctx, "Runner has no database entry in garm, removing from github",
|
||||
"runner_name", runner.GetName())
|
||||
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil {
|
||||
"runner_name", runner.Name)
|
||||
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.ID); err != nil {
|
||||
// Removed in the meantime?
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
continue
|
||||
|
|
@ -655,7 +665,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
|
|||
slog.InfoContext(
|
||||
r.ctx, "Runner instance is no longer on the provider, removing from github",
|
||||
"runner_name", dbInstance.Name)
|
||||
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil {
|
||||
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.ID); err != nil {
|
||||
// Removed in the meantime?
|
||||
if errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
slog.DebugContext(
|
||||
|
|
@ -767,8 +777,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona
|
|||
jitConfig := make(map[string]string)
|
||||
var runner *github.Runner
|
||||
|
||||
if !provider.DisableJITConfig() {
|
||||
// Attempt to create JIT config
|
||||
if !provider.DisableJITConfig() && r.entity.Credentials.ForgeType != params.GiteaEndpointType {
|
||||
jitConfig, runner, err = r.ghcli.GetEntityJITConfig(ctx, name, pool, labels)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate JIT config: %w", err)
|
||||
|
|
@ -1606,7 +1615,7 @@ func (r *basePoolManager) runnerCleanup() (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error {
|
||||
func (r *basePoolManager) cleanupOrphanedRunners(runners []forgeRunner) error {
|
||||
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
|
||||
return fmt.Errorf("error cleaning orphaned instances: %w", err)
|
||||
}
|
||||
|
|
@ -2014,32 +2023,6 @@ func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) {
|
||||
opts := github.ListRunnersOptions{
|
||||
ListOptions: github.ListOptions{
|
||||
PerPage: 100,
|
||||
},
|
||||
}
|
||||
var allRunners []*github.Runner
|
||||
|
||||
for {
|
||||
runners, ghResp, err := r.ghcli.ListEntityRunners(r.ctx, &opts)
|
||||
if err != nil {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, runnerErrors.NewUnauthorizedError("error fetching runners")
|
||||
}
|
||||
return nil, fmt.Errorf("error fetching runners: %w", err)
|
||||
}
|
||||
allRunners = append(allRunners, runners.Runners...)
|
||||
if ghResp.NextPage == 0 {
|
||||
break
|
||||
}
|
||||
opts.Page = ghResp.NextPage
|
||||
}
|
||||
|
||||
return allRunners, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
|
||||
allHooks, err := r.listHooks(ctx)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -15,10 +15,12 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/v72/github"
|
||||
|
|
@ -31,70 +33,6 @@ import (
|
|||
"github.com/cloudbase/garm/params"
|
||||
)
|
||||
|
||||
type poolCacheStore interface {
|
||||
Next() (params.Pool, error)
|
||||
Reset()
|
||||
Len() int
|
||||
}
|
||||
|
||||
type poolRoundRobin struct {
|
||||
pools []params.Pool
|
||||
next uint32
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Next() (params.Pool, error) {
|
||||
if len(p.pools) == 0 {
|
||||
return params.Pool{}, runnerErrors.ErrNoPoolsAvailable
|
||||
}
|
||||
|
||||
n := atomic.AddUint32(&p.next, 1)
|
||||
return p.pools[(int(n)-1)%len(p.pools)], nil
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Len() int {
|
||||
return len(p.pools)
|
||||
}
|
||||
|
||||
func (p *poolRoundRobin) Reset() {
|
||||
atomic.StoreUint32(&p.next, 0)
|
||||
}
|
||||
|
||||
type poolsForTags struct {
|
||||
pools sync.Map
|
||||
poolCacheType params.PoolBalancerType
|
||||
}
|
||||
|
||||
func (p *poolsForTags) Get(tags []string) (poolCacheStore, bool) {
|
||||
sort.Strings(tags)
|
||||
key := strings.Join(tags, "^")
|
||||
|
||||
v, ok := p.pools.Load(key)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
poolCache := v.(*poolRoundRobin)
|
||||
if p.poolCacheType == params.PoolBalancerTypePack {
|
||||
// When we service a list of jobs, we want to try each pool in turn
|
||||
// for each job. Pools are sorted by priority so we always start from the
|
||||
// highest priority pool and move on to the next if the first one is full.
|
||||
poolCache.Reset()
|
||||
}
|
||||
return poolCache, true
|
||||
}
|
||||
|
||||
func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore {
|
||||
sort.Slice(pools, func(i, j int) bool {
|
||||
return pools[i].Priority > pools[j].Priority
|
||||
})
|
||||
|
||||
sort.Strings(tags)
|
||||
key := strings.Join(tags, "^")
|
||||
|
||||
poolRR := &poolRoundRobin{pools: pools}
|
||||
v, _ := p.pools.LoadOrStore(key, poolRR)
|
||||
return v.(*poolRoundRobin)
|
||||
}
|
||||
|
||||
func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
|
||||
for _, val := range instances {
|
||||
if val.Name == instanceName {
|
||||
|
|
@ -114,17 +52,14 @@ func controllerIDFromLabels(labels []string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func labelsFromRunner(runner *github.Runner) []string {
|
||||
if runner == nil || runner.Labels == nil {
|
||||
func labelsFromRunner(runner forgeRunner) []string {
|
||||
if runner.Labels == nil {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
var labels []string
|
||||
for _, val := range runner.Labels {
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
labels = append(labels, val.GetName())
|
||||
labels = append(labels, val.Name)
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
|
@ -167,3 +102,172 @@ func (r *basePoolManager) waitForToolsOrCancel() (hasTools, stopped bool) {
|
|||
return false, true
|
||||
}
|
||||
}
|
||||
|
||||
func validateHookRequest(controllerID, baseURL string, allHooks []*github.Hook, req *github.Hook) error {
|
||||
parsed, err := url.Parse(baseURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing webhook url: %w", err)
|
||||
}
|
||||
|
||||
partialMatches := []string{}
|
||||
for _, hook := range allHooks {
|
||||
hookURL := strings.ToLower(hook.Config.GetURL())
|
||||
if hookURL == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if hook.Config.GetURL() == req.Config.GetURL() {
|
||||
return runnerErrors.NewConflictError("hook already installed")
|
||||
} else if strings.Contains(hookURL, controllerID) || strings.Contains(hookURL, parsed.Hostname()) {
|
||||
partialMatches = append(partialMatches, hook.Config.GetURL())
|
||||
}
|
||||
}
|
||||
|
||||
if len(partialMatches) > 0 {
|
||||
return runnerErrors.NewConflictError("a webhook containing the controller ID or hostname of this contreoller is already installed on this repository")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func hookToParamsHookInfo(hook *github.Hook) params.HookInfo {
|
||||
hookURL := hook.Config.GetURL()
|
||||
|
||||
insecureSSLConfig := hook.Config.GetInsecureSSL()
|
||||
insecureSSL := insecureSSLConfig == "1"
|
||||
|
||||
return params.HookInfo{
|
||||
ID: *hook.ID,
|
||||
URL: hookURL,
|
||||
Events: hook.Events,
|
||||
Active: *hook.Active,
|
||||
InsecureSSL: insecureSSL,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *basePoolManager) listHooks(ctx context.Context) ([]*github.Hook, error) {
|
||||
opts := github.ListOptions{
|
||||
PerPage: 100,
|
||||
}
|
||||
var allHooks []*github.Hook
|
||||
for {
|
||||
hooks, ghResp, err := r.ghcli.ListEntityHooks(ctx, &opts)
|
||||
if err != nil {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusNotFound {
|
||||
return nil, runnerErrors.NewBadRequestError("repository not found or your PAT does not have access to manage webhooks")
|
||||
}
|
||||
return nil, fmt.Errorf("error fetching hooks: %w", err)
|
||||
}
|
||||
allHooks = append(allHooks, hooks...)
|
||||
if ghResp.NextPage == 0 {
|
||||
break
|
||||
}
|
||||
opts.Page = ghResp.NextPage
|
||||
}
|
||||
return allHooks, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) listRunnersWithPagination() ([]forgeRunner, error) {
|
||||
opts := github.ListRunnersOptions{
|
||||
ListOptions: github.ListOptions{
|
||||
PerPage: 100,
|
||||
},
|
||||
}
|
||||
var allRunners []*github.Runner
|
||||
|
||||
// Paginating like this can lead to a situation where if we have many pages of runners,
|
||||
// while we paginate, a particular runner can move from page n to page n-1 while we move
|
||||
// from page n-1 to page n. In situations such as that, we end up with a list of runners
|
||||
// that does not contain the runner that swapped pages while we were paginating.
|
||||
// Sadly, the GitHub API does not allow listing more than 100 runners per page.
|
||||
for {
|
||||
runners, ghResp, err := r.ghcli.ListEntityRunners(r.ctx, &opts)
|
||||
if err != nil {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, runnerErrors.NewUnauthorizedError("error fetching runners")
|
||||
}
|
||||
return nil, fmt.Errorf("error fetching runners: %w", err)
|
||||
}
|
||||
allRunners = append(allRunners, runners.Runners...)
|
||||
if ghResp.NextPage == 0 {
|
||||
break
|
||||
}
|
||||
opts.Page = ghResp.NextPage
|
||||
}
|
||||
|
||||
ret := make([]forgeRunner, len(allRunners))
|
||||
for idx, val := range allRunners {
|
||||
ret[idx] = forgeRunner{
|
||||
ID: val.GetID(),
|
||||
Name: val.GetName(),
|
||||
Status: val.GetStatus(),
|
||||
Labels: make([]RunnerLabels, len(val.Labels)),
|
||||
}
|
||||
for labelIdx, label := range val.Labels {
|
||||
ret[idx].Labels[labelIdx] = RunnerLabels{
|
||||
Name: label.GetName(),
|
||||
Type: label.GetType(),
|
||||
ID: label.GetID(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) listRunnersWithScaleSetAPI() ([]forgeRunner, error) {
|
||||
if r.scaleSetClient == nil {
|
||||
return nil, fmt.Errorf("scaleset client not initialized")
|
||||
}
|
||||
|
||||
runners, err := r.scaleSetClient.ListAllRunners(r.ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list runners through scaleset API: %w", err)
|
||||
}
|
||||
|
||||
ret := []forgeRunner{}
|
||||
for _, runner := range runners.RunnerReferences {
|
||||
if runner.RunnerScaleSetID != 0 {
|
||||
// skip scale set runners.
|
||||
continue
|
||||
}
|
||||
run := forgeRunner{
|
||||
Name: runner.Name,
|
||||
ID: runner.ID,
|
||||
Status: string(runner.GetStatus()),
|
||||
Labels: make([]RunnerLabels, len(runner.Labels)),
|
||||
}
|
||||
for labelIDX, label := range runner.Labels {
|
||||
run.Labels[labelIDX] = RunnerLabels{
|
||||
Name: label.Name,
|
||||
Type: label.Type,
|
||||
}
|
||||
}
|
||||
ret = append(ret, run)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *basePoolManager) GetGithubRunners() ([]forgeRunner, error) {
|
||||
// Gitea has no scale sets API
|
||||
if r.scaleSetClient == nil {
|
||||
return r.listRunnersWithPagination()
|
||||
}
|
||||
|
||||
// try the scale sets API for github
|
||||
runners, err := r.listRunnersWithScaleSetAPI()
|
||||
if err != nil {
|
||||
slog.WarnContext(r.ctx, "failed to list runners via scaleset API; falling back to pagination", "error", err)
|
||||
return r.listRunnersWithPagination()
|
||||
}
|
||||
|
||||
entityInstances := cache.GetEntityInstances(r.entity.ID)
|
||||
if len(entityInstances) > 0 && len(runners) == 0 {
|
||||
// I have trust issues in the undocumented API. We seem to have runners for this
|
||||
// entity, but the scaleset API returned nothing and no error. Fall back to pagination.
|
||||
slog.DebugContext(r.ctx, "the scaleset api returned nothing, but we seem to have runners in the db; falling back to paginated API runner list")
|
||||
return r.listRunnersWithPagination()
|
||||
}
|
||||
slog.DebugContext(r.ctx, "Scaleset API runner list succeeded", "runners", runners)
|
||||
return runners, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -422,12 +422,19 @@ func (g *githubClient) getEnterpriseRunnerGroupIDByName(ctx context.Context, ent
|
|||
|
||||
func (g *githubClient) GetEntityRunnerGroupIDByName(ctx context.Context, runnerGroupName string) (int64, error) {
|
||||
var rgID int64 = 1
|
||||
|
||||
if g.entity.EntityType == params.ForgeEntityTypeRepository {
|
||||
// This is a repository. Runner groups are supported at the org and
|
||||
// enterprise levels. Return the default runner group id, early.
|
||||
return rgID, nil
|
||||
}
|
||||
|
||||
var ok bool
|
||||
var err error
|
||||
// attempt to get the runner group ID from cache. Cache will invalidate after 1 hour.
|
||||
if runnerGroupName != "" && !strings.EqualFold(runnerGroupName, "default") {
|
||||
rgID, ok = cache.GetEntityRunnerGroup(g.entity.ID, runnerGroupName)
|
||||
if !ok {
|
||||
if !ok || rgID == 0 {
|
||||
switch g.entity.EntityType {
|
||||
case params.ForgeEntityTypeOrganization:
|
||||
rgID, err = g.getOrganizationRunnerGroupIDByName(ctx, g.entity, runnerGroupName)
|
||||
|
|
@ -450,7 +457,7 @@ func (g *githubClient) GetEntityJITConfig(ctx context.Context, instance string,
|
|||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to get runner group: %w", err)
|
||||
}
|
||||
|
||||
slog.DebugContext(ctx, "using runner group", "group_name", pool.GitHubRunnerGroup, "runner_group_id", rgID)
|
||||
req := github.GenerateJITConfigRequest{
|
||||
Name: instance,
|
||||
RunnerGroupID: rgID,
|
||||
|
|
|
|||
|
|
@ -26,8 +26,7 @@ import (
|
|||
const (
|
||||
// These are duplicated until we decide if we move the pool manager to the new
|
||||
// worker flow.
|
||||
poolIDLabelprefix = "runner-pool-id:"
|
||||
controllerLabelPrefix = "runner-controller-id:"
|
||||
poolIDLabelprefix = "runner-pool-id:"
|
||||
)
|
||||
|
||||
func composeControllerWatcherFilters() dbCommon.PayloadFilterFunc {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue