Add tools update routine and cleanup logging

This change adds an update routine in the cache worker, for github tools
downloads.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-05-07 23:01:22 +00:00
parent ffbb3b8d41
commit 52007f4ffa
10 changed files with 320 additions and 86 deletions

10
cache/cache_test.go vendored
View file

@ -55,7 +55,7 @@ func (c *CacheTestSuite) TestSetCacheWorks() {
c.Require().Len(githubToolsCache.entities, 0)
SetGithubToolsCache(c.entity, tools)
c.Require().Len(githubToolsCache.entities, 1)
cachedTools, ok := GetGithubToolsCache(c.entity)
cachedTools, ok := GetGithubToolsCache(c.entity.ID)
c.Require().True(ok)
c.Require().Len(cachedTools, 1)
c.Require().Equal(tools[0].GetDownloadURL(), cachedTools[0].GetDownloadURL())
@ -72,11 +72,11 @@ func (c *CacheTestSuite) TestTimedOutToolsCache() {
c.Require().Len(githubToolsCache.entities, 0)
SetGithubToolsCache(c.entity, tools)
c.Require().Len(githubToolsCache.entities, 1)
entity := githubToolsCache.entities[c.entity.String()]
entity := githubToolsCache.entities[c.entity.ID]
entity.updatedAt = entity.updatedAt.Add(-2 * time.Hour)
githubToolsCache.entities[c.entity.String()] = entity
githubToolsCache.entities[c.entity.ID] = entity
cachedTools, ok := GetGithubToolsCache(c.entity)
cachedTools, ok := GetGithubToolsCache(c.entity.ID)
c.Require().False(ok)
c.Require().Nil(cachedTools)
}
@ -84,7 +84,7 @@ func (c *CacheTestSuite) TestTimedOutToolsCache() {
func (c *CacheTestSuite) TestGetInexistentCache() {
c.Require().NotNil(githubToolsCache)
c.Require().Len(githubToolsCache.entities, 0)
cachedTools, ok := GetGithubToolsCache(c.entity)
cachedTools, ok := GetGithubToolsCache(c.entity.ID)
c.Require().False(ok)
c.Require().Nil(cachedTools)
}

View file

@ -26,6 +26,7 @@ func (g *GithubCredentials) SetCredentials(credentials params.GithubCredentials)
defer g.mux.Unlock()
g.cache[credentials.ID] = credentials
UpdateCredentialsInAffectedEntities(credentials)
}
func (g *GithubCredentials) GetCredentials(id uint) (params.GithubCredentials, bool) {

60
cache/entity_cache.go vendored
View file

@ -1,7 +1,6 @@
package cache
import (
"log/slog"
"sync"
"github.com/cloudbase/garm/params"
@ -28,15 +27,24 @@ type EntityCache struct {
entities map[string]EntityItem
}
func (e *EntityCache) UpdateCredentialsInAffectedEntities(creds params.GithubCredentials) {
e.mux.Lock()
defer e.mux.Unlock()
for entityID, cache := range e.entities {
if cache.Entity.Credentials.ID == creds.ID {
cache.Entity.Credentials = creds
e.entities[entityID] = cache
}
}
}
func (e *EntityCache) GetEntity(entityID string) (params.GithubEntity, bool) {
e.mux.Lock()
defer e.mux.Unlock()
if cache, ok := e.entities[entityID]; ok {
// Updating specific credential details will not update entity cache which
// uses those credentials.
// Entity credentials in the cache are only updated if you swap the creds
// on the entity. We get the updated credentials from the credentials cache.
// Get the credentials from the credentials cache.
creds, ok := GetGithubCredentials(cache.Entity.Credentials.ID)
if ok {
cache.Entity.Credentials = creds
@ -173,7 +181,6 @@ func (e *EntityCache) FindPoolsMatchingAllTags(entityID string, tags []string) [
if cache, ok := e.entities[entityID]; ok {
var pools []params.Pool
slog.Debug("Finding pools matching all tags", "entityID", entityID, "tags", tags, "pools", cache.Pools)
for _, pool := range cache.Pools {
if pool.HasRequiredLabels(tags) {
pools = append(pools, pool)
@ -212,6 +219,35 @@ func (e *EntityCache) GetEntityScaleSets(entityID string) []params.ScaleSet {
return nil
}
func (e *EntityCache) GetEntitiesUsingGredentials(credsID uint) []params.GithubEntity {
e.mux.Lock()
defer e.mux.Unlock()
var entities []params.GithubEntity
for _, cache := range e.entities {
if cache.Entity.Credentials.ID == credsID {
entities = append(entities, cache.Entity)
}
}
return entities
}
func (e *EntityCache) GetAllEntities() []params.GithubEntity {
e.mux.Lock()
defer e.mux.Unlock()
var entities []params.GithubEntity
for _, cache := range e.entities {
// Get the credentials from the credentials cache.
creds, ok := GetGithubCredentials(cache.Entity.Credentials.ID)
if ok {
cache.Entity.Credentials = creds
}
entities = append(entities, cache.Entity)
}
return entities
}
func GetEntity(entityID string) (params.GithubEntity, bool) {
return entityCache.GetEntity(entityID)
}
@ -267,3 +303,15 @@ func GetEntityPools(entityID string) []params.Pool {
func GetEntityScaleSets(entityID string) []params.ScaleSet {
return entityCache.GetEntityScaleSets(entityID)
}
func UpdateCredentialsInAffectedEntities(creds params.GithubCredentials) {
entityCache.UpdateCredentialsInAffectedEntities(creds)
}
func GetEntitiesUsingGredentials(credsID uint) []params.GithubEntity {
return entityCache.GetEntitiesUsingGredentials(credsID)
}
func GetAllEntities() []params.GithubEntity {
return entityCache.GetAllEntities()
}

12
cache/tools_cache.go vendored
View file

@ -29,14 +29,14 @@ type GithubToolsCache struct {
entities map[string]GithubEntityTools
}
func (g *GithubToolsCache) Get(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) {
func (g *GithubToolsCache) Get(entityID string) ([]commonParams.RunnerApplicationDownload, bool) {
g.mux.Lock()
defer g.mux.Unlock()
if cache, ok := g.entities[entity.String()]; ok {
if cache, ok := g.entities[entityID]; ok {
if time.Since(cache.updatedAt) > 1*time.Hour {
// Stale cache, remove it.
delete(g.entities, entity.String())
delete(g.entities, entityID)
return nil, false
}
return cache.tools, true
@ -48,7 +48,7 @@ func (g *GithubToolsCache) Set(entity params.GithubEntity, tools []commonParams.
g.mux.Lock()
defer g.mux.Unlock()
g.entities[entity.String()] = GithubEntityTools{
g.entities[entity.ID] = GithubEntityTools{
updatedAt: time.Now(),
entity: entity,
tools: tools,
@ -59,6 +59,6 @@ func SetGithubToolsCache(entity params.GithubEntity, tools []commonParams.Runner
githubToolsCache.Set(entity, tools)
}
func GetGithubToolsCache(entity params.GithubEntity) ([]commonParams.RunnerApplicationDownload, bool) {
return githubToolsCache.Get(entity)
func GetGithubToolsCache(entityID string) ([]commonParams.RunnerApplicationDownload, bool) {
return githubToolsCache.Get(entityID)
}

View file

@ -20,10 +20,11 @@ func NewWorker(ctx context.Context, store common.Store) *Worker {
slog.Any("worker", consumerID))
return &Worker{
ctx: ctx,
store: store,
consumerID: consumerID,
quit: make(chan struct{}),
ctx: ctx,
store: store,
consumerID: consumerID,
toolsWorkes: make(map[string]*toolsUpdater),
quit: make(chan struct{}),
}
}
@ -31,8 +32,9 @@ type Worker struct {
ctx context.Context
consumerID string
consumer common.Consumer
store common.Store
consumer common.Consumer
store common.Store
toolsWorkes map[string]*toolsUpdater
mux sync.Mutex
running bool
@ -110,6 +112,13 @@ func (w *Worker) loadAllEntities() error {
}
}
for _, entity := range cache.GetAllEntities() {
worker := newToolsUpdater(w.ctx, entity)
if err := worker.Start(); err != nil {
return fmt.Errorf("starting tools updater: %w", err)
}
w.toolsWorkes[entity.ID] = worker
}
return nil
}
@ -181,6 +190,11 @@ func (w *Worker) Stop() error {
return nil
}
for _, worker := range w.toolsWorkes {
if err := worker.Stop(); err != nil {
slog.ErrorContext(w.ctx, "stopping tools updater", "error", err)
}
}
w.consumer.Close()
w.running = false
close(w.quit)
@ -195,9 +209,31 @@ func (w *Worker) handleEntityEvent(entityGetter params.EntityGetter, op common.O
}
switch op {
case common.CreateOperation, common.UpdateOperation:
old, hasOld := cache.GetEntity(entity.ID)
cache.SetEntity(entity)
worker, ok := w.toolsWorkes[entity.ID]
if !ok {
worker = newToolsUpdater(w.ctx, entity)
if err := worker.Start(); err != nil {
slog.ErrorContext(w.ctx, "starting tools updater", "error", err)
return
}
w.toolsWorkes[entity.ID] = worker
} else if hasOld {
// probably an update operation
if old.Credentials.ID != entity.Credentials.ID {
worker.Reset()
}
}
case common.DeleteOperation:
cache.DeleteEntity(entity.ID)
worker, ok := w.toolsWorkes[entity.ID]
if ok {
if err := worker.Stop(); err != nil {
slog.ErrorContext(w.ctx, "stopping tools updater", "error", err)
}
delete(w.toolsWorkes, entity.ID)
}
}
}
@ -291,13 +327,20 @@ func (w *Worker) handleCredentialsEvent(event common.ChangePayload) {
switch event.Operation {
case common.CreateOperation, common.UpdateOperation:
cache.SetGithubCredentials(credentials)
entities := cache.GetEntitiesUsingGredentials(credentials.ID)
for _, entity := range entities {
worker, ok := w.toolsWorkes[entity.ID]
if ok {
worker.Reset()
}
}
case common.DeleteOperation:
cache.DeleteGithubCredentials(credentials.ID)
}
}
func (w *Worker) handleEvent(event common.ChangePayload) {
slog.DebugContext(w.ctx, "handling event", "event", event)
slog.DebugContext(w.ctx, "handling event", "event_entity_type", event.EntityType, "event_operation", event.Operation)
switch event.EntityType {
case common.PoolEntityType:
w.handlePoolEvent(event)

170
workers/cache/tool_cache.go vendored Normal file
View file

@ -0,0 +1,170 @@
package cache
import (
"context"
"crypto/rand"
"fmt"
"log/slog"
"math/big"
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/cache"
"github.com/cloudbase/garm/params"
garmUtil "github.com/cloudbase/garm/util"
"github.com/cloudbase/garm/util/github"
)
func newToolsUpdater(ctx context.Context, entity params.GithubEntity) *toolsUpdater {
return &toolsUpdater{
ctx: ctx,
entity: entity,
quit: make(chan struct{}),
}
}
type toolsUpdater struct {
ctx context.Context
entity params.GithubEntity
tools []commonParams.RunnerApplicationDownload
lastUpdate time.Time
mux sync.Mutex
running bool
quit chan struct{}
reset chan struct{}
}
func (t *toolsUpdater) Start() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.running {
return nil
}
t.running = true
t.quit = make(chan struct{})
go t.loop()
return nil
}
func (t *toolsUpdater) Stop() error {
t.mux.Lock()
defer t.mux.Unlock()
if !t.running {
return nil
}
t.running = false
close(t.quit)
return nil
}
func (t *toolsUpdater) updateTools() error {
slog.DebugContext(t.ctx, "updating tools", "entity", t.entity.String())
entity, ok := cache.GetEntity(t.entity.ID)
if !ok {
return fmt.Errorf("getting entity from cache: %s", t.entity.ID)
}
ghCli, err := github.Client(t.ctx, entity)
if err != nil {
return fmt.Errorf("getting github client: %w", err)
}
tools, err := garmUtil.FetchTools(t.ctx, ghCli)
if err != nil {
return fmt.Errorf("fetching tools: %w", err)
}
t.lastUpdate = time.Now().UTC()
t.tools = tools
slog.DebugContext(t.ctx, "updating tools cache", "entity", t.entity.String())
cache.SetGithubToolsCache(entity, tools)
return nil
}
func (t *toolsUpdater) Reset() {
t.mux.Lock()
defer t.mux.Unlock()
if !t.running {
return
}
if t.reset != nil {
close(t.reset)
t.reset = nil
}
}
func (t *toolsUpdater) loop() {
defer t.Stop()
// add some jitter. When spinning up multiple entities, we add
// jitter to prevent stampeeding herd.
randInt, err := rand.Int(rand.Reader, big.NewInt(3000))
if err != nil {
randInt = big.NewInt(0)
}
time.Sleep(time.Duration(randInt.Int64()) * time.Millisecond)
var resetTime time.Time
now := time.Now().UTC()
if now.After(t.lastUpdate.Add(40 * time.Minute)) {
if err := t.updateTools(); err != nil {
slog.ErrorContext(t.ctx, "initial tools update error", "error", err)
resetTime = now.Add(5 * time.Minute)
slog.ErrorContext(t.ctx, "initial tools update error", "error", err)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
}
}
for {
if t.reset == nil {
t.reset = make(chan struct{})
}
// add some jitter
randInt, err := rand.Int(rand.Reader, big.NewInt(300))
if err != nil {
randInt = big.NewInt(0)
}
timer := time.NewTimer(resetTime.Sub(now) + time.Duration(randInt.Int64())*time.Second)
select {
case <-t.quit:
slog.DebugContext(t.ctx, "stopping tools updater")
timer.Stop()
return
case <-timer.C:
slog.DebugContext(t.ctx, "updating tools")
now = time.Now().UTC()
if err := t.updateTools(); err == nil {
slog.ErrorContext(t.ctx, "updating tools", "error", err)
resetTime = now.Add(5 * time.Minute)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
}
case <-t.reset:
slog.DebugContext(t.ctx, "resetting tools updater")
timer.Stop()
now = time.Now().UTC()
if err := t.updateTools(); err != nil {
slog.ErrorContext(t.ctx, "updating tools", "error", err)
resetTime = now.Add(5 * time.Minute)
} else {
// Tools are usually valid for 1 hour.
resetTime = t.lastUpdate.Add(40 * time.Minute)
}
}
timer.Stop()
}
}

View file

@ -148,7 +148,7 @@ func (i *instanceManager) handleCreateInstanceInProvider(instance params.Instanc
if err != nil {
return fmt.Errorf("creating instance token: %w", err)
}
tools, ok := cache.GetGithubToolsCache(entity)
tools, ok := cache.GetGithubToolsCache(entity.ID)
if !ok {
return fmt.Errorf("tools not found in cache for entity %s", entity.String())
}

View file

@ -2,7 +2,6 @@ package scaleset
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
@ -10,8 +9,6 @@ import (
"golang.org/x/sync/errgroup"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
@ -76,8 +73,7 @@ type Controller struct {
store dbCommon.Store
providers map[string]common.Provider
ghCli common.GithubClient
forgeCredsAreValid bool
ghCli common.GithubClient
mux sync.Mutex
running bool
@ -163,29 +159,6 @@ func (c *Controller) Stop() error {
return nil
}
func (c *Controller) updateTools() error {
c.mux.Lock()
defer c.mux.Unlock()
slog.DebugContext(c.ctx, "updating tools for entity", "entity", c.Entity.String())
tools, err := garmUtil.FetchTools(c.ctx, c.ghCli)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
c.ctx, "failed to update tools for entity", "entity", c.Entity.String())
if errors.Is(err, runnerErrors.ErrUnauthorized) {
// nolint:golangci-lint,godox
// TODO: block all scale sets
c.forgeCredsAreValid = false
}
return fmt.Errorf("failed to update tools for entity %s: %w", c.Entity.String(), err)
}
slog.DebugContext(c.ctx, "tools successfully updated for entity", "entity", c.Entity.String())
c.forgeCredsAreValid = true
cache.SetGithubToolsCache(c.Entity, tools)
return nil
}
// consolidateRunnerState will list all runners on GitHub for this entity, sort by
// pool or scale set and pass those runners to the appropriate worker. The worker will
// then have the responsibility to cross check the runners from github with what it
@ -259,23 +232,10 @@ func (c *Controller) waitForErrorGroupOrContextCancelled(g *errgroup.Group) erro
func (c *Controller) loop() {
defer c.Stop()
updateToolsTicker := time.NewTicker(common.PoolToolUpdateInterval)
defer updateToolsTicker.Stop()
consilidateTicker := time.NewTicker(common.PoolReapTimeoutInterval)
defer consilidateTicker.Stop()
initialToolUpdate := make(chan struct{}, 1)
defer close(initialToolUpdate)
go func() {
slog.InfoContext(c.ctx, "running initial tool update")
if err := c.updateTools(); err != nil {
slog.With(slog.Any("error", err)).Error("failed to update tools")
}
initialToolUpdate <- struct{}{}
}()
for {
select {
case payload, ok := <-c.consumer.Watch():
@ -287,25 +247,6 @@ func (c *Controller) loop() {
go c.handleWatcherEvent(payload)
case <-c.ctx.Done():
return
case <-initialToolUpdate:
case _, ok := <-updateToolsTicker.C:
if !ok {
slog.InfoContext(c.ctx, "update tools ticker closed")
return
}
validCreds := c.forgeCredsAreValid
if err := c.updateTools(); err != nil {
if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventError, fmt.Sprintf("failed to update tools: %q", err.Error()), 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
slog.With(slog.Any("error", err)).Error("failed to update tools")
continue
}
if validCreds != c.forgeCredsAreValid && c.forgeCredsAreValid {
if err := c.store.AddEntityEvent(c.ctx, c.Entity, params.StatusEvent, params.EventInfo, "tools updated successfully", 30); err != nil {
slog.With(slog.Any("error", err)).Error("failed to add entity event")
}
}
case _, ok := <-consilidateTicker.C:
if !ok {
slog.InfoContext(c.ctx, "consolidate ticker closed")

View file

@ -11,6 +11,7 @@ import (
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/locking"
@ -769,6 +770,24 @@ func (w *Worker) handleScaleUp(target, current uint) {
}
}
func (w *Worker) waitForToolsOrCancel() (hasTools, stopped bool) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
select {
case <-ticker.C:
entity, err := w.scaleSet.GetEntity()
if err != nil {
slog.ErrorContext(w.ctx, "error getting entity", "error", err)
}
_, ok := cache.GetGithubToolsCache(entity.ID)
return ok, false
case <-w.quit:
return false, true
case <-w.ctx.Done():
return false, true
}
}
func (w *Worker) handleScaleDown(target, current uint) {
delta := current - target
if delta <= 0 {
@ -880,7 +899,19 @@ func (w *Worker) handleAutoScale() {
lastMsg = msg
}
}
for {
hasTools, stopped := w.waitForToolsOrCancel()
if stopped {
slog.DebugContext(w.ctx, "worker is stopped; exiting handleAutoScale")
return
}
if !hasTools {
time.Sleep(1 * time.Second)
continue
}
select {
case <-w.quit:
return

View file

@ -109,7 +109,7 @@ func (l *scaleSetListener) handleSessionMessage(msg params.RunnerScaleSetMessage
if err != nil {
slog.ErrorContext(l.ctx, "getting jobs from body", "error", err)
}
slog.InfoContext(l.ctx, "handling message", "message", msg, "body", body)
if msg.MessageID < l.lastMessageID {
slog.DebugContext(l.ctx, "message is older than last message, ignoring")
return