Merge pull request #242 from gabriel-samfira/some-cleanup

Remove some code, move some code around
This commit is contained in:
Gabriel 2024-04-01 17:57:06 +03:00 committed by GitHub
commit 069bdd8b6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 96 additions and 119 deletions

29
runner/pool/locking.go Normal file
View file

@ -0,0 +1,29 @@
package pool
import "sync"
type keyMutex struct {
muxes sync.Map
}
func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}
func (k *keyMutex) Unlock(key string, remove bool) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
if remove {
k.Delete(key)
}
keyMux.Unlock()
}
func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}

View file

@ -62,32 +62,6 @@ const (
maxCreateAttempts = 5
)
type keyMutex struct {
muxes sync.Map
}
func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}
func (k *keyMutex) Unlock(key string, remove bool) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
if remove {
k.Delete(key)
}
keyMux.Unlock()
}
func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}
type urls struct {
callbackURL string
metadataURL string
@ -288,7 +262,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "getting pool")
}
@ -386,37 +360,6 @@ func (r *basePoolManager) updateTools() error {
return err
}
func controllerIDFromLabels(labels []string) string {
for _, lbl := range labels {
if strings.HasPrefix(lbl, controllerLabelPrefix) {
return lbl[len(controllerLabelPrefix):]
}
}
return ""
}
func labelsFromRunner(runner *github.Runner) []string {
if runner == nil || runner.Labels == nil {
return []string{}
}
var labels []string
for _, val := range runner.Labels {
if val == nil {
continue
}
labels = append(labels, val.GetName())
}
return labels
}
// isManagedRunner returns true if labels indicate the runner belongs to a pool
// this manager is responsible for.
func (r *basePoolManager) isManagedRunner(labels []string) bool {
runnerControllerID := controllerIDFromLabels(labels)
return runnerControllerID == r.controllerID
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if the
@ -425,14 +368,14 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool {
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
dbInstances, err := r.FetchDbInstances()
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
@ -460,7 +403,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
continue
}
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
@ -501,14 +444,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
dbInstances, err := r.FetchDbInstances()
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnersByName := map[string]*github.Runner{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
@ -530,7 +473,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
}
defer r.keyMux.Unlock(instance.Name, false)
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
@ -558,15 +501,6 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
return nil
}
func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
for _, val := range instances {
if val.Name == instanceName {
return val, true
}
}
return commonParams.ProviderInstance{}, false
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
@ -575,7 +509,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !r.isManagedRunner(labelsFromRunner(runner)) {
if !isManagedRunner(labelsFromRunner(runner), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", runner.GetName())
@ -598,7 +532,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
slog.InfoContext(
r.ctx, "Runner has no database entry in garm, removing from github",
"runner_name", runner.GetName())
resp, err := r.RemoveGithubRunner(*runner.ID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
@ -632,7 +566,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
}
}
pool, err := r.GetPoolByID(dbInstance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
@ -678,7 +612,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
slog.InfoContext(
r.ctx, "Runner instance is no longer on the provider, removing from github",
"runner_name", dbInstance.Name)
resp, err := r.RemoveGithubRunner(*runner.ID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
@ -749,16 +683,7 @@ func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status para
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) {
instance, err := r.store.UpdateInstance(r.ctx, runnerName, update)
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
@ -771,7 +696,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara
ProviderFault: providerFault,
}
instance, err := r.updateInstance(runnerName, updateParams)
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
@ -779,7 +704,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara
}
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
pool, err := r.GetPoolByID(poolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
@ -838,7 +763,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona
}
if runner != nil {
_, runnerCleanupErr := r.RemoveGithubRunner(runner.GetID())
_, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext(
ctx, "failed to remove runner",
@ -888,7 +813,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
}
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
@ -1332,7 +1257,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
ctx, "queueing previously failed instance for retry",
"runner_name", instance.Name)
// Set instance to pending create and wait for retry.
if _, err := r.updateInstance(instance.Name, updateParams); err != nil {
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateParams); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to update runner status",
"runner_name", instance.Name)
@ -1347,7 +1272,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
}
func (r *basePoolManager) retryFailedInstances() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
@ -1370,7 +1295,7 @@ func (r *basePoolManager) retryFailedInstances() error {
}
func (r *basePoolManager) scaleDown() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
@ -1391,7 +1316,7 @@ func (r *basePoolManager) scaleDown() error {
}
func (r *basePoolManager) ensureMinIdleRunners() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
@ -1411,7 +1336,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error {
}
func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
@ -1441,7 +1366,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan
}
func (r *basePoolManager) deletePendingInstances() error {
instances, err := r.FetchDbInstances()
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
@ -1529,7 +1454,7 @@ func (r *basePoolManager) deletePendingInstances() error {
func (r *basePoolManager) addPendingInstances() error {
// nolint:golangci-lint,godox
// TODO: filter instances by status.
instances, err := r.FetchDbInstances()
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
@ -1717,7 +1642,7 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa
return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String())
}
if runner.AgentID != 0 {
resp, err := r.RemoveGithubRunner(runner.AgentID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID)
if err != nil {
if resp != nil {
switch resp.StatusCode {
@ -2083,14 +2008,6 @@ func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) {
return *tk.Token, nil
}
func (r *basePoolManager) RemoveGithubRunner(runnerID int64) (*github.Response, error) {
ghResp, err := r.ghcli.RemoveEntityRunner(r.ctx, runnerID)
if err != nil {
return nil, fmt.Errorf("removing runner: %w", err)
}
return ghResp, nil
}
func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload, error) {
tools, ghResp, err := r.ghcli.ListEntityRunnerApplicationDownloads(r.ctx)
if err != nil {
@ -2153,18 +2070,6 @@ func (r *basePoolManager) GithubURL() string {
return ""
}
func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) {
return r.store.ListEntityInstances(r.ctx, r.entity)
}
func (r *basePoolManager) ListPools() ([]params.Pool, error) {
return r.store.ListEntityPools(r.ctx, r.entity)
}
func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) {
return r.store.GetEntityPool(r.ctx, r.entity, poolID)
}
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
allHooks, err := r.listHooks(ctx)
if err != nil {

View file

@ -6,7 +6,10 @@ import (
"sync"
"sync/atomic"
"github.com/google/go-github/v57/github"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm/params"
)
@ -73,3 +76,43 @@ func (p *poolsForTags) Add(tags []string, pools []params.Pool) poolCacheStore {
v, _ := p.pools.LoadOrStore(key, poolRR)
return v.(*poolRoundRobin)
}
func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
for _, val := range instances {
if val.Name == instanceName {
return val, true
}
}
return commonParams.ProviderInstance{}, false
}
func controllerIDFromLabels(labels []string) string {
for _, lbl := range labels {
if strings.HasPrefix(lbl, controllerLabelPrefix) {
return lbl[len(controllerLabelPrefix):]
}
}
return ""
}
func labelsFromRunner(runner *github.Runner) []string {
if runner == nil || runner.Labels == nil {
return []string{}
}
var labels []string
for _, val := range runner.Labels {
if val == nil {
continue
}
labels = append(labels, val.GetName())
}
return labels
}
// isManagedRunner returns true if labels indicate the runner belongs to a pool
// this manager is responsible for.
func isManagedRunner(labels []string, controllerID string) bool {
runnerControllerID := controllerIDFromLabels(labels)
return runnerControllerID == controllerID
}