// Copyright 2022 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

package pool

import (
	"context"
	"crypto/rand"
	"fmt"
	"log/slog"
	"math"
	"math/big"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/google/go-github/v71/github"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	runnerErrors "github.com/cloudbase/garm-provider-common/errors"
	commonParams "github.com/cloudbase/garm-provider-common/params"
	"github.com/cloudbase/garm-provider-common/util"
	"github.com/cloudbase/garm/auth"
	"github.com/cloudbase/garm/cache"
	dbCommon "github.com/cloudbase/garm/database/common"
	"github.com/cloudbase/garm/database/watcher"
	"github.com/cloudbase/garm/locking"
	"github.com/cloudbase/garm/params"
	"github.com/cloudbase/garm/runner/common"
	garmUtil "github.com/cloudbase/garm/util"
	ghClient "github.com/cloudbase/garm/util/github"
)
|
|
|
|
var (
	poolIDLabelprefix     = "runner-pool-id"
	controllerLabelPrefix = "runner-controller-id"
	// We tag runners that have been spawned as a result of a queued job with the ID of the job
	// that spawned them. There is no way to guarantee that a runner spawned in response to a particular
	// job will be picked up by that job. We mark them so that, in the very likely event that the runner
	// has picked up a different job, we can clear the lock on the job that spawned it.
	// The job it picked up will already have transitioned to in_progress, so it will be ignored by the
	// consume loop.
	jobLabelPrefix = "in_response_to_job"
)
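
// Taken together, a garm-managed runner will typically carry labels such as (illustrative values):
//
//	runner-controller-id=9c8e6f2a-...   (see controllerLabel())
//	runner-pool-id=1b2c3d4e-...         (see poolLabel())
//	in_response_to_job=1234             (only on runners spawned for a specific queued job; the
//	                                     exact separator is an assumption, based on jobIDFromLabels
//	                                     stripping the prefix plus a single character)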
|
|
|
|
const (
|
|
// maxCreateAttempts is the number of times we will attempt to create an instance
|
|
// before we give up.
|
|
//
|
|
// nolint:golangci-lint,godox
|
|
// TODO: make this configurable(?)
|
|
maxCreateAttempts = 5
|
|
)
|
|
|
|
func NewEntityPoolManager(ctx context.Context, entity params.ForgeEntity, instanceTokenGetter auth.InstanceTokenGetter, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
|
|
ctx = garmUtil.WithSlogContext(ctx, slog.Any("pool_mgr", entity.String()), slog.Any("pool_type", entity.EntityType))
|
|
ghc, err := ghClient.Client(ctx, entity)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "getting github client")
|
|
}
|
|
|
|
if entity.WebhookSecret == "" {
|
|
return nil, errors.New("webhook secret is empty")
|
|
}
|
|
|
|
controllerInfo, err := store.ControllerInfo()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "getting controller info")
|
|
}
|
|
|
|
consumerID := fmt.Sprintf("pool-manager-%s-%s", entity.String(), entity.Credentials.Endpoint.Name)
|
|
slog.InfoContext(ctx, "registering consumer", "consumer_id", consumerID)
|
|
consumer, err := watcher.RegisterConsumer(
|
|
ctx, consumerID,
|
|
composeWatcherFilters(entity),
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "registering consumer")
|
|
}
|
|
|
|
wg := &sync.WaitGroup{}
|
|
backoff, err := locking.NewInstanceDeleteBackoff(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating backoff")
|
|
}
|
|
|
|
repo := &basePoolManager{
|
|
ctx: ctx,
|
|
consumerID: consumerID,
|
|
entity: entity,
|
|
ghcli: ghc,
|
|
controllerInfo: controllerInfo,
|
|
instanceTokenGetter: instanceTokenGetter,
|
|
|
|
store: store,
|
|
providers: providers,
|
|
quit: make(chan struct{}),
|
|
wg: wg,
|
|
backoff: backoff,
|
|
consumer: consumer,
|
|
}
|
|
return repo, nil
|
|
}
|
|
|
|
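// basePoolManager is the common.PoolManager implementation returned by NewEntityPoolManager.
// It reacts to workflow job webhooks for a single forge entity (repository, organization or
// enterprise) and runs the periodic loops that reconcile pool state: creating and deleting
// instances, scaling pools up and down and consuming queued jobs.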
type basePoolManager struct {
|
|
ctx context.Context
|
|
consumerID string
|
|
entity params.ForgeEntity
|
|
ghcli common.GithubClient
|
|
controllerInfo params.ControllerInfo
|
|
instanceTokenGetter auth.InstanceTokenGetter
|
|
consumer dbCommon.Consumer
|
|
|
|
store dbCommon.Store
|
|
|
|
providers map[string]common.Provider
|
|
tools []commonParams.RunnerApplicationDownload
|
|
quit chan struct{}
|
|
|
|
managerIsRunning bool
|
|
managerErrorReason string
|
|
|
|
mux sync.Mutex
|
|
wg *sync.WaitGroup
|
|
backoff locking.InstanceDeleteBackoff
|
|
}
|
|
|
|
func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
|
|
r.mux.Lock()
|
|
defer r.mux.Unlock()
|
|
|
|
return common.ProviderBaseParams{
|
|
PoolInfo: pool,
|
|
ControllerInfo: r.controllerInfo,
|
|
}
|
|
}
|
|
|
|
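// HandleWorkflowJob processes a workflow job webhook for this entity. Queued jobs are simply
// recorded and later picked up by consumeQueuedJobs(); "in_progress" events mark the runner as
// active and trigger a top-up of idle runners for its pool; "completed" events mark the runner
// as terminated and schedule it for deletion. Regardless of the action, the job is recorded or
// updated in the database by the deferred block below.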
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
|
if err := r.ValidateOwner(job); err != nil {
|
|
return errors.Wrap(err, "validating owner")
|
|
}
|
|
|
|
// We see events where the labels seem to be missing. We should ignore these,
// as we can't know if we should handle them or not.
|
|
if len(job.WorkflowJob.Labels) == 0 {
|
|
slog.WarnContext(r.ctx, "job has no labels", "workflow_job", job.WorkflowJob.Name)
|
|
return nil
|
|
}
|
|
|
|
jobParams, err := r.paramsWorkflowJobToParamsJob(job)
|
|
if err != nil {
|
|
return errors.Wrap(err, "converting job to params")
|
|
}
|
|
|
|
var triggeredBy int64
|
|
defer func() {
|
|
if jobParams.ID == 0 {
|
|
return
|
|
}
|
|
// We update the job in the database regardless of whether handling it succeeded,
// and regardless of whether it was meant for this pool. Github will send the same job data to all hierarchies
// that have been configured to work with garm. Updating the job at all levels should yield the same
// outcome in the db.
|
|
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
|
|
if err != nil {
|
|
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to get job",
|
|
"job_id", jobParams.ID)
|
|
return
|
|
}
|
|
// This job is new to us. Check if we have a pool that can handle it.
|
|
potentialPools := cache.FindPoolsMatchingAllTags(r.entity.ID, jobParams.Labels)
|
|
if len(potentialPools) == 0 {
|
|
slog.WarnContext(
|
|
r.ctx, "no pools matching tags; not recording job",
|
|
"requested_tags", strings.Join(jobParams.Labels, ", "))
|
|
return
|
|
}
|
|
}
|
|
|
|
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
|
|
slog.With(slog.Any("error", jobErr)).ErrorContext(
|
|
r.ctx, "failed to update job", "job_id", jobParams.ID)
|
|
}
|
|
|
|
if triggeredBy != 0 && jobParams.ID != triggeredBy {
|
|
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
|
|
// transitioned to in_progress was created as a result of a different queued job. If that job is
|
|
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
|
|
// That may take a long time, so we break the lock here and allow it to be scheduled again.
|
|
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to break lock for job",
|
|
"job_id", triggeredBy)
|
|
}
|
|
}
|
|
}()
|
|
|
|
switch job.Action {
|
|
case "queued":
|
|
// Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method
|
|
// when reconciling.
|
|
case "completed":
|
|
// If job was not assigned to a runner, we can ignore it.
|
|
if jobParams.RunnerName == "" {
|
|
slog.InfoContext(
|
|
r.ctx, "job never got assigned to a runner, ignoring")
|
|
return nil
|
|
}
|
|
|
|
fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if _, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID); !ok {
|
|
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
|
|
return nil
|
|
}
|
|
|
|
// update instance workload state.
|
|
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
|
|
if errors.Is(err, runnerErrors.ErrNotFound) {
|
|
return nil
|
|
}
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
|
return errors.Wrap(err, "updating runner")
|
|
}
|
|
slog.DebugContext(
|
|
r.ctx, "marking instance as pending_delete",
|
|
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
|
if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil {
|
|
if errors.Is(err, runnerErrors.ErrNotFound) {
|
|
return nil
|
|
}
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
|
return errors.Wrap(err, "updating runner")
|
|
}
|
|
case "in_progress":
|
|
fromCache, ok := cache.GetInstanceCache(jobParams.RunnerName)
|
|
if !ok {
|
|
slog.DebugContext(r.ctx, "instance not found in cache", "runner_name", jobParams.RunnerName)
|
|
return nil
|
|
}
|
|
|
|
pool, ok := cache.GetEntityPool(r.entity.ID, fromCache.PoolID)
|
|
if !ok {
|
|
slog.DebugContext(r.ctx, "instance belongs to a pool not managed by this entity", "pool_id", fromCache.PoolID)
|
|
return nil
|
|
}
|
|
// update instance workload state.
|
|
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
|
|
if err != nil {
|
|
if errors.Is(err, runnerErrors.ErrNotFound) {
|
|
return nil
|
|
}
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
|
|
return errors.Wrap(err, "updating runner")
|
|
}
|
|
// Set triggeredBy here so we break the lock on any potential queued job.
|
|
triggeredBy = jobIDFromLabels(instance.AditionalLabels)
|
|
|
|
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
|
|
// a minimum number of idle runners configured.
|
|
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "error ensuring idle runners for pool",
|
|
"pool_id", pool.ID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
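// jobIDFromLabels extracts the ID of the job that triggered a runner's creation from its
// additional labels. For a label such as "in_response_to_job=1234" (the exact separator is an
// assumption; the code only skips the prefix plus one character), it returns 1234. It returns 0
// if no such label is present or the value cannot be parsed.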
func jobIDFromLabels(labels []string) int64 {
|
|
for _, lbl := range labels {
|
|
if strings.HasPrefix(lbl, jobLabelPrefix) {
|
|
trimLength := min(len(jobLabelPrefix)+1, len(lbl))
|
|
jobID, err := strconv.ParseInt(lbl[trimLength:], 10, 64)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return jobID
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
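// startLoopForFunction calls f on every tick of interval until either the daemon shuts down or
// this worker is stopped. When alwaysRun is false, f is only executed while the pool manager is
// marked as running; otherwise the loop backs off and re-checks. If f returns ErrUnauthorized,
// the pool manager is marked as not running.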
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
|
|
slog.InfoContext(
|
|
r.ctx, "starting loop for entity",
|
|
"loop_name", name)
|
|
ticker := time.NewTicker(interval)
|
|
r.wg.Add(1)
|
|
|
|
defer func() {
|
|
slog.InfoContext(
|
|
r.ctx, "pool loop exited",
|
|
"loop_name", name)
|
|
ticker.Stop()
|
|
r.wg.Done()
|
|
}()
|
|
|
|
for {
|
|
shouldRun := r.managerIsRunning
|
|
if alwaysRun {
|
|
shouldRun = true
|
|
}
|
|
switch shouldRun {
|
|
case true:
|
|
select {
|
|
case <-ticker.C:
|
|
if err := f(); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "error in loop",
|
|
"loop_name", name)
|
|
if errors.Is(err, runnerErrors.ErrUnauthorized) {
|
|
r.SetPoolRunningState(false, err.Error())
|
|
}
|
|
}
|
|
case <-r.ctx.Done():
|
|
// daemon is shutting down.
|
|
return
|
|
case <-r.quit:
|
|
// this worker was stopped.
|
|
return
|
|
}
|
|
default:
|
|
select {
|
|
case <-r.ctx.Done():
|
|
// daemon is shutting down.
|
|
return
|
|
case <-r.quit:
|
|
// this worker was stopped.
|
|
return
|
|
default:
|
|
r.waitForTimeoutOrCancelled(common.BackoffTimer)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *basePoolManager) updateTools() error {
|
|
tools, err := cache.GetGithubToolsCache(r.entity.ID)
|
|
if err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update tools for entity", "entity", r.entity.String())
|
|
r.SetPoolRunningState(false, err.Error())
|
|
return fmt.Errorf("failed to update tools for entity %s: %w", r.entity.String(), err)
|
|
}
|
|
|
|
r.mux.Lock()
|
|
r.tools = tools
|
|
r.mux.Unlock()
|
|
|
|
slog.DebugContext(r.ctx, "successfully updated tools")
|
|
r.SetPoolRunningState(true, "")
|
|
return nil
|
|
}
|
|
|
|
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
|
|
// any local runners that are not present in Github. Runners that are "idle" in our
|
|
// provider, but do not exist in github, will be removed. This can happen if
// garm was offline while a job was executed by a github action. When this
|
|
// happens, github will remove the ephemeral worker and send a webhook our way.
|
|
// If we were offline and did not process the webhook, the instance will linger.
|
|
// We need to remove it from the provider and database.
|
|
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
|
|
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching instances from db")
|
|
}
|
|
|
|
runnerNames := map[string]bool{}
|
|
for _, run := range runners {
|
|
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
|
|
slog.DebugContext(
|
|
r.ctx, "runner is not managed by a pool we manage",
|
|
"runner_name", run.GetName())
|
|
continue
|
|
}
|
|
runnerNames[*run.Name] = true
|
|
}
|
|
|
|
for _, instance := range dbInstances {
|
|
if instance.ScaleSetID != 0 {
|
|
// ignore scale set instances.
|
|
continue
|
|
}
|
|
|
|
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.DebugContext(
|
|
r.ctx, "failed to acquire lock for instance",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
defer locking.Unlock(instance.Name, false)
|
|
|
|
switch instance.Status {
|
|
case commonParams.InstancePendingCreate,
|
|
commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete:
|
|
// this instance is in the process of being created or is awaiting deletion.
|
|
// Instances in pending_create did not get a chance to register themselves in
// github, so we let them be for now.
|
|
continue
|
|
}
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching instance pool info")
|
|
}
|
|
|
|
switch instance.RunnerStatus {
|
|
case params.RunnerPending, params.RunnerInstalling:
|
|
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
|
|
// runner is still installing. We give it a chance to finish.
|
|
slog.DebugContext(
|
|
r.ctx, "runner is still installing, give it a chance to finish",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if time.Since(instance.UpdatedAt).Minutes() < 5 {
|
|
// instance was updated recently. We give it a chance to register itself in github.
|
|
slog.DebugContext(
|
|
r.ctx, "instance was updated recently, skipping check",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
|
|
if ok := runnerNames[instance.Name]; !ok {
|
|
// Set pending_delete on DB field. Allow consolidate() to remove it.
|
|
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstancePendingDelete, nil); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner",
|
|
"runner_name", instance.Name)
|
|
return errors.Wrap(err, "updating runner")
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reapTimedOutRunners will mark as pending_delete any runner that has a status
|
|
// of "running" in the provider, but that has not registered with Github, and has
|
|
// received no new updates in the configured timeout interval.
|
|
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
|
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching instances from db")
|
|
}
|
|
|
|
runnersByName := map[string]*github.Runner{}
|
|
for _, run := range runners {
|
|
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
|
|
slog.DebugContext(
|
|
r.ctx, "runner is not managed by a pool we manage",
|
|
"runner_name", run.GetName())
|
|
continue
|
|
}
|
|
runnersByName[*run.Name] = run
|
|
}
|
|
|
|
for _, instance := range dbInstances {
|
|
if instance.ScaleSetID != 0 {
|
|
// ignore scale set instances.
|
|
continue
|
|
}
|
|
|
|
slog.DebugContext(
|
|
r.ctx, "attempting to lock instance",
|
|
"runner_name", instance.Name)
|
|
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.DebugContext(
|
|
r.ctx, "failed to acquire lock for instance",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
defer locking.Unlock(instance.Name, false)
|
|
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching instance pool info")
|
|
}
|
|
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
|
|
continue
|
|
}
|
|
|
|
// There are 3 cases (currently) where we consider a runner as timed out:
|
|
// * The runner never joined github within the pool timeout
|
|
// * The runner managed to join github, but the setup process failed later and the runner
|
|
// never started on the instance.
|
|
// * A JIT config was created, but the runner never joined github.
|
|
if runner, ok := runnersByName[instance.Name]; !ok || runner.GetStatus() == "offline" {
|
|
slog.InfoContext(
|
|
r.ctx, "reaping timed-out/failed runner",
|
|
"runner_name", instance.Name)
|
|
if err := r.DeleteRunner(instance, false, false); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
return errors.Wrap(err, "updating runner")
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
|
|
// as offline and for which we no longer have a local instance.
|
|
// This may happen if someone manually deletes the instance in the provider. We need to
|
|
// first remove the instance from github, and then from our database.
|
|
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
|
|
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
|
|
g, ctx := errgroup.WithContext(r.ctx)
|
|
for _, runner := range runners {
|
|
if !isManagedRunner(labelsFromRunner(runner), r.controllerInfo.ControllerID.String()) {
|
|
slog.DebugContext(
|
|
r.ctx, "runner is not managed by a pool we manage",
|
|
"runner_name", runner.GetName())
|
|
continue
|
|
}
|
|
|
|
status := runner.GetStatus()
|
|
if status != "offline" {
|
|
// Runner is online. Ignore it.
|
|
continue
|
|
}
|
|
|
|
dbInstance, err := r.store.GetInstanceByName(r.ctx, *runner.Name)
|
|
if err != nil {
|
|
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
|
return errors.Wrap(err, "fetching instance from DB")
|
|
}
|
|
// We no longer have a DB entry for this instance, and the runner appears offline in github.
|
|
// Previous forceful removal may have failed?
|
|
slog.InfoContext(
|
|
r.ctx, "Runner has no database entry in garm, removing from github",
|
|
"runner_name", runner.GetName())
|
|
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil {
|
|
// Removed in the meantime?
|
|
if errors.Is(err, runnerErrors.ErrNotFound) {
|
|
continue
|
|
}
|
|
return errors.Wrap(err, "removing runner")
|
|
}
|
|
continue
|
|
}
|
|
if dbInstance.ScaleSetID != 0 {
|
|
// ignore scale set instances.
|
|
continue
|
|
}
|
|
|
|
switch dbInstance.Status {
|
|
case commonParams.InstancePendingDelete, commonParams.InstanceDeleting:
|
|
// already marked for deletion or is in the process of being deleted.
|
|
// Let consolidate take care of it.
|
|
continue
|
|
case commonParams.InstancePendingCreate, commonParams.InstanceCreating:
|
|
// instance is still being created. We give it a chance to finish.
|
|
slog.DebugContext(
|
|
r.ctx, "instance is still being created, give it a chance to finish",
|
|
"runner_name", dbInstance.Name)
|
|
continue
|
|
case commonParams.InstanceRunning:
|
|
// this check is not strictly needed, but can help avoid unnecessary strain on the provider.
|
|
// At worst, we will have a runner that is offline in github for 5 minutes before we reap it.
|
|
if time.Since(dbInstance.UpdatedAt).Minutes() < 5 {
|
|
// instance was updated recently. We give it a chance to register itself in github.
|
|
slog.DebugContext(
|
|
r.ctx, "instance was updated recently, skipping check",
|
|
"runner_name", dbInstance.Name)
|
|
continue
|
|
}
|
|
}
|
|
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching pool")
|
|
}
|
|
|
|
// check if the provider still has the instance.
|
|
provider, ok := r.providers[dbInstance.ProviderName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown provider %s for pool %s", dbInstance.ProviderName, dbInstance.PoolID)
|
|
}
|
|
|
|
var poolInstances []commonParams.ProviderInstance
|
|
poolInstances, ok = poolInstanceCache[dbInstance.PoolID]
|
|
if !ok {
|
|
slog.DebugContext(
|
|
r.ctx, "updating instances cache for pool",
|
|
"pool_id", pool.ID)
|
|
listInstancesParams := common.ListInstancesParams{
|
|
ListInstancesV011: common.ListInstancesV011Params{
|
|
ProviderBaseParams: r.getProviderBaseParams(pool),
|
|
},
|
|
}
|
|
poolInstances, err = provider.ListInstances(r.ctx, pool.ID, listInstancesParams)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "fetching instances for pool %s", dbInstance.PoolID)
|
|
}
|
|
poolInstanceCache[dbInstance.PoolID] = poolInstances
|
|
}
|
|
|
|
lockAcquired := locking.TryLock(dbInstance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.DebugContext(
|
|
r.ctx, "failed to acquire lock for instance",
|
|
"runner_name", dbInstance.Name)
|
|
continue
|
|
}
|
|
|
|
// See: https://golang.org/doc/faq#closures_and_goroutines
|
|
runner := runner
|
|
g.Go(func() error {
|
|
deleteMux := false
|
|
defer func() {
|
|
locking.Unlock(dbInstance.Name, deleteMux)
|
|
}()
|
|
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
|
|
if !ok {
|
|
// The runner instance is no longer on the provider, and it appears offline in github.
|
|
// It should be safe to force remove it.
|
|
slog.InfoContext(
|
|
r.ctx, "Runner instance is no longer on the provider, removing from github",
|
|
"runner_name", dbInstance.Name)
|
|
if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID()); err != nil {
|
|
// Removed in the meantime?
|
|
if errors.Is(err, runnerErrors.ErrNotFound) {
|
|
slog.DebugContext(
|
|
r.ctx, "runner disappeared from github",
|
|
"runner_name", dbInstance.Name)
|
|
} else {
|
|
return errors.Wrap(err, "removing runner from github")
|
|
}
|
|
}
|
|
// Remove the database entry for the runner.
|
|
slog.InfoContext(
|
|
r.ctx, "Removing from database",
|
|
"runner_name", dbInstance.Name)
|
|
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
|
|
return errors.Wrap(err, "removing runner from database")
|
|
}
|
|
deleteMux = true
|
|
return nil
|
|
}
|
|
|
|
if providerInstance.Status == commonParams.InstanceRunning {
|
|
// instance is running, but github reports runner as offline. Log the event.
|
|
// This scenario may require manual intervention.
|
|
// Perhaps it just came online and github did not yet change its status?
|
|
slog.WarnContext(
|
|
r.ctx, "instance is online but github reports runner as offline",
|
|
"runner_name", dbInstance.Name)
|
|
return nil
|
|
}
|
|
|
|
slog.InfoContext(
|
|
r.ctx, "instance was found in stopped state; starting",
|
|
"runner_name", dbInstance.Name)
|
|
|
|
startParams := common.StartParams{
|
|
StartV011: common.StartV011Params{
|
|
ProviderBaseParams: r.getProviderBaseParams(pool),
|
|
},
|
|
}
|
|
if err := provider.Start(r.ctx, dbInstance.ProviderID, startParams); err != nil {
|
|
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return errors.Wrap(err, "removing orphaned github runners")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
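// waitForErrorGroupOrContextCancelled blocks until all goroutines in the error group have
// finished, or until the pool manager's context is cancelled, whichever happens first. Note
// that when the context wins, the group's goroutines may still be running in the background.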
func (r *basePoolManager) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
|
|
if g == nil {
|
|
return nil
|
|
}
|
|
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
waitErr := g.Wait()
|
|
done <- waitErr
|
|
}()
|
|
|
|
select {
|
|
case err := <-done:
|
|
return err
|
|
case <-r.ctx.Done():
|
|
return r.ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status params.RunnerStatus) (params.Instance, error) {
|
|
updateParams := params.UpdateInstanceParams{
|
|
RunnerStatus: status,
|
|
}
|
|
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
|
|
if err != nil {
|
|
return params.Instance{}, errors.Wrap(err, "updating runner state")
|
|
}
|
|
return instance, nil
|
|
}
|
|
|
|
func (r *basePoolManager) setInstanceStatus(runnerName string, status commonParams.InstanceStatus, providerFault []byte) (params.Instance, error) {
|
|
updateParams := params.UpdateInstanceParams{
|
|
Status: status,
|
|
ProviderFault: providerFault,
|
|
}
|
|
|
|
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
|
|
if err != nil {
|
|
return params.Instance{}, errors.Wrap(err, "updating runner state")
|
|
}
|
|
return instance, nil
|
|
}
|
|
|
|
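// AddRunner creates a new database record for a runner in the given pool, in the pending_create
// state. When the provider supports it, a JIT runner configuration is requested up front;
// otherwise the runner falls back to a registration token. The actual provider instance is
// created later by the consolidation loop (addPendingInstances), and the deferred block below
// cleans up the database entry and any JIT-registered runner if something goes wrong.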
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching pool")
|
|
}
|
|
|
|
provider, ok := r.providers[pool.ProviderName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
|
|
}
|
|
|
|
name := fmt.Sprintf("%s-%s", pool.GetRunnerPrefix(), util.NewID())
|
|
labels := r.getLabelsForInstance(pool)
|
|
|
|
jitConfig := make(map[string]string)
|
|
var runner *github.Runner
|
|
|
|
if !provider.DisableJITConfig() {
|
|
// Attempt to create JIT config
|
|
jitConfig, runner, err = r.ghcli.GetEntityJITConfig(ctx, name, pool, labels)
|
|
if err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to get JIT config, falling back to registration token")
|
|
}
|
|
}
|
|
|
|
createParams := params.CreateInstanceParams{
|
|
Name: name,
|
|
Status: commonParams.InstancePendingCreate,
|
|
RunnerStatus: params.RunnerPending,
|
|
OSArch: pool.OSArch,
|
|
OSType: pool.OSType,
|
|
CallbackURL: r.controllerInfo.CallbackURL,
|
|
MetadataURL: r.controllerInfo.MetadataURL,
|
|
CreateAttempt: 1,
|
|
GitHubRunnerGroup: pool.GitHubRunnerGroup,
|
|
AditionalLabels: aditionalLabels,
|
|
JitConfiguration: jitConfig,
|
|
}
|
|
|
|
if runner != nil {
|
|
createParams.AgentID = runner.GetID()
|
|
}
|
|
|
|
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
|
|
if err != nil {
|
|
return errors.Wrap(err, "creating instance")
|
|
}
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
if instance.ID != "" {
|
|
if err := r.DeleteRunner(instance, false, false); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to cleanup instance",
|
|
"runner_name", instance.Name)
|
|
}
|
|
}
|
|
|
|
if runner != nil {
|
|
runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
|
|
if runnerCleanupErr != nil {
|
|
slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext(
|
|
ctx, "failed to remove runner",
|
|
"gh_runner_id", runner.GetID())
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) Status() params.PoolManagerStatus {
|
|
r.mux.Lock()
|
|
defer r.mux.Unlock()
|
|
return params.PoolManagerStatus{
|
|
IsRunning: r.managerIsRunning,
|
|
FailureReason: r.managerErrorReason,
|
|
}
|
|
}
|
|
|
|
func (r *basePoolManager) waitForTimeoutOrCancelled(timeout time.Duration) {
|
|
slog.DebugContext(
|
|
r.ctx, fmt.Sprintf("sleeping for %.2f minutes", timeout.Minutes()))
|
|
timer := time.NewTimer(timeout)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-timer.C:
|
|
case <-r.ctx.Done():
|
|
case <-r.quit:
|
|
}
|
|
}
|
|
|
|
func (r *basePoolManager) SetPoolRunningState(isRunning bool, failureReason string) {
|
|
r.mux.Lock()
|
|
r.managerErrorReason = failureReason
|
|
r.managerIsRunning = isRunning
|
|
r.mux.Unlock()
|
|
}
|
|
|
|
func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
|
|
labels := []string{}
|
|
for _, tag := range pool.Tags {
|
|
labels = append(labels, tag.Name)
|
|
}
|
|
labels = append(labels, r.controllerLabel())
|
|
labels = append(labels, r.poolLabel(pool.ID))
|
|
return labels
|
|
}
|
|
|
|
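// addInstanceToProvider asks the configured provider to create the compute instance for a
// runner that was previously recorded in the database. It assembles the bootstrap parameters
// (tools, callback/metadata URLs, a scoped JWT token and, when no JIT config is available, the
// runner labels) and records the provider's response back onto the instance. If creation fails,
// the deferred block attempts to clean up whatever the provider may have created.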
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching pool")
|
|
}
|
|
|
|
provider, ok := r.providers[pool.ProviderName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
|
|
}
|
|
|
|
jwtValidity := pool.RunnerTimeout()
|
|
|
|
entity := r.entity.String()
|
|
jwtToken, err := r.instanceTokenGetter.NewInstanceJWTToken(instance, entity, pool.PoolType(), jwtValidity)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching instance jwt token")
|
|
}
|
|
|
|
hasJITConfig := len(instance.JitConfiguration) > 0
|
|
|
|
bootstrapArgs := commonParams.BootstrapInstance{
|
|
Name: instance.Name,
|
|
Tools: r.tools,
|
|
RepoURL: r.entity.ForgeURL(),
|
|
MetadataURL: instance.MetadataURL,
|
|
CallbackURL: instance.CallbackURL,
|
|
InstanceToken: jwtToken,
|
|
OSArch: pool.OSArch,
|
|
OSType: pool.OSType,
|
|
Flavor: pool.Flavor,
|
|
Image: pool.Image,
|
|
ExtraSpecs: pool.ExtraSpecs,
|
|
PoolID: instance.PoolID,
|
|
CACertBundle: r.entity.Credentials.CABundle,
|
|
GitHubRunnerGroup: instance.GitHubRunnerGroup,
|
|
JitConfigEnabled: hasJITConfig,
|
|
}
|
|
|
|
if !hasJITConfig {
|
|
// We still need the labels here for situations where we don't have a JIT config generated.
|
|
// This can happen if GARM is used against an instance of GHES older than version 3.10.
|
|
// The labels field should be ignored by providers if JIT config is enabled.
|
|
bootstrapArgs.Labels = r.getLabelsForInstance(pool)
|
|
}
|
|
|
|
var instanceIDToDelete string
|
|
|
|
defer func() {
|
|
if instanceIDToDelete != "" {
|
|
deleteInstanceParams := common.DeleteInstanceParams{
|
|
DeleteInstanceV011: common.DeleteInstanceV011Params{
|
|
ProviderBaseParams: r.getProviderBaseParams(pool),
|
|
},
|
|
}
|
|
if err := provider.DeleteInstance(r.ctx, instanceIDToDelete, deleteInstanceParams); err != nil {
|
|
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to cleanup instance",
|
|
"provider_id", instanceIDToDelete)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
createInstanceParams := common.CreateInstanceParams{
|
|
CreateInstanceV011: common.CreateInstanceV011Params{
|
|
ProviderBaseParams: r.getProviderBaseParams(pool),
|
|
},
|
|
}
|
|
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs, createInstanceParams)
|
|
if err != nil {
|
|
instanceIDToDelete = instance.Name
|
|
return errors.Wrap(err, "creating instance")
|
|
}
|
|
|
|
if providerInstance.Status == commonParams.InstanceError {
|
|
instanceIDToDelete = instance.ProviderID
|
|
if instanceIDToDelete == "" {
|
|
instanceIDToDelete = instance.Name
|
|
}
|
|
}
|
|
|
|
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
|
|
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateInstanceArgs); err != nil {
|
|
return errors.Wrap(err, "updating instance")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// paramsWorkflowJobToParamsJob returns a params.Job from a params.WorkflowJob, and additionally determines
|
|
// if the runner belongs to this pool or not. It will always return a valid params.Job, even if it errs out.
|
|
// This allows us to still update the job in the database, even if we determined that it wasn't necessarily meant
|
|
// for this pool.
|
|
// If garm manages multiple hierarchies (repos, org, enterprise) which involve the same repo, we will get a hook
|
|
// whenever a job involving our repo triggers a hook. So even if the job is picked up by a runner at the enterprise
|
|
// level, the repo and org still get a hook.
|
|
// We even get a hook if a particular job is picked up by a GitHub hosted runner. We don't know who will pick up the job
|
|
// until the "in_progress" event is sent and we can see which runner picked it up.
|
|
//
|
|
// We save the details of that job at every level, because we want to at least update the status of the job. We make
|
|
// decisions based on the status of saved jobs. A "queued" job will prompt garm to search for an appropriate pool
|
|
// and spin up a runner there if no other idle runner exists to pick it up.
|
|
func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) {
|
|
asUUID, err := uuid.Parse(r.ID())
|
|
if err != nil {
|
|
return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID")
|
|
}
|
|
|
|
jobParams := params.Job{
|
|
ID: job.WorkflowJob.ID,
|
|
Action: job.Action,
|
|
RunID: job.WorkflowJob.RunID,
|
|
Status: job.WorkflowJob.Status,
|
|
Conclusion: job.WorkflowJob.Conclusion,
|
|
StartedAt: job.WorkflowJob.StartedAt,
|
|
CompletedAt: job.WorkflowJob.CompletedAt,
|
|
Name: job.WorkflowJob.Name,
|
|
GithubRunnerID: job.WorkflowJob.RunnerID,
|
|
RunnerName: job.WorkflowJob.RunnerName,
|
|
RunnerGroupID: job.WorkflowJob.RunnerGroupID,
|
|
RunnerGroupName: job.WorkflowJob.RunnerGroupName,
|
|
RepositoryName: job.Repository.Name,
|
|
RepositoryOwner: job.Repository.Owner.Login,
|
|
Labels: job.WorkflowJob.Labels,
|
|
}
|
|
|
|
switch r.entity.EntityType {
|
|
case params.ForgeEntityTypeEnterprise:
|
|
jobParams.EnterpriseID = &asUUID
|
|
case params.ForgeEntityTypeRepository:
|
|
jobParams.RepoID = &asUUID
|
|
case params.ForgeEntityTypeOrganization:
|
|
jobParams.OrgID = &asUUID
|
|
default:
|
|
return jobParams, errors.Errorf("unknown pool type: %s", r.entity.EntityType)
|
|
}
|
|
|
|
return jobParams, nil
|
|
}
|
|
|
|
func (r *basePoolManager) poolLabel(poolID string) string {
|
|
return fmt.Sprintf("%s=%s", poolIDLabelprefix, poolID)
|
|
}
|
|
|
|
func (r *basePoolManager) controllerLabel() string {
|
|
return fmt.Sprintf("%s=%s", controllerLabelPrefix, r.controllerInfo.ControllerID.String())
|
|
}
|
|
|
|
func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance commonParams.ProviderInstance) params.UpdateInstanceParams {
|
|
return params.UpdateInstanceParams{
|
|
ProviderID: providerInstance.ProviderID,
|
|
OSName: providerInstance.OSName,
|
|
OSVersion: providerInstance.OSVersion,
|
|
Addresses: providerInstance.Addresses,
|
|
Status: providerInstance.Status,
|
|
ProviderFault: providerInstance.ProviderFault,
|
|
}
|
|
}
|
|
|
|
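// scaleDownOnePool removes a fraction of the surplus idle runners in a pool on every pass. As a
// worked example: with 7 idle runners and min_idle_runners set to 2, the surplus is 5 and, with
// the current scaleDownFactor of 0.5, ceil(5 * 0.5) = 3 idle runners are deleted on this run;
// the remainder is handled on subsequent passes. After a scale-down, queued jobs older than 10
// minutes that match the pool's labels are also pruned as likely stale.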
func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool) error {
|
|
slog.DebugContext(
|
|
ctx, "scaling down pool",
|
|
"pool_id", pool.ID)
|
|
if !pool.Enabled {
|
|
slog.DebugContext(
|
|
ctx, "pool is disabled, skipping scale down",
|
|
"pool_id", pool.ID)
|
|
return nil
|
|
}
|
|
|
|
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to ensure minimum idle workers for pool %s: %w", pool.ID, err)
|
|
}
|
|
|
|
idleWorkers := []params.Instance{}
|
|
for _, inst := range existingInstances {
|
|
// Idle runners that have been spawned and are still idle after a short grace period are taken
// into consideration for scale-down. The grace period (see the UpdatedAt check below) prevents
// a situation where a "queued" workflow triggers the creation of a new idle runner, and this
// routine reaps an idle runner before it has a chance to pick up a job.
|
|
if inst.RunnerStatus == params.RunnerIdle && inst.Status == commonParams.InstanceRunning && time.Since(inst.UpdatedAt).Minutes() > 2 {
|
|
idleWorkers = append(idleWorkers, inst)
|
|
}
|
|
}
|
|
|
|
if len(idleWorkers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
surplus := float64(len(idleWorkers) - pool.MinIdleRunnersAsInt())
|
|
|
|
if surplus <= 0 {
|
|
return nil
|
|
}
|
|
|
|
scaleDownFactor := 0.5 // could be configurable
|
|
numScaleDown := int(math.Ceil(surplus * scaleDownFactor))
|
|
|
|
if numScaleDown <= 0 || numScaleDown > len(idleWorkers) {
|
|
return fmt.Errorf("invalid number of instances to scale down: %v, check your scaleDownFactor: %v", numScaleDown, scaleDownFactor)
|
|
}
|
|
|
|
g, _ := errgroup.WithContext(ctx)
|
|
|
|
for _, instanceToDelete := range idleWorkers[:numScaleDown] {
|
|
instanceToDelete := instanceToDelete
|
|
|
|
lockAcquired := locking.TryLock(instanceToDelete.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to acquire lock for instance",
|
|
"provider_id", instanceToDelete.Name)
|
|
continue
|
|
}
|
|
defer locking.Unlock(instanceToDelete.Name, false)
|
|
|
|
g.Go(func() error {
|
|
slog.InfoContext(
|
|
ctx, "scaling down idle worker from pool",
|
|
"runner_name", instanceToDelete.Name,
|
|
"pool_id", pool.ID)
|
|
if err := r.DeleteRunner(instanceToDelete, false, false); err != nil {
|
|
return fmt.Errorf("failed to delete instance %s: %w", instanceToDelete.ID, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if numScaleDown > 0 {
|
|
// We just scaled down a runner for this pool. That means that if we have jobs that are
|
|
// still queued in our DB, and those jobs should match this pool but have not been picked
|
|
// up by a runner, they are most likely stale and can be removed. For now, we can simply
|
|
// remove jobs older than 10 minutes.
|
|
//
|
|
// nolint:golangci-lint,godox
|
|
// TODO: should probably allow additional filters to list functions. Would help to filter by date
|
|
// instead of returning a bunch of results and filtering manually.
|
|
queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.entity.EntityType, r.entity.ID, params.JobStatusQueued)
|
|
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
|
|
return errors.Wrap(err, "listing queued jobs")
|
|
}
|
|
|
|
for _, job := range queued {
|
|
if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) {
|
|
if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to delete job",
|
|
"job_id", job.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return fmt.Errorf("failed to scale down pool %s: %w", pool.ID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) addRunnerToPool(pool params.Pool, aditionalLabels []string) error {
|
|
if !pool.Enabled {
|
|
return fmt.Errorf("pool %s is disabled", pool.ID)
|
|
}
|
|
|
|
poolInstanceCount, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list pool instances: %w", err)
|
|
}
|
|
|
|
if poolInstanceCount >= int64(pool.MaxRunnersAsInt()) {
|
|
return fmt.Errorf("max workers (%d) reached for pool %s", pool.MaxRunners, pool.ID)
|
|
}
|
|
|
|
if err := r.AddRunner(r.ctx, pool.ID, aditionalLabels); err != nil {
|
|
return fmt.Errorf("failed to add new instance for pool %s: %s", pool.ID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
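// ensureIdleRunnersForOnePool tops a pool up to min_idle_runners without exceeding max_runners.
// For example, with min_idle_runners=3, max_runners=5 and 4 existing instances of which only 1
// is idle or pending: required = 3 - 1 = 2, the projected total 4 + 2 = 6 would exceed
// max_runners, so required is reduced by 6 - 5 = 1 and only one new runner is added.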
func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
|
|
if !pool.Enabled || pool.MinIdleRunners == 0 {
|
|
return nil
|
|
}
|
|
|
|
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to ensure minimum idle workers for pool %s: %w", pool.ID, err)
|
|
}
|
|
|
|
if uint(len(existingInstances)) >= pool.MaxRunners {
|
|
slog.DebugContext(
|
|
r.ctx, "max workers reached for pool, skipping idle worker creation",
|
|
"max_runners", pool.MaxRunners,
|
|
"pool_id", pool.ID)
|
|
return nil
|
|
}
|
|
|
|
idleOrPendingWorkers := []params.Instance{}
|
|
for _, inst := range existingInstances {
|
|
if inst.RunnerStatus != params.RunnerActive && inst.RunnerStatus != params.RunnerTerminated {
|
|
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
|
|
}
|
|
}
|
|
|
|
var required int
|
|
if len(idleOrPendingWorkers) < pool.MinIdleRunnersAsInt() {
|
|
// get the needed delta.
|
|
required = pool.MinIdleRunnersAsInt() - len(idleOrPendingWorkers)
|
|
|
|
projectedInstanceCount := len(existingInstances) + required
|
|
|
|
var projected uint
|
|
if projectedInstanceCount > 0 {
|
|
projected = uint(projectedInstanceCount)
|
|
}
|
|
if projected > pool.MaxRunners {
|
|
// ensure we don't go above max workers
|
|
delta := projectedInstanceCount - pool.MaxRunnersAsInt()
|
|
required -= delta
|
|
}
|
|
}
|
|
|
|
for i := 0; i < required; i++ {
|
|
slog.InfoContext(
|
|
r.ctx, "adding new idle worker to pool",
|
|
"pool_id", pool.ID)
|
|
if err := r.AddRunner(r.ctx, pool.ID, nil); err != nil {
|
|
return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, pool params.Pool) error {
|
|
if !pool.Enabled {
|
|
return nil
|
|
}
|
|
slog.DebugContext(
|
|
ctx, "running retry failed instances for pool",
|
|
"pool_id", pool.ID)
|
|
|
|
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list instances for pool %s: %w", pool.ID, err)
|
|
}
|
|
|
|
g, errCtx := errgroup.WithContext(ctx)
|
|
for _, instance := range existingInstances {
|
|
instance := instance
|
|
|
|
if instance.Status != commonParams.InstanceError {
|
|
continue
|
|
}
|
|
if instance.CreateAttempt >= maxCreateAttempts {
|
|
continue
|
|
}
|
|
|
|
slog.DebugContext(
|
|
ctx, "attempting to retry failed instance",
|
|
"runner_name", instance.Name)
|
|
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.DebugContext(
|
|
ctx, "failed to acquire lock for instance",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
|
|
g.Go(func() error {
|
|
defer locking.Unlock(instance.Name, false)
|
|
slog.DebugContext(
|
|
ctx, "attempting to clean up any previous instance",
|
|
"runner_name", instance.Name)
|
|
// nolint:golangci-lint,godox
|
|
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
|
|
// this has the potential to create many API requests to the target provider.
|
|
// TODO(gabriel-samfira): implement request throttling.
|
|
if err := r.deleteInstanceFromProvider(errCtx, instance); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to delete instance from provider",
|
|
"runner_name", instance.Name)
|
|
// Bail here, otherwise we risk creating multiple failing instances, and losing track
|
|
// of them. If Create instance failed to return a proper provider ID, we rely on the
|
|
// name to delete the instance. If we don't bail here, and end up with multiple
|
|
// instances with the same name, using the name to clean up failed instances will fail
|
|
// on any subsequent call, unless the external or native provider takes into account
|
|
// non unique names and loops over all of them. Something which is extremely hacky and
|
|
// which we would rather avoid.
|
|
return err
|
|
}
|
|
slog.DebugContext(
|
|
ctx, "cleanup of previously failed instance complete",
|
|
"runner_name", instance.Name)
|
|
// nolint:golangci-lint,godox
|
|
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
|
|
// It's fairly safe to do here (for now), as there should be no other code path that updates
|
|
// an instance in this state.
|
|
var tokenFetched bool = len(instance.JitConfiguration) > 0
|
|
updateParams := params.UpdateInstanceParams{
|
|
CreateAttempt: instance.CreateAttempt + 1,
|
|
TokenFetched: &tokenFetched,
|
|
Status: commonParams.InstancePendingCreate,
|
|
RunnerStatus: params.RunnerPending,
|
|
}
|
|
slog.DebugContext(
|
|
ctx, "queueing previously failed instance for retry",
|
|
"runner_name", instance.Name)
|
|
// Set instance to pending create and wait for retry.
|
|
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateParams); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return fmt.Errorf("failed to retry failed instances for pool %s: %w", pool.ID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) retryFailedInstances() error {
|
|
pools := cache.GetEntityPools(r.entity.ID)
|
|
g, ctx := errgroup.WithContext(r.ctx)
|
|
for _, pool := range pools {
|
|
pool := pool
|
|
g.Go(func() error {
|
|
if err := r.retryFailedInstancesForOnePool(ctx, pool); err != nil {
|
|
return fmt.Errorf("retrying failed instances for pool %s: %w", pool.ID, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return fmt.Errorf("retrying failed instances: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) scaleDown() error {
|
|
pools := cache.GetEntityPools(r.entity.ID)
|
|
g, ctx := errgroup.WithContext(r.ctx)
|
|
for _, pool := range pools {
|
|
pool := pool
|
|
g.Go(func() error {
|
|
slog.DebugContext(
|
|
ctx, "running scale down for pool",
|
|
"pool_id", pool.ID)
|
|
return r.scaleDownOnePool(ctx, pool)
|
|
})
|
|
}
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return fmt.Errorf("failed to scale down: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) ensureMinIdleRunners() error {
|
|
pools := cache.GetEntityPools(r.entity.ID)
|
|
g, _ := errgroup.WithContext(r.ctx)
|
|
for _, pool := range pools {
|
|
pool := pool
|
|
g.Go(func() error {
|
|
return r.ensureIdleRunnersForOnePool(pool)
|
|
})
|
|
}
|
|
|
|
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
|
|
return fmt.Errorf("failed to ensure minimum idle workers: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
|
|
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
|
|
if err != nil {
|
|
return errors.Wrap(err, "fetching pool")
|
|
}
|
|
|
|
provider, ok := r.providers[instance.ProviderName]
|
|
if !ok {
|
|
return fmt.Errorf("unknown provider %s for pool %s", instance.ProviderName, instance.PoolID)
|
|
}
|
|
|
|
identifier := instance.ProviderID
|
|
if identifier == "" {
|
|
// provider did not return a provider ID?
|
|
// try with name
|
|
identifier = instance.Name
|
|
}
|
|
|
|
slog.DebugContext(
|
|
ctx, "calling delete instance on provider",
|
|
"runner_name", instance.Name,
|
|
"provider_id", identifier)
|
|
|
|
deleteInstanceParams := common.DeleteInstanceParams{
|
|
DeleteInstanceV011: common.DeleteInstanceV011Params{
|
|
ProviderBaseParams: r.getProviderBaseParams(pool),
|
|
},
|
|
}
|
|
if err := provider.DeleteInstance(ctx, identifier, deleteInstanceParams); err != nil {
|
|
return errors.Wrap(err, "removing instance")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) sleepWithCancel(sleepTime time.Duration) (canceled bool) {
|
|
ticker := time.NewTicker(sleepTime)
|
|
defer ticker.Stop()
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
return false
|
|
case <-r.quit:
|
|
case <-r.ctx.Done():
|
|
}
|
|
return true
|
|
}
|
|
|
|
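// deletePendingInstances removes instances marked as pending_delete or pending_force_delete.
// Each instance is locked, checked against a per-instance delete backoff and then removed from
// the provider and the database in its own goroutine, with a small random jitter to avoid
// hammering the database. For force deletes, provider errors are logged and deletion continues
// anyway.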
func (r *basePoolManager) deletePendingInstances() error {
|
|
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch instances from store: %w", err)
|
|
}
|
|
|
|
slog.DebugContext(
|
|
r.ctx, "removing instances in pending_delete")
|
|
for _, instance := range instances {
|
|
if instance.ScaleSetID != 0 {
|
|
// instance is part of a scale set. Skip.
|
|
continue
|
|
}
|
|
|
|
if instance.Status != commonParams.InstancePendingDelete && instance.Status != commonParams.InstancePendingForceDelete {
|
|
// not in pending_delete status. Skip.
|
|
continue
|
|
}
|
|
|
|
slog.InfoContext(
|
|
r.ctx, "removing instance from pool",
|
|
"runner_name", instance.Name,
|
|
"pool_id", instance.PoolID)
|
|
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.InfoContext(
|
|
r.ctx, "failed to acquire lock for instance",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
|
|
shouldProcess, deadline := r.backoff.ShouldProcess(instance.Name)
|
|
if !shouldProcess {
|
|
slog.DebugContext(
|
|
r.ctx, "backoff in effect for instance",
|
|
"runner_name", instance.Name, "deadline", deadline)
|
|
locking.Unlock(instance.Name, false)
|
|
continue
|
|
}
|
|
|
|
go func(instance params.Instance) (err error) {
|
|
// Prevent a thundering herd. Should alleviate some of the "database
// is locked" errors in sqlite3.
|
|
num, err := rand.Int(rand.Reader, big.NewInt(2000))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate random number: %w", err)
|
|
}
|
|
jitter := time.Duration(num.Int64()) * time.Millisecond
|
|
if canceled := r.sleepWithCancel(jitter); canceled {
|
|
return nil
|
|
}
|
|
|
|
currentStatus := instance.Status
|
|
deleteMux := false
|
|
defer func() {
|
|
locking.Unlock(instance.Name, deleteMux)
|
|
if deleteMux {
|
|
// deleteMux is set only when the instance was successfully removed.
|
|
// We can use it as a marker to signal that the backoff is no longer
|
|
// needed.
|
|
r.backoff.Delete(instance.Name)
|
|
}
|
|
}()
|
|
defer func(instance params.Instance) {
|
|
if err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to remove instance",
|
|
"runner_name", instance.Name)
|
|
// failed to remove from provider. Set status to previous value, which will retry
|
|
// the operation.
|
|
if _, err := r.setInstanceStatus(instance.Name, currentStatus, []byte(err.Error())); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
}
|
|
r.backoff.RecordFailure(instance.Name)
|
|
}
|
|
}(instance)
|
|
|
|
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
return err
|
|
}
|
|
|
|
slog.DebugContext(
|
|
r.ctx, "removing instance from provider",
|
|
"runner_name", instance.Name)
|
|
err = r.deleteInstanceFromProvider(r.ctx, instance)
|
|
if err != nil {
|
|
if currentStatus != commonParams.InstancePendingForceDelete {
|
|
return fmt.Errorf("failed to remove instance from provider: %w", err)
|
|
}
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to remove instance from provider (continuing anyway)",
|
|
"instance", instance.Name)
|
|
}
|
|
slog.InfoContext(
|
|
r.ctx, "removing instance from database",
|
|
"runner_name", instance.Name)
|
|
if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
|
|
return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
|
|
}
|
|
deleteMux = true
|
|
slog.InfoContext(
|
|
r.ctx, "instance was successfully removed",
|
|
"runner_name", instance.Name)
|
|
return nil
|
|
}(instance) //nolint
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) addPendingInstances() error {
|
|
// nolint:golangci-lint,godox
|
|
// TODO: filter instances by status.
|
|
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch instances from store: %w", err)
|
|
}
|
|
for _, instance := range instances {
|
|
if instance.ScaleSetID != 0 {
|
|
// instance is part of a scale set. Skip.
|
|
continue
|
|
}
|
|
|
|
if instance.Status != commonParams.InstancePendingCreate {
|
|
// not in pending_create status. Skip.
|
|
continue
|
|
}
|
|
|
|
slog.DebugContext(
|
|
r.ctx, "attempting to acquire lock for instance",
|
|
"runner_name", instance.Name,
|
|
"action", "create_pending")
|
|
lockAcquired := locking.TryLock(instance.Name, r.consumerID)
|
|
if !lockAcquired {
|
|
slog.DebugContext(
|
|
r.ctx, "failed to acquire lock for instance",
|
|
"runner_name", instance.Name)
|
|
continue
|
|
}
|
|
|
|
// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
|
|
// won't attempt to create the runner a second time.
|
|
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceCreating, nil); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
locking.Unlock(instance.Name, false)
|
|
// We failed to transition the instance to Creating. This means that garm will retry creating this instance
// when the loop runs again, and we could end up with multiple instances.
|
|
continue
|
|
}
|
|
|
|
go func(instance params.Instance) {
|
|
defer locking.Unlock(instance.Name, false)
|
|
slog.InfoContext(
|
|
r.ctx, "creating instance in pool",
|
|
"runner_name", instance.Name,
|
|
"pool_id", instance.PoolID)
|
|
if err := r.addInstanceToProvider(instance); err != nil {
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to add instance to provider",
|
|
"runner_name", instance.Name)
|
|
errAsBytes := []byte(err.Error())
|
|
if _, statusErr := r.setInstanceStatus(instance.Name, commonParams.InstanceError, errAsBytes); statusErr != nil {
|
|
slog.With(slog.Any("error", statusErr)).ErrorContext(
|
|
r.ctx, "failed to update runner status",
|
|
"runner_name", instance.Name)
|
|
}
|
|
slog.With(slog.Any("error", err)).ErrorContext(
|
|
r.ctx, "failed to create instance in provider",
|
|
"runner_name", instance.Name)
|
|
}
|
|
}(instance)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) Wait() error {
|
|
done := make(chan struct{})
|
|
timer := time.NewTimer(60 * time.Second)
|
|
go func() {
|
|
r.wg.Wait()
|
|
timer.Stop()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-timer.C:
|
|
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) runnerCleanup() (err error) {
|
|
slog.DebugContext(
|
|
r.ctx, "running runner cleanup")
|
|
runners, err := r.GetGithubRunners()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch github runners: %w", err)
|
|
}
|
|
|
|
if err := r.reapTimedOutRunners(runners); err != nil {
|
|
return fmt.Errorf("failed to reap timed out runners: %w", err)
|
|
}
|
|
|
|
if err := r.cleanupOrphanedRunners(runners); err != nil {
|
|
return fmt.Errorf("failed to cleanup orphaned runners: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error {
|
|
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
|
|
return errors.Wrap(err, "cleaning orphaned instances")
|
|
}
|
|
|
|
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
|
|
return errors.Wrap(err, "cleaning orphaned github runners")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
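// Start brings the pool manager online. It waits for the runner tools for this entity to become
// available, starts the watcher loop (runWatcher) and then launches the periodic loops: runner
// cleanup, scale down, the consolidation loops (delete/add pending instances, ensure min idle,
// retry failed), tool updates and the queued job consumer. The delete-pending loop always runs,
// so runners can still be removed while the manager is marked as not running.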
func (r *basePoolManager) Start() error {
	initialToolUpdate := make(chan struct{}, 1)
	go func() {
		slog.Info("running initial tool update")
		for {
			slog.DebugContext(r.ctx, "waiting for tools to be available")
			hasTools, stopped := r.waitForToolsOrCancel()
			if stopped {
				return
			}
			if hasTools {
				break
			}
		}
		if err := r.updateTools(); err != nil {
			slog.With(slog.Any("error", err)).Error("failed to update tools")
		}
		initialToolUpdate <- struct{}{}
	}()

	go r.runWatcher()
	go func() {
		select {
		case <-r.quit:
			return
		case <-r.ctx.Done():
			return
		case <-initialToolUpdate:
		}
		defer close(initialToolUpdate)
		go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper", false)
		go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down", false)
		// always run the delete pending instances routine. This way we can still remove existing runners, even if the pool is not running.
		go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]", true)
		go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]", false)
		go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]", false)
		go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]", false)
		go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools", true)
		go r.startLoopForFunction(r.consumeQueuedJobs, common.PoolConsilitationInterval, "job_queue_consumer", false)
	}()
	return nil
}

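// Stop signals all background loops to exit by closing the quit channel.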
func (r *basePoolManager) Stop() error {
	close(r.quit)
	return nil
}

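// WebhookSecret returns the webhook secret configured for the entity managed
// by this pool manager.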
func (r *basePoolManager) WebhookSecret() string {
	return r.entity.WebhookSecret
}

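// ID returns the ID of the entity this pool manager operates on.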
func (r *basePoolManager) ID() string {
	return r.entity.ID
}

// DeleteRunner will delete a runner from a pool. If forceRemove is set to true, any error received from
// the IaaS provider will be ignored and deletion will continue. If bypassGHUnauthorizedError is set, an
// unauthorized error from the forge will not block marking the runner for deletion.
func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypassGHUnauthorizedError bool) error {
	if !r.managerIsRunning && !bypassGHUnauthorizedError {
		return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String())
	}

	if runner.AgentID != 0 {
		if err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID); err != nil {
			if errors.Is(err, runnerErrors.ErrUnauthorized) {
				slog.With(slog.Any("error", err)).ErrorContext(r.ctx, "failed to remove runner from github")
				// Mark the pool as offline from this point forward
				r.SetPoolRunningState(false, fmt.Sprintf("failed to remove runner: %q", err))
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "failed to remove runner")
				if bypassGHUnauthorizedError {
					slog.Info("bypass github unauthorized error is set, marking runner for deletion")
				} else {
					return errors.Wrap(err, "removing runner")
				}
			} else {
				return errors.Wrap(err, "removing runner")
			}
		}
	}

	instanceStatus := commonParams.InstancePendingDelete
	if forceRemove {
		instanceStatus = commonParams.InstancePendingForceDelete
	}

	slog.InfoContext(
		r.ctx, "setting instance status",
		"runner_name", runner.Name,
		"status", instanceStatus)
	if _, err := r.setInstanceStatus(runner.Name, instanceStatus, nil); err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(
			r.ctx, "failed to update runner",
			"runner_name", runner.Name)
		return errors.Wrap(err, "updating runner")
	}
	return nil
}

// consumeQueuedJobs will pull all the known jobs from the database and attempt to create a new
// runner in one of the pools it manages, if it matches the requested labels.
// This is a best effort attempt to consume queued jobs. We do not have any real way to know which
// runner from which pool will pick up a job we react to here. For example, the same job may be received
// by an enterprise manager, an org manager AND a repo manager. If an idle runner from another pool
// picks up the job after we created a runner in this pool, we will have an extra runner that may or may not
// have a job waiting for it.
// This is not a huge problem, as we have scale down logic which should remove any idle runners that have not
// picked up a job within a certain time frame. Also, the logic here should ensure that, sooner or later, all
// known queued jobs will be consumed.
//
// NOTE: jobs that were created while the garm instance was down will be unknown to garm itself and will linger
// in queued state if the pools defined in garm have a minimum idle runner value set to 0. Simply put, garm won't
// know about the queued jobs that we didn't get a webhook for. Listing all jobs on startup is not feasible, as
// an enterprise may have thousands of repos and thousands of jobs in queued state. To fetch all jobs for an
// enterprise, we'd have to list all repos, and for each repo list all jobs currently in queued state. This is
// not desirable by any measure.
//
// One way to handle situations where garm comes up after a longer period of time is to temporarily max out the
// min-idle-runner setting on pools, or at least raise it above 0. The idle runners will start to consume jobs, and
// as they do so, new idle runners will be spun up in their stead. New jobs will be recorded in the DB as they come
// in, so those will trigger the creation of a runner. The jobs we don't know about will be dealt with by the idle
// runners. Once jobs are consumed, you can set min-idle-runners to 0 again.
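//
// As a rough illustration of that workaround (treat this as a sketch; the exact
// garm-cli flags may differ between versions):
//
//	garm-cli pool update <pool-id> --min-idle-runners=2   # temporarily raise idle runners
//	# ...wait for the backlog of queued jobs to drain...
//	garm-cli pool update <pool-id> --min-idle-runners=0   # restore the original setting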
func (r *basePoolManager) consumeQueuedJobs() error {
	queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.entity.EntityType, r.entity.ID, params.JobStatusQueued)
	if err != nil {
		return errors.Wrap(err, "listing queued jobs")
	}

	poolsCache := poolsForTags{
		poolCacheType: r.entity.GetPoolBalancerType(),
	}

	slog.DebugContext(
		r.ctx, "found queued jobs",
		"job_count", len(queued))
	for _, job := range queued {
		if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() {
			// Job was handled by us or another entity.
			slog.DebugContext(
				r.ctx, "job is locked",
				"job_id", job.ID,
				"locking_entity", job.LockedBy.String())
			continue
		}

		if time.Since(job.UpdatedAt) < time.Second*r.controllerInfo.JobBackoff() {
			// Give the idle runners a chance to pick up the job.
			slog.DebugContext(
				r.ctx, "job backoff not reached", "backoff_interval", r.controllerInfo.MinimumJobAgeBackoff,
				"job_id", job.ID)
			continue
		}

		if time.Since(job.UpdatedAt) >= time.Minute*10 {
			// Job is still queued in our db, 10 minutes after a matching runner
			// was spawned. Unlock it and try again. A different job may have picked up
			// the runner.
			if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
				// nolint:golangci-lint,godox
				// TODO: Implement a cache? Should we return here?
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "failed to unlock job",
					"job_id", job.ID)
				continue
			}
		}

		if job.LockedBy.String() == r.ID() {
			// nolint:golangci-lint,godox
			// Job is locked by us. We must have already attempted to create a runner for it. Skip.
			// TODO(gabriel-samfira): create an in-memory state of existing runners that we can easily
			// check for existing pending or idle runners. If we can't find any, attempt to allocate another
			// runner.
			slog.DebugContext(
				r.ctx, "job is locked by us",
				"job_id", job.ID)
			continue
		}

		poolRR, ok := poolsCache.Get(job.Labels)
		if !ok {
			potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, job.Labels)
			if err != nil {
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "error finding pools matching labels")
				continue
			}
			poolRR = poolsCache.Add(job.Labels, potentialPools)
		}

		if poolRR.Len() == 0 {
			slog.DebugContext(r.ctx, "could not find pools with labels", "requested_labels", strings.Join(job.Labels, ","))
			continue
		}

		runnerCreated := false
		if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil {
			slog.With(slog.Any("error", err)).ErrorContext(
				r.ctx, "could not lock job",
				"job_id", job.ID)
			continue
		}

		jobLabels := []string{
			fmt.Sprintf("%s=%d", jobLabelPrefix, job.ID),
		}
		for i := 0; i < poolRR.Len(); i++ {
			pool, err := poolRR.Next()
			if err != nil {
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "could not find a pool to create a runner for job",
					"job_id", job.ID)
				break
			}

			slog.InfoContext(
				r.ctx, "attempting to create a runner in pool",
				"pool_id", pool.ID,
				"job_id", job.ID)
			if err := r.addRunnerToPool(pool, jobLabels); err != nil {
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "could not add runner to pool",
					"pool_id", pool.ID)
				continue
			}
			slog.DebugContext(r.ctx, "a new runner was added as a response to queued job",
				"pool_id", pool.ID,
				"job_id", job.ID)
			runnerCreated = true
			break
		}

		if !runnerCreated {
			slog.WarnContext(
				r.ctx, "could not create a runner for job; unlocking",
				"job_id", job.ID)
			if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
				slog.With(slog.Any("error", err)).ErrorContext(
					r.ctx, "failed to unlock job",
					"job_id", job.ID)
				return errors.Wrap(err, "unlocking job")
			}
		}
	}

	if err := r.store.DeleteCompletedJobs(r.ctx); err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(
			r.ctx, "failed to delete completed jobs")
	}
	return nil
}

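// UninstallWebhook removes the controller webhook from the entity, if it is
// installed. If only the base webhook is present, it is left in place and an
// error is returned asking for it to be removed manually.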
func (r *basePoolManager) UninstallWebhook(ctx context.Context) error {
	if r.controllerInfo.ControllerWebhookURL == "" {
		return errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
	}

	allHooks, err := r.listHooks(ctx)
	if err != nil {
		return errors.Wrap(err, "listing hooks")
	}

	var controllerHookID int64
	var baseHook string
	trimmedBase := strings.TrimRight(r.controllerInfo.WebhookURL, "/")
	trimmedController := strings.TrimRight(r.controllerInfo.ControllerWebhookURL, "/")

	for _, hook := range allHooks {
		hookInfo := hookToParamsHookInfo(hook)
		info := strings.TrimRight(hookInfo.URL, "/")
		if strings.EqualFold(info, trimmedController) {
			controllerHookID = hook.GetID()
		}

		if strings.EqualFold(info, trimmedBase) {
			baseHook = hookInfo.URL
		}
	}

	if controllerHookID != 0 {
		_, err = r.ghcli.DeleteEntityHook(ctx, controllerHookID)
		if err != nil {
			return fmt.Errorf("deleting hook: %w", err)
		}
		return nil
	}

	if baseHook != "" {
		return runnerErrors.NewBadRequestError("base hook found (%s) and must be deleted manually", baseHook)
	}

	return nil
}

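// InstallHook validates the requested hook against the hooks already installed
// on the entity, creates it and then pings it. Ping failures are logged but do
// not fail the installation.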
func (r *basePoolManager) InstallHook(ctx context.Context, req *github.Hook) (params.HookInfo, error) {
	allHooks, err := r.listHooks(ctx)
	if err != nil {
		return params.HookInfo{}, errors.Wrap(err, "listing hooks")
	}

	if err := validateHookRequest(r.controllerInfo.ControllerID.String(), r.controllerInfo.WebhookURL, allHooks, req); err != nil {
		return params.HookInfo{}, errors.Wrap(err, "validating hook request")
	}

	hook, err := r.ghcli.CreateEntityHook(ctx, req)
	if err != nil {
		return params.HookInfo{}, errors.Wrap(err, "creating entity hook")
	}

	if _, err := r.ghcli.PingEntityHook(ctx, hook.GetID()); err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(
			ctx, "failed to ping hook",
			"hook_id", hook.GetID(),
			"entity", r.entity)
	}

	return hookToParamsHookInfo(hook), nil
}

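// InstallWebhook installs a workflow_job webhook on the entity, pointing at the
// controller webhook URL and signed with the entity webhook secret.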
func (r *basePoolManager) InstallWebhook(ctx context.Context, param params.InstallWebhookParams) (params.HookInfo, error) {
	if r.controllerInfo.ControllerWebhookURL == "" {
		return params.HookInfo{}, errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
	}

	insecureSSL := "0"
	if param.InsecureSSL {
		insecureSSL = "1"
	}
	req := &github.Hook{
		Active: github.Ptr(true),
		Config: &github.HookConfig{
			ContentType: github.Ptr("json"),
			InsecureSSL: github.Ptr(insecureSSL),
			URL:         github.Ptr(r.controllerInfo.ControllerWebhookURL),
			Secret:      github.Ptr(r.WebhookSecret()),
		},
		Events: []string{
			"workflow_job",
		},
	}

	return r.InstallHook(ctx, req)
}

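// ValidateOwner checks that the workflow job event was emitted for the entity
// (repository, organization or enterprise) this pool manager is responsible for.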
func (r *basePoolManager) ValidateOwner(job params.WorkflowJob) error {
	switch r.entity.EntityType {
	case params.ForgeEntityTypeRepository:
		if !strings.EqualFold(job.Repository.Name, r.entity.Name) || !strings.EqualFold(job.Repository.Owner.Login, r.entity.Owner) {
			return runnerErrors.NewBadRequestError("job not meant for this pool manager")
		}
	case params.ForgeEntityTypeOrganization:
		if !strings.EqualFold(job.Organization.Login, r.entity.Owner) {
			return runnerErrors.NewBadRequestError("job not meant for this pool manager")
		}
	case params.ForgeEntityTypeEnterprise:
		if !strings.EqualFold(job.Enterprise.Slug, r.entity.Owner) {
			return runnerErrors.NewBadRequestError("job not meant for this pool manager")
		}
	default:
		return runnerErrors.NewBadRequestError("unknown entity type")
	}

	return nil
}

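// GithubRunnerRegistrationToken creates and returns a new runner registration
// token for the entity.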
func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) {
	tk, ghResp, err := r.ghcli.CreateEntityRegistrationToken(r.ctx)
	if err != nil {
		if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
			return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching token")
		}
		return "", errors.Wrap(err, "creating runner token")
	}
	return *tk.Token, nil
}

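// FetchTools returns the list of runner application downloads available for
// the entity.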
func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload, error) {
	tools, ghResp, err := r.ghcli.ListEntityRunnerApplicationDownloads(r.ctx)
	if err != nil {
		if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
			return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
		}
		return nil, errors.Wrap(err, "fetching runner tools")
	}

	ret := []commonParams.RunnerApplicationDownload{}
	for _, tool := range tools {
		if tool == nil {
			continue
		}
		ret = append(ret, commonParams.RunnerApplicationDownload(*tool))
	}
	return ret, nil
}

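// GetGithubRunners lists all runners registered for the entity, paging through
// the results 100 entries at a time.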
func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) {
	opts := github.ListRunnersOptions{
		ListOptions: github.ListOptions{
			PerPage: 100,
		},
	}
	var allRunners []*github.Runner

	for {
		runners, ghResp, err := r.ghcli.ListEntityRunners(r.ctx, &opts)
		if err != nil {
			if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
				return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
			}
			return nil, errors.Wrap(err, "fetching runners")
		}
		allRunners = append(allRunners, runners.Runners...)
		if ghResp.NextPage == 0 {
			break
		}
		opts.Page = ghResp.NextPage
	}

	return allRunners, nil
}

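// GetWebhookInfo returns information about the controller webhook installed on
// the entity, falling back to the base webhook if only that one is present.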
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
	allHooks, err := r.listHooks(ctx)
	if err != nil {
		return params.HookInfo{}, errors.Wrap(err, "listing hooks")
	}
	trimmedBase := strings.TrimRight(r.controllerInfo.WebhookURL, "/")
	trimmedController := strings.TrimRight(r.controllerInfo.ControllerWebhookURL, "/")

	var controllerHookInfo *params.HookInfo
	var baseHookInfo *params.HookInfo

	for _, hook := range allHooks {
		hookInfo := hookToParamsHookInfo(hook)
		info := strings.TrimRight(hookInfo.URL, "/")
		if strings.EqualFold(info, trimmedController) {
			controllerHookInfo = &hookInfo
			break
		}
		if strings.EqualFold(info, trimmedBase) {
			baseHookInfo = &hookInfo
		}
	}

	// Return the controller hook info if available.
	if controllerHookInfo != nil {
		return *controllerHookInfo, nil
	}

	// Fall back to base hook info if defined.
	if baseHookInfo != nil {
		return *baseHookInfo, nil
	}

	return params.HookInfo{}, runnerErrors.NewNotFoundError("hook not found")
}

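// RootCABundle returns the root CA certificate bundle configured in the entity
// credentials.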
func (r *basePoolManager) RootCABundle() (params.CertificateBundle, error) {
	return r.entity.Credentials.RootCertificateBundle()
}