garm/runner/pool/pool.go
Gabriel Adrian Samfira 459906d97e Prevent abusing the GH API
On large deployments with many jobs, we cannot check each job that
we recorded in the DB against the GH API.

Before this change, if a job was updated more than 10 minutes ago,
garm would check against the GH API whether that job still existed. While
this approach allowed us to maintain a consistent view over which jobs
still exist and which are stale, it had the potential to spam the
GH API, leading to rate limiting.

This change uses the scale-down loop as an indicator for job staleness.

If a job remains in queued state in our DB, but has disappeared from GH
or was serviced by another runner and we never got the hook (garm was down
or GH had an issue - happened in the past), then garm will spin up a new
runner for it. If that runner or any other runner is scaled down, we check
whether we have jobs in the queue that should have matched that runner. If we do,
there is a high chance that the job no longer exists in GH and we can remove
the job from the queue.

Of course, there is a chance that GH is having issues and the job is never
pushed to the runner, but we can't really account for everything. In this case
I'd rather avoid rate limiting ourselves.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
2023-12-15 22:41:50 +00:00


// Copyright 2022 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package pool
import (
"context"
"fmt"
"log"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"
commonParams "github.com/cloudbase/garm-provider-common/params"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
"github.com/google/go-github/v55/github"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
// We tag runners that have been spawned as a result of a queued job with the job ID
// that spawned them. There is no way to guarantee that the runner spawned in response to a particular
// job will be picked up by that job. We mark them so that, in the likely event that the runner
// has picked up a different job, we can clear the lock on the job that spawned it.
// The job it picked up will already have transitioned to in_progress, so it will be ignored by the
// consume loop.
jobLabelPrefix = "in_response_to_job:"
)
const (
// maxCreateAttempts is the number of times we will attempt to create an instance
// before we give up.
// TODO: make this configurable(?)
maxCreateAttempts = 5
)
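// keyMutex provides a per-key mutex backed by a sync.Map. It is used throughout
// the pool manager to serialize operations on individual instances, keyed by
// instance name.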
type keyMutex struct {
muxes sync.Map
}
func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}
func (k *keyMutex) Unlock(key string, remove bool) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
if remove {
k.Delete(key)
}
keyMux.Unlock()
}
func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}
type urls struct {
callbackURL string
metadataURL string
webhookURL string
controllerWebhookURL string
}
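// basePoolManager holds the pool manager logic that is common to repositories,
// organizations and enterprises. Entity specific operations are delegated to
// the poolHelper.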
type basePoolManager struct {
ctx context.Context
controllerID string
store dbCommon.Store
providers map[string]common.Provider
tools []commonParams.RunnerApplicationDownload
quit chan struct{}
helper poolHelper
credsDetails params.GithubCredentials
managerIsRunning bool
managerErrorReason string
urls urls
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
}
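// HandleWorkflowJob handles a workflow_job webhook event. Queued jobs are recorded
// in the database so the job queue consumer can spin up runners for them, in_progress
// events mark the runner as active (and break the lock on the job that spawned it),
// and completed events mark the runner as terminated and schedule it for deletion.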
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
var jobParams params.Job
var err error
var triggeredBy int64
defer func() {
// We update the job in the database regardless of whether the handler succeeded,
// and regardless of whether the job was meant for this pool. Github will send the same job data to all hierarchies
// that have been configured to work with garm. Updating the job at all levels should yield the same
// outcome in the db.
if jobParams.ID == 0 {
return
}
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
r.log("failed to get job %d: %s", jobParams.ID, err)
return
}
// This job is new to us. Check if we have a pool that can handle it.
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), jobParams.Labels)
if err != nil {
r.log("failed to find pools matching tags %s: %s; not recording job", strings.Join(jobParams.Labels, ", "), err)
return
}
if len(potentialPools) == 0 {
r.log("no pools matching tags %s; not recording job", strings.Join(jobParams.Labels, ", "))
return
}
}
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
r.log("failed to update job %d: %s", jobParams.ID, jobErr)
}
if triggeredBy != 0 && jobParams.ID != triggeredBy {
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
// transitioned to in_progress was created as a result of a different queued job. If that job is
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
// That may take a long time, so we break the lock here and allow it to be scheduled again.
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
r.log("failed to break lock for job %d: %s", triggeredBy, err)
}
}
}()
switch job.Action {
case "queued":
// Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method
// when reconciling.
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
return errors.Wrap(err, "converting job to params")
}
case "completed":
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(jobParams.RunnerName))
if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
case "in_progress":
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// This is most likely a runner we're not managing. If we define a repo from within an org
// and also define that same org, we will get a hook from github from both the repo and the org
// regarding the same workflow. We look for the runner in the database, and make sure it exists and is
// part of a pool that this manager is responsible for. A not found error here will most likely mean
// that we are not responsible for that runner, and we should ignore it.
return nil
}
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
// Set triggeredBy here so we break the lock on any potential queued job.
triggeredBy = jobIdFromLabels(instance.AditionalLabels)
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "getting pool")
}
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
r.log("error ensuring idle runners for pool %s: %s", pool.ID, err)
}
}
return nil
}
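// jobIdFromLabels returns the job ID encoded in the in_response_to_job label of a
// runner, or 0 if the label is missing or cannot be parsed.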
func jobIdFromLabels(labels []string) int64 {
for _, lbl := range labels {
if strings.HasPrefix(lbl, jobLabelPrefix) {
jobId, err := strconv.ParseInt(lbl[len(jobLabelPrefix):], 10, 64)
if err != nil {
return 0
}
return jobId
}
}
return 0
}
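// startLoopForFunction runs f on a ticker with the given interval until the daemon
// context is canceled or the pool manager is stopped. If alwaysRun is false, the
// loop backs off while the pool manager is not in a running state.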
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
r.log("starting %s loop for %s", name, r.helper.String())
ticker := time.NewTicker(interval)
r.wg.Add(1)
defer func() {
r.log("%s loop exited for pool %s", name, r.helper.String())
ticker.Stop()
r.wg.Done()
}()
for {
shouldRun := r.managerIsRunning
if alwaysRun {
shouldRun = true
}
switch shouldRun {
case true:
select {
case <-ticker.C:
if err := f(); err != nil {
r.log("error in loop %s: %q", name, err)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, err.Error())
}
}
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
default:
select {
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
default:
r.waitForTimeoutOrCanceled(common.BackoffTimer)
}
}
}
}
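// updateTools refreshes the cached runner tools (agent downloads) for this entity
// and updates the pool manager running state based on the outcome.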
func (r *basePoolManager) updateTools() error {
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
r.log("failed to update tools for repo %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, err.Error())
r.waitForTimeoutOrCanceled(common.BackoffTimer)
return fmt.Errorf("failed to update tools for repo %s: %w", r.helper.String(), err)
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
r.log("successfully updated tools")
r.setPoolRunningState(true, "")
return err
}
func controllerIDFromLabels(labels []string) string {
for _, lbl := range labels {
if strings.HasPrefix(lbl, controllerLabelPrefix) {
return lbl[len(controllerLabelPrefix):]
}
}
return ""
}
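// labelsFromRunner returns the label names attached to a github runner.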
func labelsFromRunner(runner *github.Runner) []string {
if runner == nil || runner.Labels == nil {
return []string{}
}
var labels []string
for _, val := range runner.Labels {
if val == nil {
continue
}
labels = append(labels, val.GetName())
}
return labels
}
// isManagedRunner returns true if labels indicate the runner belongs to a pool
// this manager is responsible for.
func (r *basePoolManager) isManagedRunner(labels []string) bool {
runnerControllerID := controllerIDFromLabels(labels)
return runnerControllerID == r.controllerID
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if
// garm was offline while a job was executed by a github action. When this
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
continue
}
runnerNames[*run.Name] = true
}
for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
switch commonParams.InstanceStatus(instance.Status) {
case commonParams.InstancePendingCreate,
commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete:
// this instance is in the process of being created or is awaiting deletion.
// Instances in pending_create did not get a chance to register themselves in
// github, so we let them be for now.
continue
}
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
switch instance.RunnerStatus {
case params.RunnerPending, params.RunnerInstalling:
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
// runner is still installing. We give it a chance to finish.
r.log("runner %s is still installing, give it a chance to finish", instance.Name)
continue
}
}
if time.Since(instance.UpdatedAt).Minutes() < 5 {
// instance was updated recently. We give it a chance to register itself in github.
r.log("instance %s was updated recently, skipping check", instance.Name)
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstancePendingDelete, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
return errors.Wrap(err, "updating runner")
}
}
}
return nil
}
// reapTimedOutRunners will mark as pending_delete any runner that has a status
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnersByName := map[string]*github.Runner{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
r.log("runner %s is not managed by a pool belonging to %s", *run.Name, r.helper.String())
continue
}
runnersByName[*run.Name] = run
}
for _, instance := range dbInstances {
r.log("attempting to lock instance %s", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
continue
}
// There are 3 cases (currently) where we consider a runner as timed out:
// * The runner never joined github within the pool timeout
// * The runner managed to join github, but the setup process failed later and the runner
// never started on the instance.
// * A JIT config was created, but the runner never joined github.
if runner, ok := runnersByName[instance.Name]; !ok || runner.GetStatus() == "offline" {
r.log("reaping timed-out/failed runner %s", instance.Name)
if err := r.DeleteRunner(instance, false); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
return errors.Wrap(err, "updating runner")
}
}
}
return nil
}
func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
for _, val := range instances {
if val.Name == instanceName {
return val, true
}
}
return commonParams.ProviderInstance{}, false
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !r.isManagedRunner(labelsFromRunner(runner)) {
r.log("runner %s is not managed by a pool belonging to %s", *runner.Name, r.helper.String())
continue
}
status := runner.GetStatus()
if status != "offline" {
// Runner is online. Ignore it.
continue
}
dbInstance, err := r.store.GetInstanceByName(r.ctx, *runner.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from DB")
}
// We no longer have a DB entry for this instance, and the runner appears offline in github.
// Previous forceful removal may have failed?
r.log("Runner %s has no database entry in garm, removing from github", *runner.Name)
resp, err := r.helper.RemoveGithubRunner(*runner.ID)
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
continue
}
return errors.Wrap(err, "removing runner")
}
continue
}
switch commonParams.InstanceStatus(dbInstance.Status) {
case commonParams.InstancePendingDelete, commonParams.InstanceDeleting:
// already marked for deletion or is in the process of being deleted.
// Let consolidate take care of it.
continue
case commonParams.InstancePendingCreate, commonParams.InstanceCreating:
// instance is still being created. We give it a chance to finish.
r.log("instance %s is still being created, give it a chance to finish", dbInstance.Name)
continue
case commonParams.InstanceRunning:
// this check is not strictly needed, but can help avoid unnecessary strain on the provider.
// At worst, we will have a runner that is offline in github for 5 minutes before we reap it.
if time.Since(dbInstance.UpdatedAt).Minutes() < 5 {
// instance was updated recently. We give it a chance to register itself in github.
r.log("instance %s was updated recently, skipping check", dbInstance.Name)
continue
}
}
pool, err := r.helper.GetPoolByID(dbInstance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
// check if the provider still has the instance.
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
var poolInstances []commonParams.ProviderInstance
poolInstances, ok = poolInstanceCache[pool.ID]
if !ok {
r.log("updating instances cache for pool %s", pool.ID)
poolInstances, err = provider.ListInstances(r.ctx, pool.ID)
if err != nil {
return errors.Wrapf(err, "fetching instances for pool %s", pool.ID)
}
poolInstanceCache[pool.ID] = poolInstances
}
lockAcquired := r.keyMux.TryLock(dbInstance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", dbInstance.Name)
continue
}
// See: https://golang.org/doc/faq#closures_and_goroutines
runner := runner
g.Go(func() error {
deleteMux := false
defer func() {
r.keyMux.Unlock(dbInstance.Name, deleteMux)
}()
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
if !ok {
// The runner instance is no longer on the provider, and it appears offline in github.
// It should be safe to force remove it.
r.log("Runner instance for %s is no longer on the provider, removing from github", dbInstance.Name)
resp, err := r.helper.RemoveGithubRunner(*runner.ID)
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
r.log("runner dissapeared from github")
} else {
return errors.Wrap(err, "removing runner from github")
}
}
// Remove the database entry for the runner.
r.log("Removing %s from database", dbInstance.Name)
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
deleteMux = true
return nil
}
if providerInstance.Status == commonParams.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario may require manual intervention.
// Perhaps it just came online and github did not yet change its status?
r.log("instance %s is online but github reports runner as offline", dbInstance.Name)
return nil
} else {
r.log("instance %s was found in stopped state; starting", dbInstance.Name)
// start the instance
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return errors.Wrap(err, "removing orphaned github runners")
}
return nil
}
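// waitForErrorGroupOrContextCancelled waits for the error group to finish, or for
// the pool manager context to be canceled, whichever happens first.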
func (r *basePoolManager) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
if g == nil {
return nil
}
done := make(chan error, 1)
go func() {
waitErr := g.Wait()
done <- waitErr
}()
select {
case err := <-done:
return err
case <-r.ctx.Done():
return r.ctx.Err()
}
}
func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, error) {
runner, err := r.store.GetInstanceByName(r.ctx, runnerName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
}
_, err = r.helper.GetPoolByID(runner.PoolID)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching pool")
}
return runner, nil
}
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status params.RunnerStatus) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) {
runner, err := r.fetchInstance(runnerName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
}
instance, err := r.store.UpdateInstance(r.ctx, runner.ID, update)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
func (r *basePoolManager) setInstanceStatus(runnerName string, status commonParams.InstanceStatus, providerFault []byte) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
Status: status,
ProviderFault: providerFault,
}
instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
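// AddRunner creates a new instance in the given pool. It attempts to generate a
// JIT config for the runner (falling back to a registration token if that fails)
// and records the instance in the database in pending_create state, to be picked
// up by the consolidation loops.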
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
name := fmt.Sprintf("%s-%s", pool.GetRunnerPrefix(), util.NewID())
labels := r.getLabelsForInstance(pool)
jitConfig := make(map[string]string)
var runner *github.Runner
if !provider.DisableJITConfig() {
// Attempt to create JIT config
jitConfig, runner, err = r.helper.GetJITConfig(ctx, name, pool, labels)
if err != nil {
r.log("failed to get JIT config, falling back to registration token: %s", err)
}
}
createParams := params.CreateInstanceParams{
Name: name,
Status: commonParams.InstancePendingCreate,
RunnerStatus: params.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.urls.callbackURL,
MetadataURL: r.urls.metadataURL,
CreateAttempt: 1,
GitHubRunnerGroup: pool.GitHubRunnerGroup,
AditionalLabels: aditionalLabels,
JitConfiguration: jitConfig,
}
if runner != nil {
createParams.AgentID = runner.GetID()
}
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
defer func() {
if err != nil {
if instance.ID != "" {
if err := r.DeleteRunner(instance, false); err != nil {
r.log("failed to cleanup instance: %s", instance.Name)
}
}
if runner != nil {
_, runnerCleanupErr := r.helper.RemoveGithubRunner(runner.GetID())
if runnerCleanupErr != nil {
r.log("failed to remove runner %d: %s", runner.GetID(), runnerCleanupErr)
}
}
}
}()
return nil
}
func (r *basePoolManager) Status() params.PoolManagerStatus {
r.mux.Lock()
defer r.mux.Unlock()
return params.PoolManagerStatus{
IsRunning: r.managerIsRunning,
FailureReason: r.managerErrorReason,
}
}
func (r *basePoolManager) waitForTimeoutOrCanceled(timeout time.Duration) {
r.log("sleeping for %.2f minutes", timeout.Minutes())
select {
case <-time.After(timeout):
case <-r.ctx.Done():
case <-r.quit:
}
}
func (r *basePoolManager) setPoolRunningState(isRunning bool, failureReason string) {
r.mux.Lock()
r.managerErrorReason = failureReason
r.managerIsRunning = isRunning
r.mux.Unlock()
}
func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
}
labels = append(labels, r.controllerLabel())
labels = append(labels, r.poolLabel(pool.ID))
return labels
}
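// addInstanceToProvider creates the instance in the IaaS provider configured for
// its pool, passing in the bootstrap parameters (tools, URLs, JWT token and,
// when no JIT config exists, the runner labels), then records the resulting
// provider details in the database.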
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
jwtValidity := pool.RunnerTimeout()
entity := r.helper.String()
jwtToken, err := auth.NewInstanceJWTToken(instance, r.helper.JwtToken(), entity, pool.PoolType(), jwtValidity)
if err != nil {
return errors.Wrap(err, "fetching instance jwt token")
}
hasJITConfig := len(instance.JitConfiguration) > 0
bootstrapArgs := commonParams.BootstrapInstance{
Name: instance.Name,
Tools: r.tools,
RepoURL: r.helper.GithubURL(),
MetadataURL: instance.MetadataURL,
CallbackURL: instance.CallbackURL,
InstanceToken: jwtToken,
OSArch: pool.OSArch,
OSType: pool.OSType,
Flavor: pool.Flavor,
Image: pool.Image,
ExtraSpecs: pool.ExtraSpecs,
PoolID: instance.PoolID,
CACertBundle: r.credsDetails.CABundle,
GitHubRunnerGroup: instance.GitHubRunnerGroup,
JitConfigEnabled: hasJITConfig,
}
if !hasJITConfig {
// We still need the labels here for situations where we don't have a JIT config generated.
// This can happen if GARM is used against an instance of GHES older than version 3.10.
// The labels field should be ignored by providers if JIT config is enabled.
bootstrapArgs.Labels = r.getLabelsForInstance(pool)
}
var instanceIDToDelete string
defer func() {
if instanceIDToDelete != "" {
if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
r.log("failed to cleanup instance: %s", instanceIDToDelete)
}
}
}
}()
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
instanceIDToDelete = instance.Name
return errors.Wrap(err, "creating instance")
}
if providerInstance.Status == commonParams.InstanceError {
instanceIDToDelete = instance.ProviderID
if instanceIDToDelete == "" {
instanceIDToDelete = instance.Name
}
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
}
return nil
}
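// getRunnerDetailsFromJob returns the runner name and labels for a workflow job.
// If the webhook payload does not include a runner name, it attempts to fetch it
// from the API, and it verifies that the runner belongs to a pool managed here.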
func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (params.RunnerInfo, error) {
runnerInfo := params.RunnerInfo{
Name: job.WorkflowJob.RunnerName,
Labels: job.WorkflowJob.Labels,
}
var err error
if job.WorkflowJob.RunnerName == "" {
if job.WorkflowJob.Conclusion == "skipped" || job.WorkflowJob.Conclusion == "canceled" {
// job was skipped or canceled before a runner was allocated. No point in continuing.
return params.RunnerInfo{}, fmt.Errorf("job %d was skipped or canceled before a runner was allocated: %w", job.WorkflowJob.ID, runnerErrors.ErrNotFound)
}
// Runner name was not set in WorkflowJob by github. We can still attempt to
// fetch the info we need, using the workflow run ID, from the API.
r.log("runner name not found in workflow job, attempting to fetch from API")
runnerInfo, err = r.helper.GetRunnerInfoFromWorkflow(job)
if err != nil {
return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API")
}
}
runnerDetails, err := r.store.GetInstanceByName(context.Background(), runnerInfo.Name)
if err != nil {
r.log("could not find runner details for %s", util.SanitizeLogEntry(runnerInfo.Name))
return params.RunnerInfo{}, errors.Wrap(err, "fetching runner details")
}
if _, err := r.helper.GetPoolByID(runnerDetails.PoolID); err != nil {
r.log("runner %s (pool ID: %s) does not belong to any pool we manage: %s", runnerDetails.Name, runnerDetails.PoolID, err)
return params.RunnerInfo{}, errors.Wrap(err, "fetching pool for instance")
}
return runnerInfo, nil
}
// paramsWorkflowJobToParamsJob returns a params.Job from a params.WorkflowJob, and additionally determines
// if the runner belongs to this pool or not. It will always return a valid params.Job, even if it errs out.
// This allows us to still update the job in the database, even if we determined that it wasn't necessarily meant
// for this pool.
// If garm manages multiple hierarchies (repos, org, enterprise) which involve the same repo, we will get a hook
// whenever a job involving our repo triggers a hook. So even if the job is picked up by a runner at the enterprise
// level, the repo and org still get a hook.
// We even get a hook if a particular job is picked up by a GitHub hosted runner. We don't know who will pick up the job
// until the "in_progress" event is sent and we can see which runner picked it up.
//
// We save the details of that job at every level, because we want to at least update the status of the job. We make
// decisions based on the status of saved jobs. A "queued" job will prompt garm to search for an appropriate pool
// and spin up a runner there if no other idle runner exists to pick it up.
func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) {
asUUID, err := uuid.Parse(r.ID())
if err != nil {
return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID")
}
jobParams := params.Job{
ID: job.WorkflowJob.ID,
Action: job.Action,
RunID: job.WorkflowJob.RunID,
Status: job.WorkflowJob.Status,
Conclusion: job.WorkflowJob.Conclusion,
StartedAt: job.WorkflowJob.StartedAt,
CompletedAt: job.WorkflowJob.CompletedAt,
Name: job.WorkflowJob.Name,
GithubRunnerID: job.WorkflowJob.RunnerID,
RunnerGroupID: job.WorkflowJob.RunnerGroupID,
RunnerGroupName: job.WorkflowJob.RunnerGroupName,
RepositoryName: job.Repository.Name,
RepositoryOwner: job.Repository.Owner.Login,
Labels: job.WorkflowJob.Labels,
}
runnerName := job.WorkflowJob.RunnerName
if job.Action != "queued" && runnerName == "" {
if job.WorkflowJob.Conclusion != "skipped" && job.WorkflowJob.Conclusion != "canceled" {
// Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need,
// using the workflow run ID, from the API.
// We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance
// to pick up the job, the runner name is not available from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return jobParams, errors.Wrap(err, "fetching runner details")
}
runnerName = runnerInfo.Name
}
}
jobParams.RunnerName = runnerName
switch r.helper.PoolType() {
case params.EnterprisePool:
jobParams.EnterpriseID = &asUUID
case params.RepositoryPool:
jobParams.RepoID = &asUUID
case params.OrganizationPool:
jobParams.OrgID = &asUUID
default:
return jobParams, errors.Errorf("unknown pool type: %s", r.helper.PoolType())
}
return jobParams, nil
}
func (r *basePoolManager) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
func (r *basePoolManager) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
}
func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance commonParams.ProviderInstance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSVersion,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
ProviderFault: providerInstance.ProviderFault,
}
}
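// scaleDownOnePool reaps surplus idle runners from a pool, keeping at least
// MinIdleRunners. When runners are scaled down, queued jobs older than 10 minutes
// that match this pool are considered stale and are removed from the job queue.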
func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool) error {
r.log("scaling down pool %s", pool.ID)
if !pool.Enabled {
r.log("pool %s is disabled, skipping scale down", pool.ID)
return nil
}
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to ensure minimum idle workers for pool %s: %w", pool.ID, err)
}
idleWorkers := []params.Instance{}
for _, inst := range existingInstances {
// Idle runners that have been spawned and are still idle after the grace period below are taken
// into consideration for scale-down. The grace period prevents a situation where a
// "queued" workflow triggers the creation of a new idle runner, and this routine reaps
// an idle runner before it has a chance to pick up a job.
if inst.RunnerStatus == params.RunnerIdle && inst.Status == commonParams.InstanceRunning && time.Since(inst.UpdatedAt).Minutes() > 2 {
idleWorkers = append(idleWorkers, inst)
}
}
if len(idleWorkers) == 0 {
return nil
}
surplus := float64(len(idleWorkers) - int(pool.MinIdleRunners))
if surplus <= 0 {
return nil
}
scaleDownFactor := 0.5 // could be configurable
numScaleDown := int(math.Ceil(surplus * scaleDownFactor))
if numScaleDown <= 0 || numScaleDown > len(idleWorkers) {
return fmt.Errorf("invalid number of instances to scale down: %v, check your scaleDownFactor: %v", numScaleDown, scaleDownFactor)
}
g, _ := errgroup.WithContext(ctx)
for _, instanceToDelete := range idleWorkers[:numScaleDown] {
instanceToDelete := instanceToDelete
lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instanceToDelete.Name)
continue
}
defer r.keyMux.Unlock(instanceToDelete.Name, false)
g.Go(func() error {
r.log("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
if err := r.DeleteRunner(instanceToDelete, false); err != nil {
return fmt.Errorf("failed to delete instance %s: %w", instanceToDelete.ID, err)
}
return nil
})
}
if numScaleDown > 0 {
// We just scaled down a runner for this pool. That means that if we have jobs that are
// still queued in our DB, and those jobs should match this pool but have not been picked
// up by a runner, they are most likely stale and can be removed. For now, we can simply
// remove jobs older than 10 minutes.
//
// TODO: should probably allow additional filters to list functions. Would help to filter by date
// instead of returning a bunch of results and filtering manually.
queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.helper.PoolType(), r.helper.ID(), params.JobStatusQueued)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "listing queued jobs")
}
for _, job := range queued {
if time.Since(job.CreatedAt).Minutes() > 10 && pool.HasRequiredLabels(job.Labels) {
if err := r.store.DeleteJob(ctx, job.ID); err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
r.log("failed to delete job %d: %s", job.ID, err)
}
}
}
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to scale down pool %s: %w", pool.ID, err)
}
return nil
}
func (r *basePoolManager) addRunnerToPool(pool params.Pool, aditionalLabels []string) error {
if !pool.Enabled {
return fmt.Errorf("pool %s is disabled", pool.ID)
}
poolInstanceCount, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to list pool instances: %w", err)
}
if poolInstanceCount >= int64(pool.MaxRunners) {
return fmt.Errorf("max workers (%d) reached for pool %s", pool.MaxRunners, pool.ID)
}
if err := r.AddRunner(r.ctx, pool.ID, aditionalLabels); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %s", pool.ID, err)
}
return nil
}
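// ensureIdleRunnersForOnePool adds runners to a pool until it has at least
// MinIdleRunners idle or pending workers, without exceeding MaxRunners.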
func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
if !pool.Enabled || pool.MinIdleRunners == 0 {
return nil
}
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to ensure minimum idle workers for pool %s: %w", pool.ID, err)
}
if uint(len(existingInstances)) >= pool.MaxRunners {
r.log("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
return nil
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if inst.RunnerStatus != params.RunnerActive && inst.RunnerStatus != params.RunnerTerminated {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
delta := projectedInstanceCount - int(pool.MaxRunners)
required = required - delta
}
}
for i := 0; i < required; i++ {
r.log("adding new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID, nil); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err)
}
}
return nil
}
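// retryFailedInstancesForOnePool removes instances in error state from the
// provider and requeues them for creation, as long as they have not exceeded
// maxCreateAttempts.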
func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, pool params.Pool) error {
if !pool.Enabled {
return nil
}
r.log("running retry failed instances for pool %s", pool.ID)
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to list instances for pool %s: %w", pool.ID, err)
}
g, errCtx := errgroup.WithContext(ctx)
for _, instance := range existingInstances {
if instance.Status != commonParams.InstanceError {
continue
}
if instance.CreateAttempt >= maxCreateAttempts {
continue
}
r.log("attempting to retry failed instance %s", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instance.Name)
continue
}
instance := instance
g.Go(func() error {
defer r.keyMux.Unlock(instance.Name, false)
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
if err := r.deleteInstanceFromProvider(errCtx, instance); err != nil {
r.log("failed to delete instance %s from provider: %s", instance.Name, err)
// Bail here, otherwise we risk creating multiple failing instances, and losing track
// of them. If CreateInstance failed to return a proper provider ID, we rely on the
// name to delete the instance. If we don't bail here and end up with multiple
// instances with the same name, using the name to clean up failed instances will fail
// on any subsequent call, unless the external or native provider takes into account
// non-unique names and loops over all of them, which is extremely hacky and
// something we would rather avoid.
return err
}
// TODO(gabriel-samfira): Incrementing CreateAttempt should be done within a transaction.
// It's fairly safe to do here (for now), as there should be no other code path that updates
// an instance in this state.
var tokenFetched bool = len(instance.JitConfiguration) > 0
updateParams := params.UpdateInstanceParams{
CreateAttempt: instance.CreateAttempt + 1,
TokenFetched: &tokenFetched,
Status: commonParams.InstancePendingCreate,
RunnerStatus: params.RunnerPending,
}
r.log("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if _, err := r.updateInstance(instance.Name, updateParams); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to retry failed instances for pool %s: %w", pool.ID, err)
}
return nil
}
func (r *basePoolManager) retryFailedInstances() error {
pools, err := r.helper.ListPools()
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
g, ctx := errgroup.WithContext(r.ctx)
for _, pool := range pools {
pool := pool
g.Go(func() error {
if err := r.retryFailedInstancesForOnePool(ctx, pool); err != nil {
return fmt.Errorf("retrying failed instances for pool %s: %w", pool.ID, err)
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("retrying failed instances: %w", err)
}
return nil
}
func (r *basePoolManager) scaleDown() error {
pools, err := r.helper.ListPools()
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
g, ctx := errgroup.WithContext(r.ctx)
for _, pool := range pools {
pool := pool
g.Go(func() error {
r.log("running scale down for pool %s", pool.ID)
return r.scaleDownOnePool(ctx, pool)
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to scale down: %w", err)
}
return nil
}
func (r *basePoolManager) ensureMinIdleRunners() error {
pools, err := r.helper.ListPools()
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
g, _ := errgroup.WithContext(r.ctx)
for _, pool := range pools {
pool := pool
g.Go(func() error {
return r.ensureIdleRunnersForOnePool(pool)
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to ensure minimum idle workers: %w", err)
}
return nil
}
func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
identifier := instance.ProviderID
if identifier == "" {
// provider did not return a provider ID?
// try with name
identifier = instance.Name
}
if err := provider.DeleteInstance(ctx, identifier); err != nil {
return errors.Wrap(err, "removing instance")
}
return nil
}
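// deletePendingInstances removes from the provider, and then from the database,
// any instance marked as pending_delete or pending_force_delete.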
func (r *basePoolManager) deletePendingInstances() error {
instances, err := r.helper.FetchDbInstances()
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
r.log("removing instances in pending_delete")
for _, instance := range instances {
if instance.Status != commonParams.InstancePendingDelete && instance.Status != commonParams.InstancePendingForceDelete {
// not in pending_delete status. Skip.
continue
}
r.log("removing instance %s in pool %s", instance.Name, instance.PoolID)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instance.Name)
continue
}
currentStatus := instance.Status
// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
r.log("failed to update runner %s status: %q", instance.Name, err)
r.keyMux.Unlock(instance.Name, false)
continue
}
go func(instance params.Instance) (err error) {
deleteMux := false
defer func() {
r.keyMux.Unlock(instance.Name, deleteMux)
}()
defer func(instance params.Instance) {
if err != nil {
r.log("failed to remove instance %s: %s", instance.Name, err)
// failed to remove from provider. Set status to previous value, which will retry
// the operation.
if _, err := r.setInstanceStatus(instance.Name, currentStatus, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
}
}(instance)
r.log("removing instance %s from provider", instance.Name)
err = r.deleteInstanceFromProvider(r.ctx, instance)
if err != nil {
if currentStatus != commonParams.InstancePendingForceDelete {
return fmt.Errorf("failed to remove instance from provider: %w", err)
}
log.Printf("failed to remove instance %s from provider (continuing anyway): %s", instance.Name, err)
}
r.log("removing instance %s from database", instance.Name)
if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
}
deleteMux = true
r.log("instance %s was successfully removed", instance.Name)
return nil
}(instance) //nolint
}
return nil
}
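// addPendingInstances creates in the provider any instance that is still in
// pending_create state, transitioning it to creating before launching the
// create operation.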
func (r *basePoolManager) addPendingInstances() error {
// TODO: filter instances by status.
instances, err := r.helper.FetchDbInstances()
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
for _, instance := range instances {
if instance.Status != commonParams.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
r.log("attempting to acquire lock for instance %s (create)", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
r.log("failed to acquire lock for instance %s", instance.Name)
continue
}
// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
// won't attempt to create the runner a second time.
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceCreating, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
r.keyMux.Unlock(instance.Name, false)
// We failed to transition the instance to Creating, so skip it for now. Garm will retry creating
// this instance when the loop runs again; proceeding without the status change could leave us with multiple instances.
continue
}
go func(instance params.Instance) {
defer r.keyMux.Unlock(instance.Name, false)
r.log("creating instance %s in pool %s", instance.Name, instance.PoolID)
if err := r.addInstanceToProvider(instance); err != nil {
r.log("failed to add instance to provider: %s", err)
errAsBytes := []byte(err.Error())
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceError, errAsBytes); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
r.log("failed to create instance in provider: %s", err)
}
}(instance)
}
return nil
}
func (r *basePoolManager) Wait() error {
done := make(chan struct{})
go func() {
r.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(60 * time.Second):
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
}
return nil
}
func (r *basePoolManager) runnerCleanup() (err error) {
r.log("running runner cleanup")
runners, err := r.helper.GetGithubRunners()
if err != nil {
return fmt.Errorf("failed to fetch github runners: %w", err)
}
if err := r.reapTimedOutRunners(runners); err != nil {
return fmt.Errorf("failed to reap timed out runners: %w", err)
}
if err := r.cleanupOrphanedRunners(runners); err != nil {
return fmt.Errorf("failed to cleanup orphaned runners: %w", err)
}
return nil
}
func (r *basePoolManager) cleanupOrphanedRunners(runners []*github.Runner) error {
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned instances")
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned github runners")
}
return nil
}
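// Start performs an initial tools update and then launches the background loops
// of the pool manager (cleanup, scale down, consolidation, tools update and job
// queue consumption).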
func (r *basePoolManager) Start() error {
initialToolUpdate := make(chan struct{}, 1)
go func() {
r.updateTools() //nolint
initialToolUpdate <- struct{}{}
}()
go func() {
<-initialToolUpdate
defer close(initialToolUpdate)
go r.startLoopForFunction(r.runnerCleanup, common.PoolReapTimeoutInterval, "timeout_reaper", false)
go r.startLoopForFunction(r.scaleDown, common.PoolScaleDownInterval, "scale_down", false)
go r.startLoopForFunction(r.deletePendingInstances, common.PoolConsilitationInterval, "consolidate[delete_pending]", false)
go r.startLoopForFunction(r.addPendingInstances, common.PoolConsilitationInterval, "consolidate[add_pending]", false)
go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]", false)
go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]", false)
go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools", true)
go r.startLoopForFunction(r.consumeQueuedJobs, common.PoolConsilitationInterval, "job_queue_consumer", false)
}()
return nil
}
func (r *basePoolManager) Stop() error {
close(r.quit)
return nil
}
func (r *basePoolManager) RefreshState(param params.UpdatePoolStateParams) error {
return r.helper.UpdateState(param)
}
func (r *basePoolManager) WebhookSecret() string {
return r.helper.WebhookSecret()
}
func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) {
return r.helper.GetGithubRegistrationToken()
}
func (r *basePoolManager) ID() string {
return r.helper.ID()
}
// ForceDeleteRunner will delete a runner from a pool.
//
// Deprecated: Use DeleteRunner instead.
func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error {
return r.DeleteRunner(runner, true)
}
// Delete runner will delete a runner from a pool. If forceRemove is set to true, any error received from
// the IaaS provider will be ignored and deletion will continue.
func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove bool) error {
if !r.managerIsRunning {
return runnerErrors.NewConflictError("pool manager is not running for %s", r.helper.String())
}
if runner.AgentID != 0 {
resp, err := r.helper.RemoveGithubRunner(runner.AgentID)
if err != nil {
if resp != nil {
switch resp.StatusCode {
case http.StatusUnprocessableEntity:
return errors.Wrapf(runnerErrors.ErrBadRequest, "removing runner: %q", err)
case http.StatusNotFound:
// Runner may have been deleted by a finished job, or manually by the user.
r.log("runner with agent id %d was not found in github", runner.AgentID)
case http.StatusUnauthorized:
// Mark the pool as offline from this point forward
failureReason := fmt.Sprintf("failed to remove runner: %q", err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
// evaluate the next switch case.
fallthrough
default:
return errors.Wrap(err, "removing runner")
}
} else {
// We got a nil response. Assume we are in error.
return errors.Wrap(err, "removing runner")
}
}
}
instanceStatus := commonParams.InstancePendingDelete
if forceRemove {
instanceStatus = commonParams.InstancePendingForceDelete
}
r.log("setting instance status for %v to %v", runner.Name, instanceStatus)
if _, err := r.setInstanceStatus(runner.Name, instanceStatus, nil); err != nil {
r.log("failed to update runner %s status: %s", runner.Name, err)
return errors.Wrap(err, "updating runner")
}
return nil
}
// consumeQueuedJobs will pull all the known jobs from the database and attempt to create a new
// runner in one of the pools it manages, if it matches the requested labels.
// This is a best effort attempt to consume queued jobs. We do not have any real way to know which
// runner from which pool will pick up a job we react to here. For example, the same job may be received
// by an enterprise manager, an org manager AND a repo manager. If an idle runner from another pool
// picks up the job after we created a runner in this pool, we will have an extra runner that may or may not
// have a job waiting for it.
// This is not a huge problem, as we have scale down logic which should remove any idle runners that have not
// picked up a job within a certain time frame. Also, the logic here should ensure that eventually, all known
// queued jobs will be consumed sooner or later.
//
// NOTE: jobs that were created while the garm instance was down, will be unknown to garm itself and will linger
// in queued state if the pools defined in garm have a minimum idle runner value set to 0. Simply put, garm won't
// know about the queued jobs that we didn't get a webhook for. Listing all jobs on startup is not feasible, as
// an enterprise may have thousands of repos and thousands of jobs in queued state. To fetch all jobs for an
// enterprise, we'd have to list all repos, and for each repo list all jobs currently in queued state. This is
// not desirable by any measure.
//
// One way to handle situations where garm comes up after a longer period of time, is to temporarily max out the
// min-idle-runner setting on pools, or at least raise it above 0. The idle runners will start to consume jobs, and
// as they do so, new idle runners will be spun up in their stead. New jobs will be recorded in the DB as they come in,
// so those will trigger the creation of a runner. The jobs we don't know about will be dealt with by the idle runners.
// Once jobs are consumed, you can set min-idle-runners to 0 again.
func (r *basePoolManager) consumeQueuedJobs() error {
queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.helper.PoolType(), r.helper.ID(), params.JobStatusQueued)
if err != nil {
return errors.Wrap(err, "listing queued jobs")
}
poolsCache := poolsForTags{}
r.log("found %d queued jobs for %s", len(queued), r.helper.String())
for _, job := range queued {
if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() {
// Job was handled by us or another entity.
r.log("job %d is locked by %s", job.ID, job.LockedBy.String())
continue
}
if time.Since(job.UpdatedAt) < time.Second*30 {
// give the idle runners a chance to pick up the job.
r.log("job %d was updated less than 30 seconds ago. Skipping", job.ID)
continue
}
if time.Since(job.UpdatedAt) >= time.Minute*10 {
// Job is still queued in our db, 10 minutes after a matching runner
// was spawned. Unlock it and try again. A different job may have picked up
// the runner.
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
// TODO: Implement a cache? Should we return here?
r.log("failed to unlock job %d: %q", job.ID, err)
continue
}
}
if job.LockedBy.String() == r.ID() {
// Job is locked by us. We must have already attempted to create a runner for it. Skip.
// TODO(gabriel-samfira): create an in-memory state of existing runners that we can easily
// check for existing pending or idle runners. If we can't find any, attempt to allocate another
// runner.
r.log("job %d is locked by us", job.ID)
continue
}
poolRR, ok := poolsCache.Get(job.Labels)
if !ok {
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels)
if err != nil {
r.log("error finding pools matching labels: %s", err)
continue
}
poolRR = poolsCache.Add(job.Labels, potentialPools)
}
if poolRR.Len() == 0 {
r.log("could not find pools with labels %s", strings.Join(job.Labels, ","))
continue
}
runnerCreated := false
if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil {
r.log("could not lock job %d: %s", job.ID, err)
continue
}
jobLabels := []string{
fmt.Sprintf("%s%d", jobLabelPrefix, job.ID),
}
for i := 0; i < poolRR.Len(); i++ {
pool, err := poolRR.Next()
if err != nil {
r.log("could not find a pool to create a runner for job %d: %s", job.ID, err)
break
}
r.log("attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
r.log("[PoolRR] could not add runner to pool %s: %s", pool.ID, err)
continue
}
r.log("a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID)
runnerCreated = true
break
}
if !runnerCreated {
r.log("could not create a runner for job %d; unlocking", job.ID)
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
r.log("failed to unlock job: %d", job.ID)
return errors.Wrap(err, "unlocking job")
}
}
}
if err := r.store.DeleteCompletedJobs(r.ctx); err != nil {
r.log("failed to delete completed jobs: %q", err)
}
return nil
}
func (r *basePoolManager) InstallWebhook(ctx context.Context, param params.InstallWebhookParams) (params.HookInfo, error) {
if r.urls.controllerWebhookURL == "" {
return params.HookInfo{}, errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
}
insecureSSL := "0"
if param.InsecureSSL {
insecureSSL = "1"
}
req := &github.Hook{
Active: github.Bool(true),
Config: map[string]interface{}{
"url": r.urls.controllerWebhookURL,
"content_type": "json",
"insecure_ssl": insecureSSL,
"secret": r.WebhookSecret(),
},
Events: []string{
"workflow_job",
},
}
return r.helper.InstallHook(ctx, req)
}
func (r *basePoolManager) UninstallWebhook(ctx context.Context) error {
if r.urls.controllerWebhookURL == "" {
return errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
}
return r.helper.UninstallHook(ctx, r.urls.controllerWebhookURL)
}
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
return r.helper.GetHookInfo(ctx)
}
func (r *basePoolManager) RootCABundle() (params.CertificateBundle, error) {
return r.credsDetails.RootCertificateBundle()
}