garm/workers/provider/provider.go
Gabriel Adrian Samfira 6994c8ce05 Add copyright header
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2025-05-20 09:43:29 +00:00

306 lines
8.6 KiB
Go

// Copyright 2025 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package provider
import (
"context"
"fmt"
"log/slog"
"sync"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
)
func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider, tokenGetter auth.InstanceTokenGetter) (*Provider, error) {
consumerID := "provider-worker"
ctx = garmUtil.WithSlogContext(
ctx,
slog.Any("worker", consumerID))
return &Provider{
ctx: ctx,
store: store,
consumerID: consumerID,
providers: providers,
tokenGetter: tokenGetter,
scaleSets: make(map[uint]params.ScaleSet),
runners: make(map[string]*instanceManager),
}, nil
}
type Provider struct {
ctx context.Context
consumerID string
consumer dbCommon.Consumer
// nolint:golangci-lint,godox
// TODO: not all workers should have access to the store.
// We need to implement way to RPC from workers to controllers
// and abstract that into something we can use to eventually
// scale out.
store dbCommon.Store
tokenGetter auth.InstanceTokenGetter
providers map[string]common.Provider
// A cache of all scale sets kept updated by the watcher.
// This helps us avoid a bunch of queries to the database.
scaleSets map[uint]params.ScaleSet
runners map[string]*instanceManager
mux sync.Mutex
running bool
quit chan struct{}
}
func (p *Provider) loadAllScaleSets() error {
scaleSets, err := p.store.ListAllScaleSets(p.ctx)
if err != nil {
return fmt.Errorf("fetching scale sets: %w", err)
}
for _, scaleSet := range scaleSets {
p.scaleSets[scaleSet.ID] = scaleSet
}
return nil
}
// loadAllRunners loads all runners from the database. At this stage we only
// care about runners created by scale sets, but in the future, we will migrate
// the pool manager to the same model.
func (p *Provider) loadAllRunners() error {
runners, err := p.store.ListAllInstances(p.ctx)
if err != nil {
return fmt.Errorf("fetching runners: %w", err)
}
for _, runner := range runners {
// Skip non scale set instances for now. This condition needs to be
// removed once we replace the current pool manager.
if runner.ScaleSetID == 0 {
continue
}
// Ignore runners in "creating" state. If we're just starting up and
// we find a runner in "creating" it was most likely interrupted while
// creating. It is unlikely that it is still usable. We allow the scale set
// worker to clean it up. It will eventually be marked as pending delete and
// this worker will get an update to clean up any resources left behing by
// an incomplete creation event.
if runner.Status == commonParams.InstanceCreating {
continue
}
if runner.Status == commonParams.InstanceDeleting || runner.Status == commonParams.InstanceDeleted {
continue
}
scaleSet, ok := p.scaleSets[runner.ScaleSetID]
if !ok {
slog.ErrorContext(p.ctx, "scale set not found", "scale_set_id", runner.ScaleSetID)
continue
}
provider, ok := p.providers[scaleSet.ProviderName]
if !ok {
slog.ErrorContext(p.ctx, "provider not found", "provider_name", runner.ProviderName)
continue
}
instanceManager, err := newInstanceManager(
p.ctx, runner, scaleSet, provider, p)
if err != nil {
return fmt.Errorf("creating instance manager: %w", err)
}
if err := instanceManager.Start(); err != nil {
return fmt.Errorf("starting instance manager: %w", err)
}
p.runners[runner.Name] = instanceManager
}
return nil
}
func (p *Provider) Start() error {
p.mux.Lock()
defer p.mux.Unlock()
if p.running {
return nil
}
if err := p.loadAllScaleSets(); err != nil {
return fmt.Errorf("loading all scale sets: %w", err)
}
if err := p.loadAllRunners(); err != nil {
return fmt.Errorf("loading all runners: %w", err)
}
consumer, err := watcher.RegisterConsumer(
p.ctx, p.consumerID, composeProviderWatcher())
if err != nil {
return fmt.Errorf("registering consumer: %w", err)
}
p.consumer = consumer
p.quit = make(chan struct{})
p.running = true
go p.loop()
return nil
}
func (p *Provider) Stop() error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.running {
return nil
}
p.consumer.Close()
close(p.quit)
p.running = false
return nil
}
func (p *Provider) loop() {
defer p.Stop()
for {
select {
case payload, ok := <-p.consumer.Watch():
if !ok {
slog.ErrorContext(p.ctx, "watcher channel closed")
return
}
slog.InfoContext(p.ctx, "received payload")
go p.handleWatcherEvent(payload)
case <-p.ctx.Done():
return
case <-p.quit:
return
}
}
}
func (p *Provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
switch payload.EntityType {
case dbCommon.ScaleSetEntityType:
p.handleScaleSetEvent(payload)
case dbCommon.InstanceEntityType:
p.handleInstanceEvent(payload)
default:
slog.ErrorContext(p.ctx, "invalid entity type", "entity_type", payload.EntityType)
}
}
func (p *Provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
p.mux.Lock()
defer p.mux.Unlock()
scaleSet, ok := event.Payload.(params.ScaleSet)
if !ok {
slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
return
}
switch event.Operation {
case dbCommon.CreateOperation, dbCommon.UpdateOperation:
slog.DebugContext(p.ctx, "got create/update operation")
p.scaleSets[scaleSet.ID] = scaleSet
case dbCommon.DeleteOperation:
slog.DebugContext(p.ctx, "got delete operation")
delete(p.scaleSets, scaleSet.ID)
default:
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}
func (p *Provider) handleInstanceAdded(instance params.Instance) error {
scaleSet, ok := p.scaleSets[instance.ScaleSetID]
if !ok {
return fmt.Errorf("scale set not found for instance %s", instance.Name)
}
instanceManager, err := newInstanceManager(
p.ctx, instance, scaleSet, p.providers[instance.ProviderName], p)
if err != nil {
return fmt.Errorf("creating instance manager: %w", err)
}
if err := instanceManager.Start(); err != nil {
return fmt.Errorf("starting instance manager: %w", err)
}
p.runners[instance.Name] = instanceManager
return nil
}
func (p *Provider) handleInstanceEvent(event dbCommon.ChangePayload) {
p.mux.Lock()
defer p.mux.Unlock()
instance, ok := event.Payload.(params.Instance)
if !ok {
slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
return
}
if instance.ScaleSetID == 0 {
slog.DebugContext(p.ctx, "skipping instance event for non scale set instance")
return
}
slog.DebugContext(p.ctx, "handling instance event", "instance_name", instance.Name)
switch event.Operation {
case dbCommon.CreateOperation:
slog.DebugContext(p.ctx, "got create operation")
if err := p.handleInstanceAdded(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
return
}
case dbCommon.UpdateOperation:
slog.DebugContext(p.ctx, "got update operation")
existingInstance, ok := p.runners[instance.Name]
if !ok {
slog.DebugContext(p.ctx, "instance not found, creating new instance", "instance_name", instance.Name)
if err := p.handleInstanceAdded(instance); err != nil {
slog.ErrorContext(p.ctx, "failed to handle instance added", "error", err)
return
}
} else {
slog.DebugContext(p.ctx, "updating instance", "instance_name", instance.Name)
if err := existingInstance.Update(event); err != nil {
slog.ErrorContext(p.ctx, "failed to update instance", "error", err)
return
}
}
case dbCommon.DeleteOperation:
slog.DebugContext(p.ctx, "got delete operation", "instance_name", instance.Name)
existingInstance, ok := p.runners[instance.Name]
if ok {
if err := existingInstance.Stop(); err != nil {
slog.ErrorContext(p.ctx, "failed to stop instance", "error", err)
return
}
}
delete(p.runners, instance.Name)
default:
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
return
}
}