Dedupe more code
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
parent
b550d0c5b9
commit
fa75ecfa8e
6 changed files with 110 additions and 341 deletions
|
|
@ -94,6 +94,40 @@ type urls struct {
|
|||
webhookURL string
|
||||
controllerWebhookURL string
|
||||
}
|
||||
|
||||
func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
|
||||
ctx = garmUtil.WithContext(ctx, slog.Any("pool_mgr", entity), slog.Any("pool_type", params.GithubEntityTypeRepository))
|
||||
ghc, err := garmUtil.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting github client")
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
keyMuxes := &keyMutex{}
|
||||
|
||||
repo := &basePoolManager{
|
||||
ctx: ctx,
|
||||
cfgInternal: cfgInternal,
|
||||
entity: entity,
|
||||
ghcli: ghc,
|
||||
|
||||
store: store,
|
||||
providers: providers,
|
||||
controllerID: cfgInternal.ControllerID,
|
||||
urls: urls{
|
||||
webhookURL: cfgInternal.BaseWebhookURL,
|
||||
callbackURL: cfgInternal.InstanceCallbackURL,
|
||||
metadataURL: cfgInternal.InstanceMetadataURL,
|
||||
controllerWebhookURL: cfgInternal.ControllerWebhookURL,
|
||||
},
|
||||
quit: make(chan struct{}),
|
||||
credsDetails: cfgInternal.GithubCredentialsDetails,
|
||||
wg: wg,
|
||||
keyMux: keyMuxes,
|
||||
}
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
type basePoolManager struct {
|
||||
ctx context.Context
|
||||
controllerID string
|
||||
|
|
@ -107,7 +141,6 @@ type basePoolManager struct {
|
|||
tools []commonParams.RunnerApplicationDownload
|
||||
quit chan struct{}
|
||||
|
||||
helper poolHelper
|
||||
credsDetails params.GithubCredentials
|
||||
|
||||
managerIsRunning bool
|
||||
|
|
@ -251,7 +284,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
|
|||
|
||||
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
|
||||
// a minimum number of idle runners configured.
|
||||
pool, err := r.helper.GetPoolByID(instance.PoolID)
|
||||
pool, err := r.GetPoolByID(instance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting pool")
|
||||
}
|
||||
|
|
@ -388,7 +421,7 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool {
|
|||
// If we were offline and did not process the webhook, the instance will linger.
|
||||
// We need to remove it from the provider and database.
|
||||
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
|
||||
dbInstances, err := r.helper.FetchDbInstances()
|
||||
dbInstances, err := r.FetchDbInstances()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching instances from db")
|
||||
}
|
||||
|
|
@ -423,7 +456,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
|
|||
continue
|
||||
}
|
||||
|
||||
pool, err := r.helper.GetPoolByID(instance.PoolID)
|
||||
pool, err := r.GetPoolByID(instance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching instance pool info")
|
||||
}
|
||||
|
|
@ -464,7 +497,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
|
|||
// of "running" in the provider, but that has not registered with Github, and has
|
||||
// received no new updates in the configured timeout interval.
|
||||
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
||||
dbInstances, err := r.helper.FetchDbInstances()
|
||||
dbInstances, err := r.FetchDbInstances()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching instances from db")
|
||||
}
|
||||
|
|
@ -493,7 +526,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
|
|||
}
|
||||
defer r.keyMux.Unlock(instance.Name, false)
|
||||
|
||||
pool, err := r.helper.GetPoolByID(instance.PoolID)
|
||||
pool, err := r.GetPoolByID(instance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching instance pool info")
|
||||
}
|
||||
|
|
@ -595,7 +628,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
|
|||
}
|
||||
}
|
||||
|
||||
pool, err := r.helper.GetPoolByID(dbInstance.PoolID)
|
||||
pool, err := r.GetPoolByID(dbInstance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching pool")
|
||||
}
|
||||
|
|
@ -714,7 +747,7 @@ func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, err
|
|||
return params.Instance{}, errors.Wrap(err, "fetching instance")
|
||||
}
|
||||
|
||||
_, err = r.helper.GetPoolByID(runner.PoolID)
|
||||
_, err = r.GetPoolByID(runner.PoolID)
|
||||
if err != nil {
|
||||
return params.Instance{}, errors.Wrap(err, "fetching pool")
|
||||
}
|
||||
|
|
@ -761,7 +794,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara
|
|||
}
|
||||
|
||||
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
|
||||
pool, err := r.helper.GetPoolByID(poolID)
|
||||
pool, err := r.GetPoolByID(poolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching pool")
|
||||
}
|
||||
|
|
@ -870,7 +903,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
|
|||
}
|
||||
|
||||
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
|
||||
pool, err := r.helper.GetPoolByID(instance.PoolID)
|
||||
pool, err := r.GetPoolByID(instance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching pool")
|
||||
}
|
||||
|
|
@ -1329,7 +1362,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
|
|||
}
|
||||
|
||||
func (r *basePoolManager) retryFailedInstances() error {
|
||||
pools, err := r.helper.ListPools()
|
||||
pools, err := r.ListPools()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing pools: %w", err)
|
||||
}
|
||||
|
|
@ -1352,7 +1385,7 @@ func (r *basePoolManager) retryFailedInstances() error {
|
|||
}
|
||||
|
||||
func (r *basePoolManager) scaleDown() error {
|
||||
pools, err := r.helper.ListPools()
|
||||
pools, err := r.ListPools()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing pools: %w", err)
|
||||
}
|
||||
|
|
@ -1373,7 +1406,7 @@ func (r *basePoolManager) scaleDown() error {
|
|||
}
|
||||
|
||||
func (r *basePoolManager) ensureMinIdleRunners() error {
|
||||
pools, err := r.helper.ListPools()
|
||||
pools, err := r.ListPools()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing pools: %w", err)
|
||||
}
|
||||
|
|
@ -1393,7 +1426,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error {
|
|||
}
|
||||
|
||||
func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
|
||||
pool, err := r.helper.GetPoolByID(instance.PoolID)
|
||||
pool, err := r.GetPoolByID(instance.PoolID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching pool")
|
||||
}
|
||||
|
|
@ -1423,7 +1456,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan
|
|||
}
|
||||
|
||||
func (r *basePoolManager) deletePendingInstances() error {
|
||||
instances, err := r.helper.FetchDbInstances()
|
||||
instances, err := r.FetchDbInstances()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch instances from store: %w", err)
|
||||
}
|
||||
|
|
@ -1511,7 +1544,7 @@ func (r *basePoolManager) deletePendingInstances() error {
|
|||
func (r *basePoolManager) addPendingInstances() error {
|
||||
// nolint:golangci-lint,godox
|
||||
// TODO: filter instances by status.
|
||||
instances, err := r.helper.FetchDbInstances()
|
||||
instances, err := r.FetchDbInstances()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch instances from store: %w", err)
|
||||
}
|
||||
|
|
@ -2114,6 +2147,44 @@ func (r *basePoolManager) GithubURL() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) {
|
||||
switch r.entity.EntityType {
|
||||
case params.GithubEntityTypeRepository:
|
||||
return r.store.ListRepoInstances(r.ctx, r.entity.ID)
|
||||
case params.GithubEntityTypeOrganization:
|
||||
return r.store.ListOrgInstances(r.ctx, r.entity.ID)
|
||||
case params.GithubEntityTypeEnterprise:
|
||||
return r.store.ListEnterpriseInstances(r.ctx, r.entity.ID)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown entity type: %s", r.entity.EntityType)
|
||||
}
|
||||
|
||||
func (r *basePoolManager) ListPools() ([]params.Pool, error) {
|
||||
switch r.entity.EntityType {
|
||||
case params.GithubEntityTypeRepository:
|
||||
return r.store.ListRepoPools(r.ctx, r.entity.ID)
|
||||
case params.GithubEntityTypeOrganization:
|
||||
return r.store.ListOrgPools(r.ctx, r.entity.ID)
|
||||
case params.GithubEntityTypeEnterprise:
|
||||
return r.store.ListEnterprisePools(r.ctx, r.entity.ID)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown entity type: %s", r.entity.EntityType)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) {
|
||||
switch r.entity.EntityType {
|
||||
case params.GithubEntityTypeRepository:
|
||||
return r.store.GetRepositoryPool(r.ctx, r.entity.ID, poolID)
|
||||
case params.GithubEntityTypeOrganization:
|
||||
return r.store.GetOrganizationPool(r.ctx, r.entity.ID, poolID)
|
||||
case params.GithubEntityTypeEnterprise:
|
||||
return r.store.GetEnterprisePool(r.ctx, r.entity.ID, poolID)
|
||||
default:
|
||||
return params.Pool{}, fmt.Errorf("unknown entity type: %s", r.entity.EntityType)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *basePoolManager) UninstallWebhook(ctx context.Context) error {
|
||||
if r.urls.controllerWebhookURL == "" {
|
||||
return errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue