diff --git a/cache/credentials_cache.go b/cache/credentials_cache.go index 092d2e90..7cf65a03 100644 --- a/cache/credentials_cache.go +++ b/cache/credentials_cache.go @@ -21,6 +21,16 @@ type GithubCredentials struct { cache map[uint]params.GithubCredentials } +func (g *GithubCredentials) SetCredentialsRateLimit(credsID uint, rateLimit params.GithubRateLimit) { + g.mux.Lock() + defer g.mux.Unlock() + + if creds, ok := g.cache[credsID]; ok { + creds.RateLimit = rateLimit + g.cache[credsID] = creds + } +} + func (g *GithubCredentials) SetCredentials(credentials params.GithubCredentials) { g.mux.Lock() defer g.mux.Unlock() @@ -54,6 +64,21 @@ func (g *GithubCredentials) GetAllCredentials() []params.GithubCredentials { for _, cred := range g.cache { creds = append(creds, cred) } + + // Sort the credentials by ID + sortByID(creds) + return creds +} + +func (g *GithubCredentials) GetAllCredentialsAsMap() map[uint]params.GithubCredentials { + g.mux.Lock() + defer g.mux.Unlock() + + creds := make(map[uint]params.GithubCredentials, len(g.cache)) + for id, cred := range g.cache { + creds[id] = cred + } + return creds } @@ -72,3 +97,11 @@ func DeleteGithubCredentials(id uint) { func GetAllGithubCredentials() []params.GithubCredentials { return credentialsCache.GetAllCredentials() } + +func SetCredentialsRateLimit(credsID uint, rateLimit params.GithubRateLimit) { + credentialsCache.SetCredentialsRateLimit(credsID, rateLimit) +} + +func GetAllGithubCredentialsAsMap() map[uint]params.GithubCredentials { + return credentialsCache.GetAllCredentialsAsMap() +} diff --git a/cache/entity_cache.go b/cache/entity_cache.go index 0c549498..006f40db 100644 --- a/cache/entity_cache.go +++ b/cache/entity_cache.go @@ -186,6 +186,8 @@ func (e *EntityCache) FindPoolsMatchingAllTags(entityID string, tags []string) [ pools = append(pools, pool) } } + // Sort the pools by creation date. + sortByCreationDate(pools) return pools } return nil @@ -200,6 +202,8 @@ func (e *EntityCache) GetEntityPools(entityID string) []params.Pool { for _, pool := range cache.Pools { pools = append(pools, pool) } + // Sort the pools by creation date. + sortByCreationDate(pools) return pools } return nil @@ -214,6 +218,8 @@ func (e *EntityCache) GetEntityScaleSets(entityID string) []params.ScaleSet { for _, scaleSet := range cache.ScaleSets { scaleSets = append(scaleSets, scaleSet) } + // Sort the scale sets by creation date. + sortByID(scaleSets) return scaleSets } return nil @@ -229,6 +235,7 @@ func (e *EntityCache) GetEntitiesUsingGredentials(credsID uint) []params.GithubE entities = append(entities, cache.Entity) } } + sortByCreationDate(entities) return entities } @@ -245,9 +252,38 @@ func (e *EntityCache) GetAllEntities() []params.GithubEntity { } entities = append(entities, cache.Entity) } + sortByCreationDate(entities) return entities } +func (e *EntityCache) GetAllPools() []params.Pool { + e.mux.Lock() + defer e.mux.Unlock() + + var pools []params.Pool + for _, cache := range e.entities { + for _, pool := range cache.Pools { + pools = append(pools, pool) + } + } + sortByCreationDate(pools) + return pools +} + +func (e *EntityCache) GetAllScaleSets() []params.ScaleSet { + e.mux.Lock() + defer e.mux.Unlock() + + var scaleSets []params.ScaleSet + for _, cache := range e.entities { + for _, scaleSet := range cache.ScaleSets { + scaleSets = append(scaleSets, scaleSet) + } + } + sortByID(scaleSets) + return scaleSets +} + func GetEntity(entityID string) (params.GithubEntity, bool) { return entityCache.GetEntity(entityID) } @@ -315,3 +351,11 @@ func GetEntitiesUsingGredentials(credsID uint) []params.GithubEntity { func GetAllEntities() []params.GithubEntity { return entityCache.GetAllEntities() } + +func GetAllPools() []params.Pool { + return entityCache.GetAllPools() +} + +func GetAllScaleSets() []params.ScaleSet { + return entityCache.GetAllScaleSets() +} diff --git a/cache/instance_cache.go b/cache/instance_cache.go index 44f95ec2..b96db5e9 100644 --- a/cache/instance_cache.go +++ b/cache/instance_cache.go @@ -53,6 +53,7 @@ func (i *InstanceCache) GetAllInstances() []params.Instance { for _, instance := range i.cache { instances = append(instances, instance) } + sortByCreationDate(instances) return instances } @@ -66,6 +67,7 @@ func (i *InstanceCache) GetInstancesForPool(poolID string) []params.Instance { filteredInstances = append(filteredInstances, instance) } } + sortByCreationDate(filteredInstances) return filteredInstances } @@ -79,6 +81,7 @@ func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instan filteredInstances = append(filteredInstances, instance) } } + sortByCreationDate(filteredInstances) return filteredInstances } diff --git a/cache/util.go b/cache/util.go new file mode 100644 index 00000000..f8769c65 --- /dev/null +++ b/cache/util.go @@ -0,0 +1,19 @@ +package cache + +import ( + "sort" + + "github.com/cloudbase/garm/params" +) + +func sortByID[T params.IDGetter](s []T) { + sort.Slice(s, func(i, j int) bool { + return s[i].GetID() < s[j].GetID() + }) +} + +func sortByCreationDate[T params.CreationDateGetter](s []T) { + sort.Slice(s, func(i, j int) bool { + return s[i].GetCreatedAt().Before(s[j].GetCreatedAt()) + }) +} diff --git a/cmd/garm-cli/cmd/github_credentials.go b/cmd/garm-cli/cmd/github_credentials.go index c4faec1a..bd3521bf 100644 --- a/cmd/garm-cli/cmd/github_credentials.go +++ b/cmd/garm-cli/cmd/github_credentials.go @@ -375,6 +375,8 @@ func formatOneGithubCredential(cred params.GithubCredentials) { header := table.Row{"Field", "Value"} t.AppendHeader(header) + resetMinutes := cred.RateLimit.ResetIn().Minutes() + t.AppendRow(table.Row{"ID", cred.ID}) t.AppendRow(table.Row{"Created At", cred.CreatedAt}) t.AppendRow(table.Row{"Updated At", cred.UpdatedAt}) @@ -385,6 +387,10 @@ func formatOneGithubCredential(cred params.GithubCredentials) { t.AppendRow(table.Row{"Upload URL", cred.UploadBaseURL}) t.AppendRow(table.Row{"Type", cred.AuthType}) t.AppendRow(table.Row{"Endpoint", cred.Endpoint.Name}) + if resetMinutes > 0 { + t.AppendRow(table.Row{"Remaining API requests", cred.RateLimit.Remaining}) + t.AppendRow(table.Row{"Rate limit reset", fmt.Sprintf("%d minutes", int64(resetMinutes))}) + } if len(cred.Repositories) > 0 { t.AppendRow(table.Row{"", ""}) diff --git a/params/interfaces.go b/params/interfaces.go index 95f02a9a..cd9b94ff 100644 --- a/params/interfaces.go +++ b/params/interfaces.go @@ -1,7 +1,17 @@ package params +import "time" + // EntityGetter is implemented by all github entities (repositories, organizations and enterprises). // It defines the GetEntity() function which returns a github entity. type EntityGetter interface { GetEntity() (GithubEntity, error) } + +type IDGetter interface { + GetID() uint +} + +type CreationDateGetter interface { + GetCreatedAt() time.Time +} diff --git a/params/params.go b/params/params.go index a15d2446..7636102f 100644 --- a/params/params.go +++ b/params/params.go @@ -252,6 +252,10 @@ type Instance struct { JitConfiguration map[string]string `json:"-"` } +func (i Instance) GetCreatedAt() time.Time { + return i.CreatedAt +} + func (i Instance) GetName() string { return i.Name } @@ -370,6 +374,22 @@ type Pool struct { Priority uint `json:"priority,omitempty"` } +func (p Pool) BelongsTo(entity GithubEntity) bool { + switch p.PoolType() { + case GithubEntityTypeRepository: + return p.RepoID == entity.ID + case GithubEntityTypeOrganization: + return p.OrgID == entity.ID + case GithubEntityTypeEnterprise: + return p.EnterpriseID == entity.ID + } + return false +} + +func (p Pool) GetCreatedAt() time.Time { + return p.CreatedAt +} + func (p Pool) MinIdleRunnersAsInt() int { if p.MinIdleRunners > math.MaxInt { return math.MaxInt @@ -493,6 +513,22 @@ type ScaleSet struct { LastMessageID int64 `json:"-"` } +func (p ScaleSet) BelongsTo(entity GithubEntity) bool { + switch p.ScaleSetType() { + case GithubEntityTypeRepository: + return p.RepoID == entity.ID + case GithubEntityTypeOrganization: + return p.OrgID == entity.ID + case GithubEntityTypeEnterprise: + return p.EnterpriseID == entity.ID + } + return false +} + +func (p ScaleSet) GetID() uint { + return p.ID +} + func (p ScaleSet) GetEntity() (GithubEntity, error) { switch p.ScaleSetType() { case GithubEntityTypeRepository: @@ -526,10 +562,6 @@ func (p *ScaleSet) ScaleSetType() GithubEntityType { return "" } -func (p ScaleSet) GetID() uint { - return p.ID -} - func (p *ScaleSet) RunnerTimeout() uint { if p.RunnerBootstrapTimeout == 0 { return appdefaults.DefaultRunnerBootstrapTimeout @@ -560,6 +592,10 @@ type Repository struct { WebhookSecret string `json:"-"` } +func (r Repository) CreationDateGetter() time.Time { + return r.CreatedAt +} + func (r Repository) GetEntity() (GithubEntity, error) { if r.ID == "" { return GithubEntity{}, fmt.Errorf("repository has no ID") @@ -572,6 +608,7 @@ func (r Repository) GetEntity() (GithubEntity, error) { PoolBalancerType: r.PoolBalancerType, Credentials: r.Credentials, WebhookSecret: r.WebhookSecret, + CreatedAt: r.CreatedAt, }, nil } @@ -616,6 +653,10 @@ type Organization struct { WebhookSecret string `json:"-"` } +func (o Organization) GetCreatedAt() time.Time { + return o.CreatedAt +} + func (o Organization) GetEntity() (GithubEntity, error) { if o.ID == "" { return GithubEntity{}, fmt.Errorf("organization has no ID") @@ -627,6 +668,7 @@ func (o Organization) GetEntity() (GithubEntity, error) { WebhookSecret: o.WebhookSecret, PoolBalancerType: o.PoolBalancerType, Credentials: o.Credentials, + CreatedAt: o.CreatedAt, }, nil } @@ -667,6 +709,10 @@ type Enterprise struct { WebhookSecret string `json:"-"` } +func (e Enterprise) GetCreatedAt() time.Time { + return e.CreatedAt +} + func (e Enterprise) GetEntity() (GithubEntity, error) { if e.ID == "" { return GithubEntity{}, fmt.Errorf("enterprise has no ID") @@ -678,6 +724,7 @@ func (e Enterprise) GetEntity() (GithubEntity, error) { WebhookSecret: e.WebhookSecret, PoolBalancerType: e.PoolBalancerType, Credentials: e.Credentials, + CreatedAt: e.CreatedAt, }, nil } @@ -772,6 +819,24 @@ func (c *ControllerInfo) JobBackoff() time.Duration { return time.Duration(int64(c.MinimumJobAgeBackoff)) } +type GithubRateLimit struct { + Limit int `json:"limit,omitempty"` + Used int `json:"used,omitempty"` + Remaining int `json:"remaining,omitempty"` + Reset int64 `json:"reset,omitempty"` +} + +func (g GithubRateLimit) ResetIn() time.Duration { + return time.Until(g.ResetAt()) +} + +func (g GithubRateLimit) ResetAt() time.Time { + if g.Reset == 0 { + return time.Time{} + } + return time.Unix(g.Reset, 0) +} + type GithubCredentials struct { ID uint `json:"id,omitempty"` Name string `json:"name,omitempty"` @@ -782,17 +847,22 @@ type GithubCredentials struct { CABundle []byte `json:"ca_bundle,omitempty"` AuthType GithubAuthType `json:"auth-type,omitempty"` - Repositories []Repository `json:"repositories,omitempty"` - Organizations []Organization `json:"organizations,omitempty"` - Enterprises []Enterprise `json:"enterprises,omitempty"` - Endpoint GithubEndpoint `json:"endpoint,omitempty"` - CreatedAt time.Time `json:"created_at,omitempty"` - UpdatedAt time.Time `json:"updated_at,omitempty"` + Repositories []Repository `json:"repositories,omitempty"` + Organizations []Organization `json:"organizations,omitempty"` + Enterprises []Enterprise `json:"enterprises,omitempty"` + Endpoint GithubEndpoint `json:"endpoint,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` + UpdatedAt time.Time `json:"updated_at,omitempty"` + RateLimit GithubRateLimit `json:"rate_limit,omitempty"` // Do not serialize sensitive info. CredentialsPayload []byte `json:"-"` } +func (g GithubCredentials) GetID() uint { + return g.ID +} + func (g GithubCredentials) GetHTTPClient(ctx context.Context) (*http.Client, error) { var roots *x509.CertPool if g.CABundle != nil { @@ -994,11 +1064,16 @@ type GithubEntity struct { EntityType GithubEntityType `json:"entity_type,omitempty"` Credentials GithubCredentials `json:"credentials,omitempty"` PoolBalancerType PoolBalancerType `json:"pool_balancing_type,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` WebhookSecret string `json:"-"` } -func (g *GithubEntity) GithubURL() string { +func (g GithubEntity) GetCreatedAt() time.Time { + return g.CreatedAt +} + +func (g GithubEntity) GithubURL() string { switch g.EntityType { case GithubEntityTypeRepository: return fmt.Sprintf("%s/%s/%s", g.Credentials.BaseURL, g.Owner, g.Name) diff --git a/runner/github_credentials.go b/runner/github_credentials.go index fbf9d330..7cd4e74c 100644 --- a/runner/github_credentials.go +++ b/runner/github_credentials.go @@ -7,6 +7,7 @@ import ( runnerErrors "github.com/cloudbase/garm-provider-common/errors" "github.com/cloudbase/garm/auth" + "github.com/cloudbase/garm/cache" "github.com/cloudbase/garm/params" ) @@ -15,11 +16,24 @@ func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredential return nil, runnerErrors.ErrUnauthorized } + // Get the credentials from the store. The cache is always updated after the database successfully + // commits the transaction that created/updated the credentials. + // If we create a set of credentials then immediately after we call ListCredentials, + // there is a posibillity that not all creds will be in the cache. creds, err := r.store.ListGithubCredentials(ctx) if err != nil { return nil, errors.Wrap(err, "fetching github credentials") } + // If we do have cache, update the rate limit for each credential. The rate limits are queried + // every 30 seconds and set in cache. + credsCache := cache.GetAllGithubCredentialsAsMap() + for idx, cred := range creds { + inCache, ok := credsCache[cred.ID] + if ok { + creds[idx].RateLimit = inCache.RateLimit + } + } return creds, nil } @@ -50,6 +64,11 @@ func (r *Runner) GetGithubCredentials(ctx context.Context, id uint) (params.Gith return params.GithubCredentials{}, errors.Wrap(err, "failed to get github credentials") } + cached, ok := cache.GetGithubCredentials((creds.ID)) + if ok { + creds.RateLimit = cached.RateLimit + } + return creds, nil } diff --git a/workers/cache/cache.go b/workers/cache/cache.go index d19bbbaf..315876d6 100644 --- a/workers/cache/cache.go +++ b/workers/cache/cache.go @@ -5,12 +5,14 @@ import ( "fmt" "log/slog" "sync" + "time" "github.com/cloudbase/garm/cache" "github.com/cloudbase/garm/database/common" "github.com/cloudbase/garm/database/watcher" "github.com/cloudbase/garm/params" garmUtil "github.com/cloudbase/garm/util" + "github.com/cloudbase/garm/util/github" ) func NewWorker(ctx context.Context, store common.Store) *Worker { @@ -47,23 +49,23 @@ func (w *Worker) setCacheForEntity(entityGetter params.EntityGetter, pools []par return fmt.Errorf("getting entity: %w", err) } cache.SetEntity(entity) - var repoPools []params.Pool - var repoScaleSets []params.ScaleSet + var entityPools []params.Pool + var entityScaleSets []params.ScaleSet for _, pool := range pools { - if pool.RepoID == entity.ID { - repoPools = append(repoPools, pool) + if pool.BelongsTo(entity) { + entityPools = append(entityPools, pool) } } for _, scaleSet := range scaleSets { - if scaleSet.RepoID == entity.ID { - repoScaleSets = append(repoScaleSets, scaleSet) + if scaleSet.BelongsTo(entity) { + entityScaleSets = append(entityScaleSets, scaleSet) } } - cache.ReplaceEntityPools(entity.ID, repoPools) - cache.ReplaceEntityScaleSets(entity.ID, repoScaleSets) + cache.ReplaceEntityPools(entity.ID, entityPools) + cache.ReplaceEntityScaleSets(entity.ID, entityScaleSets) return nil } @@ -178,6 +180,7 @@ func (w *Worker) Start() error { w.quit = make(chan struct{}) go w.loop() + go w.rateLimitLoop() return nil } @@ -379,3 +382,44 @@ func (w *Worker) loop() { } } } + +func (w *Worker) rateLimitLoop() { + defer w.Stop() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-w.quit: + return + case <-w.ctx.Done(): + slog.DebugContext(w.ctx, "context done") + return + case <-ticker.C: + // update credentials rate limits + for _, creds := range cache.GetAllGithubCredentials() { + rateCli, err := github.NewRateLimitClient(w.ctx, creds) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext(w.ctx, "failed to create rate limit client") + continue + } + rateLimit, err := rateCli.RateLimit(w.ctx) + if err != nil { + slog.With(slog.Any("error", err)).ErrorContext(w.ctx, "failed to get rate limit") + continue + } + if rateLimit != nil { + core := rateLimit.GetCore() + limit := params.GithubRateLimit{ + Limit: core.Limit, + Used: core.Used, + Remaining: core.Remaining, + Reset: core.Reset.Unix(), + } + cache.SetCredentialsRateLimit(creds.ID, limit) + } + } + } + } +} diff --git a/workers/scaleset/controller.go b/workers/scaleset/controller.go index 5d00471f..3b4287c2 100644 --- a/workers/scaleset/controller.go +++ b/workers/scaleset/controller.go @@ -233,8 +233,8 @@ func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) erro func (c *Controller) loop() { defer c.Stop() - consilidateTicker := time.NewTicker(common.PoolReapTimeoutInterval) - defer consilidateTicker.Stop() + consolidateTicker := time.NewTicker(common.PoolReapTimeoutInterval) + defer consolidateTicker.Stop() for { select { @@ -244,10 +244,10 @@ func (c *Controller) loop() { return } slog.InfoContext(c.ctx, "received payload") - go c.handleWatcherEvent(payload) + c.handleWatcherEvent(payload) case <-c.ctx.Done(): return - case _, ok := <-consilidateTicker.C: + case _, ok := <-consolidateTicker.C: if !ok { slog.InfoContext(c.ctx, "consolidate ticker closed") return