garm/workers/provider/instance_manager.go
Gabriel Adrian Samfira 66fd0d51a6 Cache improvements, db list improvements, cleanup
This change adds some more cache helper functions, additional tests,
vastly improves memory usage when loading instances and cleans up some
code.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2025-09-09 20:52:01 +00:00

438 lines
13 KiB
Go

// Copyright 2025 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package provider
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/cache"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
)
// newInstanceManager builds the manager for a single instance that belongs to
// the given scale set. The context stored on the manager is the supplied one
// decorated with a "worker" slog attribute derived from the instance name.
//
// An error is returned when the entity cannot be obtained from the scale set.
// The returned manager is idle until Start is called.
func newInstanceManager(ctx context.Context, instance params.Instance, scaleSet params.ScaleSet, provider common.Provider, helper providerHelper) (*instanceManager, error) {
	workerName := fmt.Sprintf("instance-worker-%s", instance.Name)
	ctx = garmUtil.WithSlogContext(ctx, slog.Any("worker", workerName))

	entity, err := scaleSet.GetEntity()
	if err != nil {
		return nil, fmt.Errorf("getting github entity: %w", err)
	}

	manager := &instanceManager{
		ctx:            ctx,
		instance:       instance,
		scaleSet:       scaleSet,
		scaleSetEntity: entity,
		provider:       provider,
		helper:         helper,
		// No backoff is applied until a removal attempt has failed.
		deleteBackoff: 0,
	}
	return manager, nil
}
// instanceManager drives the lifecycle of exactly one instance; a dedicated
// manager is created for every instance.
//
// While the instance is in pending_create, the manager attempts to create a
// compute resource in the designated provider. Once the instance is marked as
// pending_delete, it is removed from the provider and, on success, marked as
// deleted. A failed removal puts the instance back in pending delete and the
// removal is retried after a backoff period. Instances placed in
// force_pending_delete ignore provider errors and exit.
type instanceManager struct {
	// ctx is the manager's context (decorated with a "worker" slog attribute
	// by newInstanceManager). Both background loops return once it is done.
	ctx context.Context

	// instance is the manager's current view of the instance. It is replaced
	// when an update is handled and after a successful create in the provider.
	instance params.Instance
	// scaleSet is the scale set the instance belongs to.
	scaleSet params.ScaleSet
	// scaleSetEntity is the entity obtained from scaleSet.GetEntity() at
	// construction time; its ID is used to build the pseudo pool ID.
	scaleSetEntity params.ForgeEntity

	// provider is where the instance is created and deleted.
	provider common.Provider
	// helper is used to set instance statuses, resolve the entity, fetch the
	// controller info and obtain instance tokens.
	helper providerHelper

	// deleteBackoff is how long to wait before a (non forced) removal attempt.
	// It grows through incrementBackOff after each failed removal.
	deleteBackoff time.Duration

	// mux is held by Start, Stop and consolidateState.
	mux sync.Mutex
	// running is true between Start and Stop.
	running bool
	// quit is created by Start and closed by Stop to signal shutdown.
	quit chan struct{}
	// updates carries payloads from Update to updatesLoop. It is created by
	// Start.
	updates chan dbCommon.ChangePayload
}
// Start spins up the two background goroutines of the manager (loop and
// updatesLoop) after creating fresh quit and updates channels. Calling Start
// on a manager that is already running does nothing. It always returns nil.
func (i *instanceManager) Start() error {
	i.mux.Lock()
	defer i.mux.Unlock()

	slog.DebugContext(i.ctx, "starting instance manager", "instance", i.instance.Name)

	if !i.running {
		i.running = true
		i.quit = make(chan struct{})
		i.updates = make(chan dbCommon.ChangePayload)

		go i.loop()
		go i.updatesLoop()
	}
	return nil
}
// Stop signals the background goroutines to exit by closing the quit channel
// and marks the manager as not running. Calling Stop on a manager that is not
// running does nothing. It always returns nil.
func (i *instanceManager) Stop() error {
	i.mux.Lock()
	defer i.mux.Unlock()

	if !i.running {
		return nil
	}

	i.running = false
	// Closing quit is the shutdown signal: loop, updatesLoop, Update and the
	// backoff sleep all select on it.
	close(i.quit)
	// i.updates is deliberately NOT closed. Update sends on that channel from
	// other goroutines, and a send that races with Stop would panic with
	// "send on closed channel". The receiver (updatesLoop) already exits when
	// quit is closed, and Start allocates a new updates channel, so leaving
	// the old one open is safe; it is simply garbage collected.
	return nil
}
// sleepForBackOffOrCanceled blocks for the current delete backoff. It returns
// false when the full backoff elapsed, and true when the wait was cut short
// because the quit channel was closed or the manager's context is done.
func (i *instanceManager) sleepForBackOffOrCanceled() bool {
	backoffTimer := time.NewTimer(i.deleteBackoff)
	defer backoffTimer.Stop()

	slog.DebugContext(i.ctx, "sleeping for backoff", "duration", i.deleteBackoff, "instance", i.instance.Name)

	canceled := true
	select {
	case <-backoffTimer.C:
		// Slept for the whole backoff period.
		canceled = false
	case <-i.quit:
	case <-i.ctx.Done():
	}
	return canceled
}
// incrementBackOff grows the delete backoff: a zero backoff becomes one
// second, any other value is doubled. The result is capped at five minutes.
func (i *instanceManager) incrementBackOff() {
	const (
		initialBackoff = time.Second
		maxBackoff     = 5 * time.Minute
	)

	next := initialBackoff
	if i.deleteBackoff != 0 {
		next = i.deleteBackoff * 2
	}
	if next > maxBackoff {
		next = maxBackoff
	}
	i.deleteBackoff = next
}
// getEntity obtains the entity of the scale set and passes it through the
// helper's GetGithubEntity, returning the helper's result. On any failure the
// zero params.ForgeEntity is returned together with the wrapped error.
func (i *instanceManager) getEntity() (params.ForgeEntity, error) {
	var none params.ForgeEntity

	scaleSetEntity, err := i.scaleSet.GetEntity()
	if err != nil {
		return none, fmt.Errorf("getting entity: %w", err)
	}

	resolved, err := i.helper.GetGithubEntity(scaleSetEntity)
	if err != nil {
		return none, fmt.Errorf("getting entity: %w", err)
	}
	return resolved, nil
}
// pseudoPoolID returns "<scale set name>-<scale set entity ID>", the value
// handed to providers as the pool ID.
//
// This is temporary. We need to extend providers to know about scale sets.
func (i *instanceManager) pseudoPoolID() string {
	name, entityID := i.scaleSet.Name, i.scaleSetEntity.ID
	return fmt.Sprintf("%s-%s", name, entityID)
}
// handleCreateInstanceInProvider asks the provider to create the compute
// resource for instance and, on success, replaces the manager's view of the
// instance with the one returned by the helper.
//
// The bootstrap parameters are assembled from the scale set, the resolved
// entity, the cached tools for that entity and a freshly issued instance
// token.
//
// If the provider call fails, or the provider returns an instance whose status
// is InstanceError, a best-effort DeleteInstance is issued when this function
// returns so that no half-created resource is left behind. Failures of that
// cleanup are only logged (a not-found error is ignored).
//
// A non-nil error is returned when any preparation step, the provider call or
// the final helper update fails.
func (i *instanceManager) handleCreateInstanceInProvider(instance params.Instance) error {
	entity, err := i.getEntity()
	if err != nil {
		return fmt.Errorf("getting entity: %w", err)
	}

	token, err := i.helper.InstanceTokenGetter().NewInstanceJWTToken(
		instance, entity, i.scaleSet.RunnerBootstrapTimeout)
	if err != nil {
		return fmt.Errorf("creating instance token: %w", err)
	}

	tools, err := cache.GetGithubToolsCache(entity.ID)
	if err != nil {
		return fmt.Errorf("tools not found in cache for entity %s: %w", entity.String(), err)
	}

	bootstrapArgs := commonParams.BootstrapInstance{
		Name:          instance.Name,
		Tools:         tools,
		RepoURL:       entity.ForgeURL(),
		MetadataURL:   instance.MetadataURL,
		CallbackURL:   instance.CallbackURL,
		InstanceToken: token,
		OSArch:        i.scaleSet.OSArch,
		OSType:        i.scaleSet.OSType,
		Flavor:        i.scaleSet.Flavor,
		Image:         i.scaleSet.Image,
		ExtraSpecs:    i.scaleSet.ExtraSpecs,
		// This is temporary. We need to extend providers to know about scale sets.
		PoolID:            i.pseudoPoolID(),
		CACertBundle:      entity.Credentials.CABundle,
		GitHubRunnerGroup: i.scaleSet.GitHubRunnerGroup,
		JitConfigEnabled:  true,
	}

	// instanceIDToDelete is set on the failure paths below. When non-empty at
	// return time, the deferred function removes that instance from the
	// provider.
	var instanceIDToDelete string

	baseParams, err := i.getProviderBaseParams()
	if err != nil {
		return fmt.Errorf("getting provider base params: %w", err)
	}

	defer func() {
		if instanceIDToDelete == "" {
			return
		}
		deleteInstanceParams := common.DeleteInstanceParams{
			DeleteInstanceV011: common.DeleteInstanceV011Params{
				ProviderBaseParams: baseParams,
			},
		}
		if err := i.provider.DeleteInstance(i.ctx, instanceIDToDelete, deleteInstanceParams); err != nil {
			if !errors.Is(err, runnerErrors.ErrNotFound) {
				slog.With(slog.Any("error", err)).ErrorContext(
					i.ctx, "failed to cleanup instance",
					"provider_id", instanceIDToDelete)
			}
		}
	}()

	createInstanceParams := common.CreateInstanceParams{
		CreateInstanceV011: common.CreateInstanceV011Params{
			ProviderBaseParams: baseParams,
		},
	}

	providerInstance, err := i.provider.CreateInstance(i.ctx, bootstrapArgs, createInstanceParams)
	if err != nil {
		// No provider instance was returned, so the name is the only
		// identifier available for the cleanup.
		instanceIDToDelete = instance.Name
		return fmt.Errorf("creating instance in provider: %w", err)
	}

	if providerInstance.Status == commonParams.InstanceError {
		// The provider handed back an instance in error state. Clean it up
		// using the ID the provider just reported for it; the instance passed
		// in has not been updated with the provider's answer yet, so its
		// ProviderID and then its name are only used as fallbacks.
		instanceIDToDelete = providerInstance.ProviderID
		if instanceIDToDelete == "" {
			instanceIDToDelete = instance.ProviderID
		}
		if instanceIDToDelete == "" {
			instanceIDToDelete = instance.Name
		}
	}

	updated, err := i.helper.updateArgsFromProviderInstance(instance.Name, providerInstance)
	if err != nil {
		return fmt.Errorf("updating instance args: %w", err)
	}
	i.instance = updated

	return nil
}
// getProviderBaseParams returns the base parameters sent along with provider
// calls, populated with the controller info obtained from the helper. When the
// controller info cannot be fetched, the zero value and a wrapped error are
// returned.
func (i *instanceManager) getProviderBaseParams() (common.ProviderBaseParams, error) {
	var base common.ProviderBaseParams

	controllerInfo, err := i.helper.GetControllerInfo()
	if err != nil {
		return base, fmt.Errorf("getting controller info: %w", err)
	}

	base.ControllerInfo = controllerInfo
	return base, nil
}
// handleDeleteInstanceInProvider removes instance from the provider. The
// instance is addressed by its provider ID, or by its name when no provider ID
// is set. A wrapped error is returned when the base parameters cannot be built
// or when the provider fails to delete the instance.
func (i *instanceManager) handleDeleteInstanceInProvider(instance params.Instance) error {
	slog.InfoContext(i.ctx, "deleting instance in provider", "runner_name", instance.Name)

	// The provider may not have returned a provider ID; fall back to the name
	// in that case.
	target := instance.Name
	if instance.ProviderID != "" {
		target = instance.ProviderID
	}

	providerParams, err := i.getProviderBaseParams()
	if err != nil {
		return fmt.Errorf("getting provider base params: %w", err)
	}

	slog.DebugContext(
		i.ctx, "calling delete instance on provider",
		"runner_name", instance.Name,
		"provider_id", target)

	var deleteParams common.DeleteInstanceParams
	deleteParams.DeleteInstanceV011.ProviderBaseParams = providerParams

	err = i.provider.DeleteInstance(i.ctx, target, deleteParams)
	if err != nil {
		return fmt.Errorf("deleting instance in provider: %w", err)
	}
	return nil
}
// consolidateState acts on the current status of the managed instance. It
// holds the manager's mutex for its whole duration (including the backoff
// sleep and the provider calls) and does nothing when the manager is not
// running.
//
// Per status:
//   - pending create: the status is set to creating and the instance is
//     created in the provider. A creation failure is logged and the status is
//     set to error.
//   - running: nothing to do.
//   - pending delete / pending force delete: the instance is removed from the
//     provider. A normal delete first sleeps for the current backoff; if the
//     removal fails, the backoff is increased, the status is put back to
//     pending delete and an error is returned. A forced delete ignores
//     provider errors. Once removed (or forced), the status is set to deleted
//     and ErrInstanceDeleted is returned.
//   - error: the status is set to pending delete.
//   - deleted: ErrInstanceDeleted is returned.
//
// ErrInstanceDeleted tells the caller that this manager has nothing left to
// do. Any other non-nil error reports a failed step.
func (i *instanceManager) consolidateState() error {
	i.mux.Lock()
	defer i.mux.Unlock()

	if !i.running {
		return nil
	}

	switch i.instance.Status {
	case commonParams.InstancePendingCreate:
		// kick off the creation process
		if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceCreating, nil); err != nil {
			return fmt.Errorf("setting instance status to creating: %w", err)
		}
		if err := i.handleCreateInstanceInProvider(i.instance); err != nil {
			slog.ErrorContext(i.ctx, "creating instance in provider", "error", err)
			if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceError, []byte(err.Error())); err != nil {
				return fmt.Errorf("setting instance status to error: %w", err)
			}
		}
	case commonParams.InstanceRunning:
		// Nothing to do. The provider finished creating the instance.
	case commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete:
		// Remove or force remove the runner. When force remove is specified, we ignore
		// IaaS errors.
		if i.instance.Status == commonParams.InstancePendingDelete {
			// invoke backoff sleep. We only do this for non forced removals,
			// as force delete will always return, regardless of whether or not
			// the remove operation succeeded in the provider. A user may decide
			// to force delete a runner if GARM fails to remove it normally.
			if canceled := i.sleepForBackOffOrCanceled(); canceled {
				// the worker is shutting down. Return here.
				return nil
			}
		}

		// Remember which kind of delete was requested; every decision below is
		// based on this captured value.
		prevStatus := i.instance.Status
		if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceDeleting, nil); err != nil {
			if errors.Is(err, runnerErrors.ErrNotFound) {
				// Nothing left to update.
				return nil
			}
			return fmt.Errorf("setting instance status to deleting: %w", err)
		}

		if err := i.handleDeleteInstanceInProvider(i.instance); err != nil {
			slog.ErrorContext(i.ctx, "deleting instance in provider", "error", err, "forced", prevStatus == commonParams.InstancePendingForceDelete)
			if prevStatus == commonParams.InstancePendingDelete {
				// Normal delete: back off, put the instance back in pending
				// delete (recording the provider error) and retry later.
				i.incrementBackOff()
				if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstancePendingDelete, []byte(err.Error())); err != nil {
					return fmt.Errorf("setting instance status to pending delete: %w", err)
				}
				return fmt.Errorf("error removing instance. Will retry: %w", err)
			}
			// Forced delete: the provider error is ignored on purpose.
		}

		if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstanceDeleted, nil); err != nil {
			if !errors.Is(err, runnerErrors.ErrNotFound) {
				return fmt.Errorf("setting instance status to deleted: %w", err)
			}
		}
		return ErrInstanceDeleted
	case commonParams.InstanceError:
		// Instance is in error state. Schedule it for removal by moving it to
		// pending delete.
		if err := i.helper.SetInstanceStatus(i.instance.Name, commonParams.InstancePendingDelete, nil); err != nil {
			return fmt.Errorf("setting instance status to pending delete: %w", err)
		}
	case commonParams.InstanceDeleted:
		return ErrInstanceDeleted
	}
	return nil
}
// handleUpdate replaces the manager's view of the instance with the one
// carried by update. It does nothing when the manager is not running and
// returns a bad request error when the payload is not a params.Instance.
//
// The manager's mutex is held while the instance is swapped. consolidateState
// reads i.instance from a different goroutine under the same mutex, so an
// unguarded write here would be a data race. As a consequence, an update
// waits until a consolidateState call that is in progress has finished.
func (i *instanceManager) handleUpdate(update dbCommon.ChangePayload) error {
	// We need a better way to handle instance state. Database updates may fail, and we
	// end up with an inconsistent state between what we know about the instance and what
	// is reflected in the database.
	i.mux.Lock()
	defer i.mux.Unlock()

	if !i.running {
		return nil
	}

	instance, ok := update.Payload.(params.Instance)
	if !ok {
		return runnerErrors.NewBadRequestError("invalid payload type")
	}

	i.instance = instance
	return nil
}
// Update hands a change payload to the manager's update loop. It returns a bad
// request error when the manager is not running and a timeout error when the
// payload could not be delivered within 10 seconds. If the manager is stopped
// or its context is done while waiting, the payload is dropped and nil is
// returned.
//
// NOTE(review): i.running is read here without holding i.mux.
func (i *instanceManager) Update(instance dbCommon.ChangePayload) error {
	if !i.running {
		return runnerErrors.NewBadRequestError("instance manager is not running")
	}

	sendDeadline := time.NewTimer(10 * time.Second)
	defer sendDeadline.Stop()

	slog.DebugContext(i.ctx, "sending update to instance manager")

	select {
	case <-sendDeadline.C:
		return fmt.Errorf("timeout while sending update to instance manager")
	case i.updates <- instance:
		// Delivered to the update loop.
	case <-i.quit:
		// Manager stopped while we were waiting.
	case <-i.ctx.Done():
		// Manager context is done.
	}
	return nil
}
// updatesLoop receives payloads from the updates channel and applies them via
// handleUpdate. It returns when the quit channel is closed, when the manager's
// context is done, when the updates channel is found closed, or when
// handleUpdate reports ErrInstanceDeleted. Any other handleUpdate error is
// logged and the loop keeps going. Stop is called on the way out.
func (i *instanceManager) updatesLoop() {
	defer i.Stop()

	for {
		select {
		case <-i.quit:
			return
		case <-i.ctx.Done():
			return
		case update, ok := <-i.updates:
			if !ok {
				slog.InfoContext(i.ctx, "updates channel closed")
				return
			}
			slog.DebugContext(i.ctx, "received update")
			if err := i.handleUpdate(update); err != nil {
				if errors.Is(err, ErrInstanceDeleted) {
					// instance had been deleted, we can exit the loop.
					return
				}
				slog.ErrorContext(i.ctx, "handling update", "error", err)
			}
		}
	}
}
// loop calls consolidateState every 5 seconds. It returns when the quit
// channel is closed, when the manager's context is done, or when
// consolidateState reports ErrInstanceDeleted. Any other error is logged and
// the loop carries on. Stop is called on the way out.
func (i *instanceManager) loop() {
	defer i.Stop()

	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-i.quit:
			return
		case <-i.ctx.Done():
			return
		case <-tick.C:
		}

		err := i.consolidateState()
		if err == nil {
			continue
		}
		if errors.Is(err, ErrInstanceDeleted) {
			// instance had been deleted, we can exit the loop.
			return
		}
		slog.ErrorContext(i.ctx, "consolidating state", "error", err)
	}
}