Add instance cache

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-05-05 23:34:53 +00:00
parent 9f640965e2
commit 2f3c74562e
6 changed files with 171 additions and 46 deletions

10
cache/entity_cache.go vendored
View file

@ -49,6 +49,16 @@ func (e *EntityCache) SetEntity(entity params.GithubEntity) {
e.mux.Lock()
defer e.mux.Unlock()
_, ok := e.entities[entity.ID]
if !ok {
e.entities[entity.ID] = EntityItem{
Entity: entity,
Pools: make(map[string]params.Pool),
ScaleSets: make(map[uint]params.ScaleSet),
}
return
}
e.entities[entity.ID] = EntityItem{
Entity: entity,
}

107
cache/instance_cache.go vendored Normal file
View file

@ -0,0 +1,107 @@
package cache
import (
"sync"
"github.com/cloudbase/garm/params"
)
var instanceCache *InstanceCache
func init() {
cache := &InstanceCache{
cache: make(map[string]params.Instance),
}
instanceCache = cache
}
type InstanceCache struct {
mux sync.Mutex
cache map[string]params.Instance
}
func (i *InstanceCache) SetInstance(instance params.Instance) {
i.mux.Lock()
defer i.mux.Unlock()
i.cache[instance.ID] = instance
}
func (i *InstanceCache) GetInstance(id string) (params.Instance, bool) {
i.mux.Lock()
defer i.mux.Unlock()
if instance, ok := i.cache[id]; ok {
return instance, true
}
return params.Instance{}, false
}
func (i *InstanceCache) DeleteInstance(id string) {
i.mux.Lock()
defer i.mux.Unlock()
delete(i.cache, id)
}
func (i *InstanceCache) GetAllInstances() []params.Instance {
i.mux.Lock()
defer i.mux.Unlock()
instances := make([]params.Instance, 0, len(i.cache))
for _, instance := range i.cache {
instances = append(instances, instance)
}
return instances
}
func (i *InstanceCache) GetInstancesForPool(poolID string) []params.Instance {
i.mux.Lock()
defer i.mux.Unlock()
var filteredInstances []params.Instance
for _, instance := range i.cache {
if instance.PoolID == poolID {
filteredInstances = append(filteredInstances, instance)
}
}
return filteredInstances
}
func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instance {
i.mux.Lock()
defer i.mux.Unlock()
var filteredInstances []params.Instance
for _, instance := range i.cache {
if instance.ScaleSetID == scaleSetID {
filteredInstances = append(filteredInstances, instance)
}
}
return filteredInstances
}
func SetInstanceCache(instance params.Instance) {
instanceCache.SetInstance(instance)
}
func GetInstanceCache(id string) (params.Instance, bool) {
return instanceCache.GetInstance(id)
}
func DeleteInstanceCache(id string) {
instanceCache.DeleteInstance(id)
}
func GetAllInstancesCache() []params.Instance {
return instanceCache.GetAllInstances()
}
func GetInstancesForPool(poolID string) []params.Instance {
return instanceCache.GetInstancesForPool(poolID)
}
func GetInstancesForScaleSet(scaleSetID uint) []params.Instance {
return instanceCache.GetInstancesForScaleSet(scaleSetID)
}

View file

@ -226,12 +226,6 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
case "completed":
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
return errors.Wrap(err, "converting job to params")
}

View file

@ -18,7 +18,7 @@ import (
)
func newInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) {
ctx = garmUtil.WithSlogContext(ctx, slog.Any("instance", instance.Name))
ctx = garmUtil.WithSlogContext(ctx, slog.Any("worker", fmt.Sprintf("instance-worker-%s", instance.Name)))
githubEntity, err := scaleSet.GetEntity()
if err != nil {
@ -66,25 +66,17 @@ func (i *instanceManager) Start() error {
i.mux.Lock()
defer i.mux.Unlock()
slog.DebugContext(i.ctx, "starting instance manager", "instance", i.instance.Name)
if i.running {
return nil
}
// switch i.instance.Status {
// case commonParams.InstancePendingCreate,
// commonParams.InstancePendingDelete,
// commonParams.InstancePendingForceDelete:
// if err := i.consolidateState(); err != nil {
// return fmt.Errorf("consolidating state: %w", err)
// }
// case commonParams.InstanceDeleted:
// return ErrInstanceDeleted
// }
i.running = true
i.quit = make(chan struct{})
i.updates = make(chan dbCommon.ChangePayload)
go i.loop()
go i.updatesLoop()
return nil
}
@ -106,6 +98,7 @@ func (i *instanceManager) sleepForBackOffOrCanceled() bool {
timer := time.NewTimer(i.deleteBackoff)
defer timer.Stop()
slog.DebugContext(i.ctx, "sleeping for backoff", "duration", i.deleteBackoff, "instance", i.instance.Name)
select {
case <-timer.C:
return false
@ -274,6 +267,7 @@ func (i *instanceManager) handleDeleteInstanceInProvider(instance params.Instanc
func (i *instanceManager) consolidateState() error {
i.mux.Lock()
defer i.mux.Unlock()
if !i.running {
return nil
}
@ -347,9 +341,6 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error {
// We need a better way to handle instance state. Database updates may fail, and we
// end up with an inconsistent state between what we know about the instance and what
// is reflected in the database.
i.mux.Lock()
defer i.mux.Unlock()
if !i.running {
return nil
}
@ -359,25 +350,23 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error {
return runnerErrors.NewBadRequestError("invalid payload type")
}
i.instance = instance
if i.instance.Status == instance.Status {
// Nothing of interest happened.
switch instance.Status {
case commonParams.InstanceDeleting, commonParams.InstanceCreating:
return nil
}
i.instance = instance
return nil
}
func (i *instanceManager) Update(instance dbCommon.ChangePayload) error {
i.mux.Lock()
defer i.mux.Unlock()
if !i.running {
return runnerErrors.NewBadRequestError("instance manager is not running")
}
timer := time.NewTimer(60 * time.Second)
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
slog.DebugContext(i.ctx, "sending update to instance manager")
select {
case i.updates <- instance:
case <-i.quit:
@ -390,6 +379,33 @@ func (i *instanceManager) Update(instance dbCommon.ChangePayload) error {
return nil
}
func (i *instanceManager) updatesLoop() {
defer i.Stop()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-i.quit:
return
case <-i.ctx.Done():
return
case update, ok := <-i.updates:
if !ok {
slog.InfoContext(i.ctx, "updates channel closed")
return
}
slog.DebugContext(i.ctx, "received update")
if err := i.handleUpdate(update); err != nil {
if errors.Is(err, ErrInstanceDeleted) {
// instance had been deleted, we can exit the loop.
return
}
slog.ErrorContext(i.ctx, "handling update", "error", err)
}
}
}
}
func (i *instanceManager) loop() {
defer i.Stop()
ticker := time.NewTicker(5 * time.Second)
@ -408,17 +424,6 @@ func (i *instanceManager) loop() {
}
slog.ErrorContext(i.ctx, "consolidating state", "error", err)
}
case update, ok := <-i.updates:
if !ok {
return
}
if err := i.handleUpdate(update); err != nil {
if errors.Is(err, ErrInstanceDeleted) {
// instance had been deleted, we can exit the loop.
return
}
slog.ErrorContext(i.ctx, "handling update", "error", err)
}
}
}
}

View file

@ -8,16 +8,23 @@ import (
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
)
func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*Provider, error) {
consumerID := "provider-worker"
ctx = garmUtil.WithSlogContext(
ctx,
slog.Any("worker", consumerID))
return &Provider{
ctx: context.Background(),
ctx: ctx,
store: store,
consumerID: consumerID,
providers: providers,
@ -74,6 +81,7 @@ func (p *Provider) loadAllRunners() error {
}
for _, runner := range runners {
cache.SetInstanceCache(runner)
// Skip non scale set instances for now. This condition needs to be
// removed once we replace the current pool manager.
if runner.ScaleSetID == 0 {
@ -246,29 +254,34 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
return
}
slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name)
switch event.Operation {
case dbCommon.CreateOperation:
cache.SetInstanceCache(instance)
slog.DebugContext(p.ctx, "got create operation")
if err := p.handleInstanceAdded(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
return
}
case dbCommon.UpdateOperation:
cache.SetInstanceCache(instance)
slog.DebugContext(p.ctx, "got update operation")
existingInstance, ok := p.runners[instance.Name]
if !ok {
slog.DebugContext(p.ctx, "instance not found, creating new instance", "instance_name", instance.Name)
if err := p.handleInstanceAdded(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
return
}
} else {
slog.DebugContext(p.ctx, "updating instance", "instance_name", instance.Name)
if err := existingInstance.Update(event); err != nil {
slog.ErrorContext(p.ctx, "failed to update instance", "error", err)
return
}
}
case dbCommon.DeleteOperation:
slog.DebugContext(p.ctx, "got delete operation")
slog.DebugContext(p.ctx, "got delete operation", "instance_name", instance.Name)
existingInstance, ok := p.runners[instance.Name]
if ok {
if err := existingInstance.Stop(); err != nil {
@ -277,6 +290,7 @@ func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
}
}
delete(p.runners, instance.Name)
cache.DeleteInstanceCache(instance.ID)
default:
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
return

View file

@ -1,18 +1,13 @@
package provider
import (
commonParams "github.com/cloudbase/garm-provider-common/params"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
)
func composeProviderWatcher() dbCommon.PayloadFilterFunc {
return watcher.WithAny(
watcher.WithInstanceStatusFilter(
commonParams.InstancePendingCreate,
commonParams.InstancePendingDelete,
commonParams.InstancePendingForceDelete,
),
watcher.WithEntityTypeFilter(dbCommon.InstanceEntityType),
watcher.WithEntityTypeFilter(dbCommon.ScaleSetEntityType),
)
}