diff --git a/cache/entity_cache.go b/cache/entity_cache.go
index 920b9a9b..d69d7099 100644
--- a/cache/entity_cache.go
+++ b/cache/entity_cache.go
@@ -49,6 +49,16 @@ func (e *EntityCache) SetEntity(entity params.GithubEntity) {
 	e.mux.Lock()
 	defer e.mux.Unlock()
 
-	e.entities[entity.ID] = EntityItem{
-		Entity: entity,
-	}
+	item, ok := e.entities[entity.ID]
+	if !ok {
+		e.entities[entity.ID] = EntityItem{
+			Entity:    entity,
+			Pools:     make(map[string]params.Pool),
+			ScaleSets: make(map[uint]params.ScaleSet),
+		}
+		return
+	}
+
+	// Preserve the existing Pools and ScaleSets maps when updating the entity.
+	item.Entity = entity
+	e.entities[entity.ID] = item
diff --git a/cache/instance_cache.go b/cache/instance_cache.go
new file mode 100644
index 00000000..88074765
--- /dev/null
+++ b/cache/instance_cache.go
@@ -0,0 +1,107 @@
+package cache
+
+import (
+	"sync"
+
+	"github.com/cloudbase/garm/params"
+)
+
+var instanceCache *InstanceCache
+
+func init() {
+	cache := &InstanceCache{
+		cache: make(map[string]params.Instance),
+	}
+	instanceCache = cache
+}
+
+type InstanceCache struct {
+	mux sync.Mutex
+
+	cache map[string]params.Instance
+}
+
+func (i *InstanceCache) SetInstance(instance params.Instance) {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	i.cache[instance.ID] = instance
+}
+
+func (i *InstanceCache) GetInstance(id string) (params.Instance, bool) {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	if instance, ok := i.cache[id]; ok {
+		return instance, true
+	}
+	return params.Instance{}, false
+}
+
+func (i *InstanceCache) DeleteInstance(id string) {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	delete(i.cache, id)
+}
+
+func (i *InstanceCache) GetAllInstances() []params.Instance {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	instances := make([]params.Instance, 0, len(i.cache))
+	for _, instance := range i.cache {
+		instances = append(instances, instance)
+	}
+	return instances
+}
+
+func (i *InstanceCache) GetInstancesForPool(poolID string) []params.Instance {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	var filteredInstances []params.Instance
+	for _, instance := range i.cache {
+		if instance.PoolID == poolID {
+			filteredInstances = append(filteredInstances, instance)
+		}
+	}
+	return filteredInstances
+}
+
+func (i *InstanceCache) GetInstancesForScaleSet(scaleSetID uint) []params.Instance {
+	i.mux.Lock()
+	defer i.mux.Unlock()
+
+	var filteredInstances []params.Instance
+	for _, instance := range i.cache {
+		if instance.ScaleSetID == scaleSetID {
+			filteredInstances = append(filteredInstances, instance)
+		}
+	}
+	return filteredInstances
+}
+
+func SetInstanceCache(instance params.Instance) {
+	instanceCache.SetInstance(instance)
+}
+
+func GetInstanceCache(id string) (params.Instance, bool) {
+	return instanceCache.GetInstance(id)
+}
+
+func DeleteInstanceCache(id string) {
+	instanceCache.DeleteInstance(id)
+}
+
+func GetAllInstancesCache() []params.Instance {
+	return instanceCache.GetAllInstances()
+}
+
+func GetInstancesForPool(poolID string) []params.Instance {
+	return instanceCache.GetInstancesForPool(poolID)
+}
+
+func GetInstancesForScaleSet(scaleSetID uint) []params.Instance {
+	return instanceCache.GetInstancesForScaleSet(scaleSetID)
+}
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index f5f9a13b..f1134de8 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -226,12 +226,6 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
 	case "completed":
 		jobParams, err = r.paramsWorkflowJobToParamsJob(job)
 		if err != nil {
-			if errors.Is(err, runnerErrors.ErrNotFound) {
-				// Unassigned jobs will have an empty runner_name.
-				// We also need to ignore not found errors, as we may get a webhook regarding
-				// a workflow that is handled by a runner at a different hierarchy level.
-				return nil
-			}
 			return errors.Wrap(err, "converting job to params")
 		}
 
diff --git a/workers/provider/instance_manager.go b/workers/provider/instance_manager.go
index 95d29f69..dfcd1cb5 100644
--- a/workers/provider/instance_manager.go
+++ b/workers/provider/instance_manager.go
@@ -18,7 +18,7 @@ import (
 )
 
 func newInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) {
-	ctx = garmUtil.WithSlogContext(ctx, slog.Any("instance", instance.Name))
+	ctx = garmUtil.WithSlogContext(ctx, slog.Any("worker", fmt.Sprintf("instance-worker-%s", instance.Name)))
 
 	githubEntity, err := scaleSet.GetEntity()
 	if err != nil {
@@ -66,25 +66,17 @@ func (i *instanceManager) Start() error {
 	i.mux.Lock()
 	defer i.mux.Unlock()
 
+	slog.DebugContext(i.ctx, "starting instance manager", "instance", i.instance.Name)
 	if i.running {
 		return nil
 	}
 
-	// switch i.instance.Status {
-	// case commonParams.InstancePendingCreate,
-	// 	commonParams.InstancePendingDelete,
-	// 	commonParams.InstancePendingForceDelete:
-	// 	if err := i.consolidateState(); err != nil {
-	// 		return fmt.Errorf("consolidating state: %w", err)
-	// 	}
-	// case commonParams.InstanceDeleted:
-	// 	return ErrInstanceDeleted
-	// }
 	i.running = true
 	i.quit = make(chan struct{})
 	i.updates = make(chan dbCommon.ChangePayload)
 
 	go i.loop()
+	go i.updatesLoop()
 
 	return nil
 }
@@ -106,6 +98,7 @@ func (i *instanceManager) sleepForBackOffOrCanceled() bool {
 	timer := time.NewTimer(i.deleteBackoff)
 	defer timer.Stop()
 
+	slog.DebugContext(i.ctx, "sleeping for backoff", "duration", i.deleteBackoff, "instance", i.instance.Name)
 	select {
 	case <-timer.C:
 		return false
@@ -274,6 +267,7 @@ func (i *instanceManager) handleDeleteInstanceInProvider(instance params.Instanc
 func (i *instanceManager) consolidateState() error {
 	i.mux.Lock()
 	defer i.mux.Unlock()
+
 	if !i.running {
 		return nil
 	}
@@ -347,9 +341,6 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error {
 	// We need a better way to handle instance state. Database updates may fail, and we
 	// end up with an inconsistent state between what we know about the instance and what
 	// is reflected in the database.
-	i.mux.Lock()
-	defer i.mux.Unlock()
-
 	if !i.running {
 		return nil
 	}
@@ -359,25 +350,23 @@ func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error {
 		return runnerErrors.NewBadRequestError("invalid payload type")
 	}
 
-	i.instance = instance
-	if i.instance.Status == instance.Status {
-		// Nothing of interest happened.
+	switch instance.Status {
+	case commonParams.InstanceDeleting, commonParams.InstanceCreating:
 		return nil
 	}
 
+	i.instance = instance
 	return nil
 }
 
 func (i *instanceManager) Update(instance dbCommon.ChangePayload) error {
-	i.mux.Lock()
-	defer i.mux.Unlock()
-
 	if !i.running {
 		return runnerErrors.NewBadRequestError("instance manager is not running")
 	}
 
-	timer := time.NewTimer(60 * time.Second)
+	timer := time.NewTimer(10 * time.Second)
 	defer timer.Stop()
 
+	slog.DebugContext(i.ctx, "sending update to instance manager")
 	select {
 	case i.updates <- instance:
 	case <-i.quit:
@@ -390,6 +379,32 @@ func (i *instanceManager) Update(instance dbCommon.ChangePayload) error {
 	return nil
 }
 
+func (i *instanceManager) updatesLoop() {
+	defer i.Stop()
+
+	for {
+		select {
+		case <-i.quit:
+			return
+		case <-i.ctx.Done():
+			return
+		case update, ok := <-i.updates:
+			if !ok {
+				slog.InfoContext(i.ctx, "updates channel closed")
+				return
+			}
+			slog.DebugContext(i.ctx, "received update")
+			if err := i.handleUpdate(update); err != nil {
+				if errors.Is(err, ErrInstanceDeleted) {
+					// The instance has been deleted; we can exit the loop.
+					return
+				}
+				slog.ErrorContext(i.ctx, "handling update", "error", err)
+			}
+		}
+	}
+}
+
 func (i *instanceManager) loop() {
 	defer i.Stop()
 	ticker := time.NewTicker(5 * time.Second)
@@ -408,17 +423,6 @@ func (i *instanceManager) loop() {
 				}
 				slog.ErrorContext(i.ctx, "consolidating state", "error", err)
 			}
-		case update, ok := <-i.updates:
-			if !ok {
-				return
-			}
-			if err := i.handleUpdate(update); err != nil {
-				if errors.Is(err, ErrInstanceDeleted) {
-					// instance had been deleted, we can exit the loop.
-					return
-				}
-				slog.ErrorContext(i.ctx, "handling update", "error", err)
-			}
 		}
 	}
 }
diff --git a/workers/provider/provider.go b/workers/provider/provider.go
index ba95d733..3a7447f6 100644
--- a/workers/provider/provider.go
+++ b/workers/provider/provider.go
@@ -8,16 +8,23 @@
 import (
 	commonParams "github.com/cloudbase/garm-provider-common/params"
 	"github.com/cloudbase/garm/auth"
+	"github.com/cloudbase/garm/cache"
 	dbCommon "github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/database/watcher"
 	"github.com/cloudbase/garm/params"
 	"github.com/cloudbase/garm/runner/common"
+	garmUtil "github.com/cloudbase/garm/util"
 )
 
 func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*Provider, error) {
 	consumerID := "provider-worker"
+
+	ctx = garmUtil.WithSlogContext(
+		ctx,
+		slog.Any("worker", consumerID))
+
 	return &Provider{
-		ctx:        context.Background(),
+		ctx:        ctx,
 		store:      store,
 		consumerID: consumerID,
 		providers:  providers,
@@ -74,6 +81,7 @@ func (p *Provider) loadAllRunners() error {
 	}
 
 	for _, runner := range runners {
+		cache.SetInstanceCache(runner)
 		// Skip non scale set instances for now. This condition needs to be
 		// removed once we replace the current pool manager.
 		if runner.ScaleSetID == 0 {
@@ -246,29 +254,34 @@
 		return
 	}
 
+	slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name)
 	switch event.Operation {
 	case dbCommon.CreateOperation:
+		cache.SetInstanceCache(instance)
 		slog.DebugContext(p.ctx, "got create operation")
 		if err := p.handleInstanceAdded(instance); err != nil {
 			slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
 			return
 		}
 	case dbCommon.UpdateOperation:
+		cache.SetInstanceCache(instance)
 		slog.DebugContext(p.ctx, "got update operation")
 		existingInstance, ok := p.runners[instance.Name]
 		if !ok {
+			slog.DebugContext(p.ctx, "instance not found, creating new instance", "instance_name", instance.Name)
 			if err := p.handleInstanceAdded(instance); err != nil {
 				slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
 				return
 			}
 		} else {
+			slog.DebugContext(p.ctx, "updating instance", "instance_name", instance.Name)
 			if err := existingInstance.Update(event); err != nil {
 				slog.ErrorContext(p.ctx, "failed to update instance", "error", err)
 				return
 			}
 		}
 	case dbCommon.DeleteOperation:
-		slog.DebugContext(p.ctx, "got delete operation")
+		slog.DebugContext(p.ctx, "got delete operation", "instance_name", instance.Name)
 		existingInstance, ok := p.runners[instance.Name]
 		if ok {
 			if err := existingInstance.Stop(); err != nil {
@@ -277,6 +290,7 @@
 			}
 		}
 		delete(p.runners, instance.Name)
+		cache.DeleteInstanceCache(instance.ID)
 	default:
 		slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
 		return
diff --git a/workers/provider/util.go b/workers/provider/util.go
index 7e6395ff..8cd33525 100644
--- a/workers/provider/util.go
+++ b/workers/provider/util.go
@@ -1,18 +1,13 @@
 package provider
 
 import (
-	commonParams "github.com/cloudbase/garm-provider-common/params"
 	dbCommon "github.com/cloudbase/garm/database/common"
 	"github.com/cloudbase/garm/database/watcher"
 )
 
 func composeProviderWatcher() dbCommon.PayloadFilterFunc {
 	return watcher.WithAny(
-		watcher.WithInstanceStatusFilter(
-			commonParams.InstancePendingCreate,
-			commonParams.InstancePendingDelete,
-			commonParams.InstancePendingForceDelete,
-		),
+		watcher.WithEntityTypeFilter(dbCommon.InstanceEntityType),
 		watcher.WithEntityTypeFilter(dbCommon.ScaleSetEntityType),
 	)
 }
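
Note: SetEntity in cache/entity_cache.go uses a read-modify-write upsert. The nested Pools and ScaleSets maps are created once, when the entity is first inserted, and carried over on later updates so entries attached to the entity are not lost. A minimal, self-contained sketch of the same pattern, with simplified stand-in types rather than garm's actual ones:

package main

import "fmt"

type item struct {
	name  string
	pools map[string]string
}

type store struct {
	items map[string]item
}

// set upserts an item: the nested map is created once on first insert and
// carried over on later updates, so existing entries survive.
func (s *store) set(id, name string) {
	existing, ok := s.items[id]
	if !ok {
		s.items[id] = item{name: name, pools: make(map[string]string)}
		return
	}
	existing.name = name
	s.items[id] = existing
}

func main() {
	s := &store{items: make(map[string]item)}
	s.set("e1", "org/repo")
	s.items["e1"].pools["p1"] = "default" // attach an entry to the nested map
	s.set("e1", "org/repo-renamed")       // update metadata only
	fmt.Println(s.items["e1"].pools)      // map[p1:default], the entry is preserved
}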
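Note: a short usage sketch of the package-level helpers added in cache/instance_cache.go. This assumes the github.com/cloudbase/garm module is importable and that params.Instance carries the ID, Name, PoolID and ScaleSetID fields used elsewhere in this diff; the IDs below are made up.

package main

import (
	"fmt"

	"github.com/cloudbase/garm/cache"
	"github.com/cloudbase/garm/params"
)

func main() {
	// The cache is keyed by instance ID, for both set and delete. Note that
	// handleInstanceEvent above deletes by instance.ID while the worker map
	// is keyed by instance.Name.
	cache.SetInstanceCache(params.Instance{ID: "abc-123", Name: "runner-1", PoolID: "pool-1"})
	cache.SetInstanceCache(params.Instance{ID: "def-456", Name: "runner-2", ScaleSetID: 7})

	if instance, ok := cache.GetInstanceCache("abc-123"); ok {
		fmt.Println("found:", instance.Name)
	}

	// Filtered lookups scan the whole map while holding the mutex.
	fmt.Println("pool instances:", len(cache.GetInstancesForPool("pool-1")))
	fmt.Println("scale set instances:", len(cache.GetInstancesForScaleSet(7)))

	cache.DeleteInstanceCache("abc-123")
	fmt.Println("total instances:", len(cache.GetAllInstancesCache()))
}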
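Note: splitting updatesLoop() out of loop() means a slow consolidation pass no longer blocks update delivery, so callers of Update() are less likely to hit the send timeout. A self-contained sketch of the two-goroutine pattern; the types are simplified, with a plain string standing in for dbCommon.ChangePayload.

package main

import (
	"context"
	"fmt"
	"time"
)

type manager struct {
	ctx     context.Context
	quit    chan struct{}
	updates chan string // stand-in for dbCommon.ChangePayload
}

// loop owns the periodic consolidation ticker.
func (m *manager) loop() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-m.quit:
			return
		case <-m.ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("consolidating state")
		}
	}
}

// updatesLoop drains the updates channel independently of consolidation.
func (m *manager) updatesLoop() {
	for {
		select {
		case <-m.quit:
			return
		case <-m.ctx.Done():
			return
		case update, ok := <-m.updates:
			if !ok {
				return
			}
			fmt.Println("handling update:", update)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := &manager{ctx: ctx, quit: make(chan struct{}), updates: make(chan string)}
	go m.loop()
	go m.updatesLoop()

	m.updates <- "instance status changed" // handled by updatesLoop
	time.Sleep(100 * time.Millisecond)
	close(m.quit)
}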