From 1a719567ff002b8e2d2f3ea3075fdb428c0240ca Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Thu, 8 May 2025 21:39:55 +0000 Subject: [PATCH] Add rate limit cache and fixes This change adds a loop that keeps a cache of credentials rate limits as reported by the github API. The cache is updated every 30 seconds and is purely informational for the user. This change also adds some caching improvements. Functions that return values from the cache as lists, will now sort by ID or creation date. Signed-off-by: Gabriel Adrian Samfira --- cache/credentials_cache.go | 33 +++++++++ cache/entity_cache.go | 44 ++++++++++++ cache/instance_cache.go | 3 + cache/util.go | 19 +++++ cmd/garm-cli/cmd/github_credentials.go | 6 ++ params/interfaces.go | 10 +++ params/params.go | 97 +++++++++++++++++++++++--- runner/github_credentials.go | 19 +++++ workers/cache/cache.go | 60 +++++++++++++--- workers/scaleset/controller.go | 8 +-- 10 files changed, 276 insertions(+), 23 deletions(-) create mode 100644 cache/util.go diff --git a/cache/credentials_cache.go b/cache/credentials_cache.go index 092d2e90..7cf65a03 100644 --- a/cache/credentials_cache.go +++ b/cache/credentials_cache.go @@ -21,6 +21,16 @@ type GithubCredentials struct { cache map[uint]params.GithubCredentials } +func (g *GithubCredentials) SetCredentialsRateLimit(credsID uint, rateLimit params.GithubRateLimit) { + g.mux.Lock() + defer g.mux.Unlock() + + if creds, ok := g.cache[credsID]; ok { + creds.RateLimit = rateLimit + g.cache[credsID] = creds + } +} + func (g *GithubCredentials) SetCredentials(credentials params.GithubCredentials) { g.mux.Lock() defer g.mux.Unlock() @@ -54,6 +64,21 @@ func (g *GithubCredentials) GetAllCredentials() []params.GithubCredentials { for _, cred := range g.cache { creds = append(creds, cred) } + + // Sort the credentials by ID + sortByID(creds) + return creds +} + +func (g *GithubCredentials) GetAllCredentialsAsMap() map[uint]params.GithubCredentials { + g.mux.Lock() + defer g.mux.Unlock() + + creds := make(map[uint]params.GithubCredentials, len(g.cache)) + for id, cred := range g.cache { + creds[id] = cred + } + return creds } @@ -72,3 +97,11 @@ func DeleteGithubCredentials(id uint) { func GetAllGithubCredentials() []params.GithubCredentials { return credentialsCache.GetAllCredentials() } + +func SetCredentialsRateLimit(credsID uint, rateLimit params.GithubRateLimit) { + credentialsCache.SetCredentialsRateLimit(credsID, rateLimit) +} + +func GetAllGithubCredentialsAsMap() map[uint]params.GithubCredentials { + return credentialsCache.GetAllCredentialsAsMap() +} diff --git a/cache/entity_cache.go b/cache/entity_cache.go index 0c549498..006f40db 100644 --- a/cache/entity_cache.go +++ b/cache/entity_cache.go @@ -186,6 +186,8 @@ func (e *EntityCache) FindPoolsMatchingAllTags(entityID string, tags []string) [ pools = append(pools, pool) } } + // Sort the pools by creation date. + sortByCreationDate(pools) return pools } return nil @@ -200,6 +202,8 @@ func (e *EntityCache) GetEntityPools(entityID string) []params.Pool { for _, pool := range cache.Pools { pools = append(pools, pool) } + // Sort the pools by creation date. + sortByCreationDate(pools) return pools } return nil @@ -214,6 +218,8 @@ func (e *EntityCache) GetEntityScaleSets(entityID string) []params.ScaleSet { for _, scaleSet := range cache.ScaleSets { scaleSets = append(scaleSets, scaleSet) } + // Sort the scale sets by creation date. + sortByID(scaleSets) return scaleSets } return nil @@ -229,6 +235,7 @@ func (e *EntityCache) GetEntitiesUsingGredentials(credsID uint) []params.GithubE entities = append(entities, cache.Entity) } } + sortByCreationDate(entities) return entities } @@ -245,9 +252,38 @@ func (e *EntityCache) GetAllEntities() []params.GithubEntity { } entities = append(entities, cache.Entity) } + sortByCreationDate(entities) return entities } +func (e *EntityCache) GetAllPools() []params.Pool { + e.mux.Lock() + defer e.mux.Unlock() + + var pools []params.Pool + for _, cache := range e.entities { + for _, pool := range cache.Pools { + pools = append(pools, pool) + } + } + sortByCreationDate(pools) + return pools +} + +func (e *EntityCache) GetAllScaleSets() []params.ScaleSet { + e.mux.Lock() + defer e.mux.Unlock() + + var scaleSets []params.ScaleSet + for _, cache := range e.entities { + for _, scaleSet := range cache.ScaleSets { + scaleSets = append(scaleSets, scaleSet) + } + } + sortByID(scaleSets) + return scaleSets +} + func GetEntity(entityID string) (params.GithubEntity, bool) { return entityCache.GetEntity(entityID) } @@ -315,3 +351,11 @@ func GetEntitiesUsingGredentials(credsID uint) []params.GithubEntity { func GetAllEntities() []params.GithubEntity { return entityCache.GetAllEntities() } + +func GetAllPools() []params.Pool { + return entityCache.GetAllPools() +} + +func GetAllScaleSets() []params.ScaleSet { + return entityCache.GetAllScaleSets() +} diff --git a/cache/instance_cache.go b/cache/instance_cache.go index 44f95ec2..b96db5e9 100644 --- a/cache/instance_cache.go +++ b/cache/instance_cache.go @@ -53,6 +53,7 @@ func (i *InstanceCache) GetAllInstances() []params.Instance { for _, instance := range i.cache { instances = append(instances, instance) } + sortByCreationDate(instances) return instances } @@ -66,6 +67,7 @@ func (i *InstanceCache) GetInstancesForPool(poolID string) []params.Instance { filteredInstances = append(filteredInstances, instance) } } + sortByCreationDate(filteredInstances) return filteredInstances } @@ -79,6 +81,7 @@ func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instan filteredInstances = append(filteredInstances, instance) } } + sortByCreationDate(filteredInstances) return filteredInstances } diff --git a/cache/util.go b/cache/util.go new file mode 100644 index 00000000..f8769c65 --- /dev/null +++ b/cache/util.go @@ -0,0 +1,19 @@ +package cache + +import ( + "sort" + + "github.com/cloudbase/garm/params" +) + +func sortByID[T params.IDGetter](s []T) { + sort.Slice(s, func(i, j int) bool { + return s[i].GetID() < s[j].GetID() + }) +} + +func sortByCreationDate[T params.CreationDateGetter](s []T) { + sort.Slice(s, func(i, j int) bool { + return s[i].GetCreatedAt().Before(s[j].GetCreatedAt()) + }) +} diff --git a/cmd/garm-cli/cmd/github_credentials.go b/cmd/garm-cli/cmd/github_credentials.go index c4faec1a..bd3521bf 100644 --- a/cmd/garm-cli/cmd/github_credentials.go +++ b/cmd/garm-cli/cmd/github_credentials.go @@ -375,6 +375,8 @@ func formatOneGithubCredential(cred params.GithubCredentials) { header := table.Row{"Field", "Value"} t.AppendHeader(header) + resetMinutes := cred.RateLimit.ResetIn().Minutes() + t.AppendRow(table.Row{"ID", cred.ID}) t.AppendRow(table.Row{"Created At", cred.CreatedAt}) t.AppendRow(table.Row{"Updated At", cred.UpdatedAt}) @@ -385,6 +387,10 @@ func formatOneGithubCredential(cred params.GithubCredentials) { t.AppendRow(table.Row{"Upload URL", cred.UploadBaseURL}) t.AppendRow(table.Row{"Type", cred.AuthType}) t.AppendRow(table.Row{"Endpoint", cred.Endpoint.Name}) + if resetMinutes > 0 { + t.AppendRow(table.Row{"Remaining API requests", cred.RateLimit.Remaining}) + t.AppendRow(table.Row{"Rate limit reset", fmt.Sprintf("%d minutes", int64(resetMinutes))}) + } if len(cred.Repositories) > 0 { t.AppendRow(table.Row{"", ""}) diff --git a/params/interfaces.go b/params/interfaces.go index 95f02a9a..cd9b94ff 100644 --- a/params/interfaces.go +++ b/params/interfaces.go @@ -1,7 +1,17 @@ package params +import "time" + // EntityGetter is implemented by all github entities (repositories, organizations and enterprises). // It defines the GetEntity() function which returns a github entity. type EntityGetter interface { GetEntity() (GithubEntity, error) } + +type IDGetter interface { + GetID() uint +} + +type CreationDateGetter interface { + GetCreatedAt() time.Time +} diff --git a/params/params.go b/params/params.go index a15d2446..7636102f 100644 --- a/params/params.go +++ b/params/params.go @@ -252,6 +252,10 @@ type Instance struct { JitConfiguration map[string]string `json:"-"` } +func (i Instance) GetCreatedAt() time.Time { + return i.CreatedAt +} + func (i Instance) GetName() string { return i.Name } @@ -370,6 +374,22 @@ type Pool struct { Priority uint `json:"priority,omitempty"` } +func (p Pool) BelongsTo(entity GithubEntity) bool { + switch p.PoolType() { + case GithubEntityTypeRepository: + return p.RepoID == entity.ID + case GithubEntityTypeOrganization: + return p.OrgID == entity.ID + case GithubEntityTypeEnterprise: + return p.EnterpriseID == entity.ID + } + return false +} + +func (p Pool) GetCreatedAt() time.Time { + return p.CreatedAt +} + func (p Pool) MinIdleRunnersAsInt() int { if p.MinIdleRunners > math.MaxInt { return math.MaxInt @@ -493,6 +513,22 @@ type ScaleSet struct { LastMessageID int64 `json:"-"` } +func (p ScaleSet) BelongsTo(entity GithubEntity) bool { + switch p.ScaleSetType() { + case GithubEntityTypeRepository: + return p.RepoID == entity.ID + case GithubEntityTypeOrganization: + return p.OrgID == entity.ID + case GithubEntityTypeEnterprise: + return p.EnterpriseID == entity.ID + } + return false +} + +func (p ScaleSet) GetID() uint { + return p.ID +} + func (p ScaleSet) GetEntity() (GithubEntity, error) { switch p.ScaleSetType() { case GithubEntityTypeRepository: @@ -526,10 +562,6 @@ func (p *ScaleSet) ScaleSetType() GithubEntityType { return "" } -func (p ScaleSet) GetID() uint { - return p.ID -} - func (p *ScaleSet) RunnerTimeout() uint { if p.RunnerBootstrapTimeout == 0 { return appdefaults.DefaultRunnerBootstrapTimeout @@ -560,6 +592,10 @@ type Repository struct { WebhookSecret string `json:"-"` } +func (r Repository) CreationDateGetter() time.Time { + return r.CreatedAt +} + func (r Repository) GetEntity() (GithubEntity, error) { if r.ID == "" { return GithubEntity{}, fmt.Errorf("repository has no ID") @@ -572,6 +608,7 @@ func (r Repository) GetEntity() (GithubEntity, error) { PoolBalancerType: r.PoolBalancerType, Credentials: r.Credentials, WebhookSecret: r.WebhookSecret, + CreatedAt: r.CreatedAt, }, nil } @@ -616,6 +653,10 @@ type Organization struct { WebhookSecret string `json:"-"` } +func (o Organization) GetCreatedAt() time.Time { + return o.CreatedAt +} + func (o Organization) GetEntity() (GithubEntity, error) { if o.ID == "" { return GithubEntity{}, fmt.Errorf("organization has no ID") @@ -627,6 +668,7 @@ func (o Organization) GetEntity() (GithubEntity, error) { WebhookSecret: o.WebhookSecret, PoolBalancerType: o.PoolBalancerType, Credentials: o.Credentials, + CreatedAt: o.CreatedAt, }, nil } @@ -667,6 +709,10 @@ type Enterprise struct { WebhookSecret string `json:"-"` } +func (e Enterprise) GetCreatedAt() time.Time { + return e.CreatedAt +} + func (e Enterprise) GetEntity() (GithubEntity, error) { if e.ID == "" { return GithubEntity{}, fmt.Errorf("enterprise has no ID") @@ -678,6 +724,7 @@ func (e Enterprise) GetEntity() (GithubEntity, error) { WebhookSecret: e.WebhookSecret, PoolBalancerType: e.PoolBalancerType, Credentials: e.Credentials, + CreatedAt: e.CreatedAt, }, nil } @@ -772,6 +819,24 @@ func (c *ControllerInfo) JobBackoff() time.Duration { return time.Duration(int64(c.MinimumJobAgeBackoff)) } +type GithubRateLimit struct { + Limit int `json:"limit,omitempty"` + Used int `json:"used,omitempty"` + Remaining int `json:"remaining,omitempty"` + Reset int64 `json:"reset,omitempty"` +} + +func (g GithubRateLimit) ResetIn() time.Duration { + return time.Until(g.ResetAt()) +} + +func (g GithubRateLimit) ResetAt() time.Time { + if g.Reset == 0 { + return time.Time{} + } + return time.Unix(g.Reset, 0) +} + type GithubCredentials struct { ID uint `json:"id,omitempty"` Name string `json:"name,omitempty"` @@ -782,17 +847,22 @@ type GithubCredentials struct { CABundle []byte `json:"ca_bundle,omitempty"` AuthType GithubAuthType `json:"auth-type,omitempty"` - Repositories []Repository `json:"repositories,omitempty"` - Organizations []Organization `json:"organizations,omitempty"` - Enterprises []Enterprise `json:"enterprises,omitempty"` - Endpoint GithubEndpoint `json:"endpoint,omitempty"` - CreatedAt time.Time `json:"created_at,omitempty"` - UpdatedAt time.Time `json:"updated_at,omitempty"` + Repositories []Repository `json:"repositories,omitempty"` + Organizations []Organization `json:"organizations,omitempty"` + Enterprises []Enterprise `json:"enterprises,omitempty"` + Endpoint GithubEndpoint `json:"endpoint,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` + UpdatedAt time.Time `json:"updated_at,omitempty"` + RateLimit GithubRateLimit `json:"rate_limit,omitempty"` // Do not serialize sensitive info. CredentialsPayload []byte `json:"-"` } +func (g GithubCredentials) GetID() uint { + return g.ID +} + func (g GithubCredentials) GetHTTPClient(ctx context.Context) (*http.Client, error) { var roots *x509.CertPool if g.CABundle != nil { @@ -994,11 +1064,16 @@ type GithubEntity struct { EntityType GithubEntityType `json:"entity_type,omitempty"` Credentials GithubCredentials `json:"credentials,omitempty"` PoolBalancerType PoolBalancerType `json:"pool_balancing_type,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` WebhookSecret string `json:"-"` } -func (g *GithubEntity) GithubURL() string { +func (g GithubEntity) GetCreatedAt() time.Time { + return g.CreatedAt +} + +func (g GithubEntity) GithubURL() string { switch g.EntityType { case GithubEntityTypeRepository: return fmt.Sprintf("%s/%s/%s", g.Credentials.BaseURL, g.Owner, g.Name) diff --git a/runner/github_credentials.go b/runner/github_credentials.go index fbf9d330..7cd4e74c 100644 --- a/runner/github_credentials.go +++ b/runner/github_credentials.go @@ -7,6 +7,7 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" "github.com/cloudbase/garm/auth" + "github.com/cloudbase/garm/cache" "github.com/cloudbase/garm/params" ) @@ -15,11 +16,24 @@ func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredential return nil, runnerErrors.ErrUnauthorized } + // Get the credentials from the store. The cache is always updated after the database successfully + // commits the transaction that created/updated the credentials. + // If we create a set of credentials then immediately after we call ListCredentials, + // there is a posibillity that not all creds will be in the cache. creds, err := r.store.ListGithubCredentials(ctx) if err != nil { return nil, errors.Wrap(err, "fetching github credentials") } + // If we do have cache, update the rate limit for each credential. The rate limits are queried + // every 30 seconds and set in cache. + credsCache := cache.GetAllGithubCredentialsAsMap() + for idx, cred := range creds { + inCache, ok := credsCache[cred.ID] + if ok { + creds[idx].RateLimit = inCache.RateLimit + } + } return creds, nil } @@ -50,6 +64,11 @@ func (r *Runner) GetGithubCredentials(ctx context.Context, id uint) (params.Gith return params.GithubCredentials{}, errors.Wrap(err, "failed to get github credentials") } + cached, ok := cache.GetGithubCredentials((creds.ID)) + if ok { + creds.RateLimit = cached.RateLimit + } + return creds, nil } diff --git a/workers/cache/cache.go b/workers/cache/cache.go index d19bbbaf..315876d6 100644 --- a/workers/cache/cache.go +++ b/workers/cache/cache.go @@ -5,12 +5,14 @@ import ( "fmt" "log/slog" "sync" + "time" "github.com/cloudbase/garm/cache" "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/params" garmUtil "github.com/cloudbase/garm/util" + "github.com/cloudbase/garm/util/github" ) func NewWorker(ctx context.Context, store common.Store) *Worker { @@ -47,23 +49,23 @@ func (w *Worker) setCacheForEntity(entityGetter params.EntityGetter, pools []par return fmt.Errorf("getting entity: %w", err) } cache.SetEntity(entity) - var repoPools []params.Pool - var repoScaleSets []params.ScaleSet + var entityPools []params.Pool + var entityScaleSets []params.ScaleSet for _, pool := range pools { - if pool.RepoID == entity.ID { - repoPools = append(repoPools, pool) + if pool.BelongsTo(entity) { + entityPools = append(entityPools, pool) } } for _, scaleSet := range scaleSets { - if scaleSet.RepoID == entity.ID { - repoScaleSets = append(repoScaleSets, scaleSet) + if scaleSet.BelongsTo(entity) { + entityScaleSets = append(entityScaleSets, scaleSet) } } - cache.ReplaceEntityPools(entity.ID, repoPools) - cache.ReplaceEntityScaleSets(entity.ID, repoScaleSets) + cache.ReplaceEntityPools(entity.ID, entityPools) + cache.ReplaceEntityScaleSets(entity.ID, entityScaleSets) return nil } @@ -178,6 +180,7 @@ func (w *Worker) Start() error { w.quit = make(chan struct{}) go w.loop() + go w.rateLimitLoop() return nil } @@ -379,3 +382,44 @@ func (w *Worker) loop() { } } } + +func (w *Worker) rateLimitLoop() { + defer w.Stop() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-w.quit: + return + case <-w.ctx.Done(): + slog.DebugContext(w.ctx, "context done") + return + case <-ticker.C: + // update credentials rate limits + for _, creds := range cache.GetAllGithubCredentials() { + rateCli, err := github.NewRateLimitClient(w.ctx, creds) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext(w.ctx, "failed to create rate limit client") + continue + } + rateLimit, err := rateCli.RateLimit(w.ctx) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext(w.ctx, "failed to get rate limit") + continue + } + if rateLimit != nil { + core := rateLimit.GetCore() + limit := params.GithubRateLimit{ + Limit: core.Limit, + Used: core.Used, + Remaining: core.Remaining, + Reset: core.Reset.Unix(), + } + cache.SetCredentialsRateLimit(creds.ID, limit) + } + } + } + } +} diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index 5d00471f..3b4287c2 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -233,8 +233,8 @@ func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) erro func (c *Controller) loop() { defer c.Stop() - consilidateTicker := time.NewTicker(common.PoolReapTimeoutInterval) - defer consilidateTicker.Stop() + consolidateTicker := time.NewTicker(common.PoolReapTimeoutInterval) + defer consolidateTicker.Stop() for { select { @@ -244,10 +244,10 @@ func (c *Controller) loop() { return } slog.InfoContext(c.ctx, "received payload") - go c.handleWatcherEvent(payload) + c.handleWatcherEvent(payload) case <-c.ctx.Done(): return - case _, ok := <-consilidateTicker.C: + case _, ok := <-consolidateTicker.C: if !ok { slog.InfoContext(c.ctx, "consolidate ticker closed") return