diff --git a/runner/pool/enterprise.go b/runner/pool/enterprise.go deleted file mode 100644 index 6e2a9b85..00000000 --- a/runner/pool/enterprise.go +++ /dev/null @@ -1,88 +0,0 @@ -package pool - -import ( - "context" - "log/slog" - "sync" - - "github.com/pkg/errors" - - dbCommon "github.com/cloudbase/garm/database/common" - "github.com/cloudbase/garm/params" - "github.com/cloudbase/garm/runner/common" - "github.com/cloudbase/garm/util" -) - -// test that we implement PoolManager -var _ poolHelper = &enterprise{} - -func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { - ctx = util.WithContext(ctx, slog.Any("pool_mgr", cfg.Name), slog.Any("pool_type", params.GithubEntityTypeEnterprise)) - entity := params.GithubEntity{ - Owner: cfg.Name, - ID: cfg.ID, - WebhookSecret: cfg.WebhookSecret, - EntityType: params.GithubEntityTypeEnterprise, - } - ghc, err := util.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails) - if err != nil { - return nil, errors.Wrap(err, "getting github client") - } - - wg := &sync.WaitGroup{} - keyMuxes := &keyMutex{} - - helper := &enterprise{ - ctx: ctx, - id: cfg.ID, - store: store, - } - - repo := &basePoolManager{ - ctx: ctx, - cfgInternal: cfgInternal, - ghcli: ghc, - entity: entity, - store: store, - providers: providers, - controllerID: cfgInternal.ControllerID, - urls: urls{ - webhookURL: cfgInternal.BaseWebhookURL, - callbackURL: cfgInternal.InstanceCallbackURL, - metadataURL: cfgInternal.InstanceMetadataURL, - controllerWebhookURL: cfgInternal.ControllerWebhookURL, - }, - quit: make(chan struct{}), - helper: helper, - credsDetails: cfgInternal.GithubCredentialsDetails, - wg: wg, - keyMux: keyMuxes, - } - return repo, nil -} - -type enterprise struct { - ctx context.Context - id string - store dbCommon.Store -} - -func (e *enterprise) FetchDbInstances() ([]params.Instance, error) { - return e.store.ListEnterpriseInstances(e.ctx, e.id) -} - -func (e *enterprise) ListPools() ([]params.Pool, error) { - pools, err := e.store.ListEnterprisePools(e.ctx, e.id) - if err != nil { - return nil, errors.Wrap(err, "fetching pools") - } - return pools, nil -} - -func (e *enterprise) GetPoolByID(poolID string) (params.Pool, error) { - pool, err := e.store.GetEnterprisePool(e.ctx, e.id, poolID) - if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching pool") - } - return pool, nil -} diff --git a/runner/pool/interfaces.go b/runner/pool/interfaces.go deleted file mode 100644 index 3273c58c..00000000 --- a/runner/pool/interfaces.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2022 Cloudbase Solutions SRL -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package pool - -import ( - "github.com/cloudbase/garm/params" -) - -type poolHelper interface { - FetchDbInstances() ([]params.Instance, error) - ListPools() ([]params.Pool, error) - GetPoolByID(poolID string) (params.Pool, error) -} diff --git a/runner/pool/organization.go b/runner/pool/organization.go deleted file mode 100644 index a0b6aa8d..00000000 --- a/runner/pool/organization.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2022 Cloudbase Solutions SRL -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package pool - -import ( - "context" - "log/slog" - "sync" - - "github.com/pkg/errors" - - dbCommon "github.com/cloudbase/garm/database/common" - "github.com/cloudbase/garm/params" - "github.com/cloudbase/garm/runner/common" - "github.com/cloudbase/garm/util" -) - -// test that we implement PoolManager -var _ poolHelper = &organization{} - -func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { - ctx = util.WithContext(ctx, slog.Any("pool_mgr", cfg.Name), slog.Any("pool_type", params.GithubEntityTypeOrganization)) - entity := params.GithubEntity{ - Owner: cfg.Name, - ID: cfg.ID, - WebhookSecret: cfg.WebhookSecret, - EntityType: params.GithubEntityTypeOrganization, - } - ghc, err := util.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails) - if err != nil { - return nil, errors.Wrap(err, "getting github client") - } - - wg := &sync.WaitGroup{} - keyMuxes := &keyMutex{} - - helper := &organization{ - ctx: ctx, - id: cfg.ID, - store: store, - } - - repo := &basePoolManager{ - ctx: ctx, - cfgInternal: cfgInternal, - ghcli: ghc, - entity: entity, - store: store, - providers: providers, - controllerID: cfgInternal.ControllerID, - urls: urls{ - webhookURL: cfgInternal.BaseWebhookURL, - callbackURL: cfgInternal.InstanceCallbackURL, - metadataURL: cfgInternal.InstanceMetadataURL, - controllerWebhookURL: cfgInternal.ControllerWebhookURL, - }, - quit: make(chan struct{}), - helper: helper, - credsDetails: cfgInternal.GithubCredentialsDetails, - wg: wg, - keyMux: keyMuxes, - } - return repo, nil -} - -type organization struct { - ctx context.Context - id string - store dbCommon.Store -} - -func (o *organization) FetchDbInstances() ([]params.Instance, error) { - return o.store.ListOrgInstances(o.ctx, o.id) -} - -func (o *organization) ListPools() ([]params.Pool, error) { - pools, err := o.store.ListOrgPools(o.ctx, o.id) - if err != nil { - return nil, errors.Wrap(err, "fetching pools") - } - return pools, nil -} - -func (o *organization) GetPoolByID(poolID string) (params.Pool, error) { - pool, err := o.store.GetOrganizationPool(o.ctx, o.id, poolID) - if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching pool") - } - return pool, nil -} diff --git a/runner/pool/pool.go b/runner/pool/pool.go index eaed2fa7..5f4a6e03 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -94,6 +94,40 @@ type urls struct { webhookURL string controllerWebhookURL string } + +func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + ctx = garmUtil.WithContext(ctx, slog.Any("pool_mgr", entity), slog.Any("pool_type", params.GithubEntityTypeRepository)) + ghc, err := garmUtil.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails) + if err != nil { + return nil, errors.Wrap(err, "getting github client") + } + + wg := &sync.WaitGroup{} + keyMuxes := &keyMutex{} + + repo := &basePoolManager{ + ctx: ctx, + cfgInternal: cfgInternal, + entity: entity, + ghcli: ghc, + + store: store, + providers: providers, + controllerID: cfgInternal.ControllerID, + urls: urls{ + webhookURL: cfgInternal.BaseWebhookURL, + callbackURL: cfgInternal.InstanceCallbackURL, + metadataURL: cfgInternal.InstanceMetadataURL, + controllerWebhookURL: cfgInternal.ControllerWebhookURL, + }, + quit: make(chan struct{}), + credsDetails: cfgInternal.GithubCredentialsDetails, + wg: wg, + keyMux: keyMuxes, + } + return repo, nil +} + type basePoolManager struct { ctx context.Context controllerID string @@ -107,7 +141,6 @@ type basePoolManager struct { tools []commonParams.RunnerApplicationDownload quit chan struct{} - helper poolHelper credsDetails params.GithubCredentials managerIsRunning bool @@ -251,7 +284,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { // A runner has picked up the job, and is now running it. It may need to be replaced if the pool has // a minimum number of idle runners configured. - pool, err := r.helper.GetPoolByID(instance.PoolID) + pool, err := r.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "getting pool") } @@ -388,7 +421,7 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool { // If we were offline and did not process the webhook, the instance will linger. // We need to remove it from the provider and database. func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error { - dbInstances, err := r.helper.FetchDbInstances() + dbInstances, err := r.FetchDbInstances() if err != nil { return errors.Wrap(err, "fetching instances from db") } @@ -423,7 +456,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne continue } - pool, err := r.helper.GetPoolByID(instance.PoolID) + pool, err := r.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching instance pool info") } @@ -464,7 +497,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne // of "running" in the provider, but that has not registered with Github, and has // received no new updates in the configured timeout interval. func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { - dbInstances, err := r.helper.FetchDbInstances() + dbInstances, err := r.FetchDbInstances() if err != nil { return errors.Wrap(err, "fetching instances from db") } @@ -493,7 +526,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { } defer r.keyMux.Unlock(instance.Name, false) - pool, err := r.helper.GetPoolByID(instance.PoolID) + pool, err := r.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching instance pool info") } @@ -595,7 +628,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) } } - pool, err := r.helper.GetPoolByID(dbInstance.PoolID) + pool, err := r.GetPoolByID(dbInstance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -714,7 +747,7 @@ func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, err return params.Instance{}, errors.Wrap(err, "fetching instance") } - _, err = r.helper.GetPoolByID(runner.PoolID) + _, err = r.GetPoolByID(runner.PoolID) if err != nil { return params.Instance{}, errors.Wrap(err, "fetching pool") } @@ -761,7 +794,7 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara } func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) { - pool, err := r.helper.GetPoolByID(poolID) + pool, err := r.GetPoolByID(poolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -870,7 +903,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string { } func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error { - pool, err := r.helper.GetPoolByID(instance.PoolID) + pool, err := r.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -1329,7 +1362,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po } func (r *basePoolManager) retryFailedInstances() error { - pools, err := r.helper.ListPools() + pools, err := r.ListPools() if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1352,7 +1385,7 @@ func (r *basePoolManager) retryFailedInstances() error { } func (r *basePoolManager) scaleDown() error { - pools, err := r.helper.ListPools() + pools, err := r.ListPools() if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1373,7 +1406,7 @@ func (r *basePoolManager) scaleDown() error { } func (r *basePoolManager) ensureMinIdleRunners() error { - pools, err := r.helper.ListPools() + pools, err := r.ListPools() if err != nil { return fmt.Errorf("error listing pools: %w", err) } @@ -1393,7 +1426,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error { } func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error { - pool, err := r.helper.GetPoolByID(instance.PoolID) + pool, err := r.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") } @@ -1423,7 +1456,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan } func (r *basePoolManager) deletePendingInstances() error { - instances, err := r.helper.FetchDbInstances() + instances, err := r.FetchDbInstances() if err != nil { return fmt.Errorf("failed to fetch instances from store: %w", err) } @@ -1511,7 +1544,7 @@ func (r *basePoolManager) deletePendingInstances() error { func (r *basePoolManager) addPendingInstances() error { // nolint:golangci-lint,godox // TODO: filter instances by status. - instances, err := r.helper.FetchDbInstances() + instances, err := r.FetchDbInstances() if err != nil { return fmt.Errorf("failed to fetch instances from store: %w", err) } @@ -2114,6 +2147,44 @@ func (r *basePoolManager) GithubURL() string { return "" } +func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) { + switch r.entity.EntityType { + case params.GithubEntityTypeRepository: + return r.store.ListRepoInstances(r.ctx, r.entity.ID) + case params.GithubEntityTypeOrganization: + return r.store.ListOrgInstances(r.ctx, r.entity.ID) + case params.GithubEntityTypeEnterprise: + return r.store.ListEnterpriseInstances(r.ctx, r.entity.ID) + } + return nil, fmt.Errorf("unknown entity type: %s", r.entity.EntityType) +} + +func (r *basePoolManager) ListPools() ([]params.Pool, error) { + switch r.entity.EntityType { + case params.GithubEntityTypeRepository: + return r.store.ListRepoPools(r.ctx, r.entity.ID) + case params.GithubEntityTypeOrganization: + return r.store.ListOrgPools(r.ctx, r.entity.ID) + case params.GithubEntityTypeEnterprise: + return r.store.ListEnterprisePools(r.ctx, r.entity.ID) + default: + return nil, fmt.Errorf("unknown entity type: %s", r.entity.EntityType) + } +} + +func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) { + switch r.entity.EntityType { + case params.GithubEntityTypeRepository: + return r.store.GetRepositoryPool(r.ctx, r.entity.ID, poolID) + case params.GithubEntityTypeOrganization: + return r.store.GetOrganizationPool(r.ctx, r.entity.ID, poolID) + case params.GithubEntityTypeEnterprise: + return r.store.GetEnterprisePool(r.ctx, r.entity.ID, poolID) + default: + return params.Pool{}, fmt.Errorf("unknown entity type: %s", r.entity.EntityType) + } +} + func (r *basePoolManager) UninstallWebhook(ctx context.Context) error { if r.urls.controllerWebhookURL == "" { return errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty") diff --git a/runner/pool/repository.go b/runner/pool/repository.go deleted file mode 100644 index f21c3d55..00000000 --- a/runner/pool/repository.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2022 Cloudbase Solutions SRL -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package pool - -import ( - "context" - "fmt" - "log/slog" - "sync" - - "github.com/pkg/errors" - - dbCommon "github.com/cloudbase/garm/database/common" - "github.com/cloudbase/garm/params" - "github.com/cloudbase/garm/runner/common" - "github.com/cloudbase/garm/util" -) - -// test that we implement PoolManager -var _ poolHelper = &repository{} - -func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { - ctx = util.WithContext(ctx, slog.Any("pool_mgr", fmt.Sprintf("%s/%s", cfg.Owner, cfg.Name)), slog.Any("pool_type", params.GithubEntityTypeRepository)) - entity := params.GithubEntity{ - Name: cfg.Name, - Owner: cfg.Owner, - ID: cfg.ID, - WebhookSecret: cfg.WebhookSecret, - EntityType: params.GithubEntityTypeRepository, - } - ghc, err := util.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails) - if err != nil { - return nil, errors.Wrap(err, "getting github client") - } - - wg := &sync.WaitGroup{} - keyMuxes := &keyMutex{} - - helper := &repository{ - ctx: ctx, - id: cfg.ID, - store: store, - } - - repo := &basePoolManager{ - ctx: ctx, - cfgInternal: cfgInternal, - entity: entity, - ghcli: ghc, - - store: store, - providers: providers, - controllerID: cfgInternal.ControllerID, - urls: urls{ - webhookURL: cfgInternal.BaseWebhookURL, - callbackURL: cfgInternal.InstanceCallbackURL, - metadataURL: cfgInternal.InstanceMetadataURL, - controllerWebhookURL: cfgInternal.ControllerWebhookURL, - }, - quit: make(chan struct{}), - helper: helper, - credsDetails: cfgInternal.GithubCredentialsDetails, - wg: wg, - keyMux: keyMuxes, - } - return repo, nil -} - -var _ poolHelper = &repository{} - -type repository struct { - ctx context.Context - id string - store dbCommon.Store -} - -func (r *repository) FetchDbInstances() ([]params.Instance, error) { - return r.store.ListRepoInstances(r.ctx, r.id) -} - -func (r *repository) ListPools() ([]params.Pool, error) { - pools, err := r.store.ListRepoPools(r.ctx, r.id) - if err != nil { - return nil, errors.Wrap(err, "fetching pools") - } - return pools, nil -} - -func (r *repository) GetPoolByID(poolID string) (params.Pool, error) { - pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID) - if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching pool") - } - return pool, nil -} diff --git a/runner/runner.go b/runner/runner.go index df31ffa7..7eab27f9 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -109,7 +109,14 @@ func (p *poolManagerCtrl) CreateRepoPoolManager(ctx context.Context, repo params if err != nil { return nil, errors.Wrap(err, "fetching internal config") } - poolManager, err := pool.NewRepositoryPoolManager(ctx, repo, cfgInternal, providers, store) + entity := params.GithubEntity{ + Owner: repo.Owner, + Name: repo.Name, + ID: repo.ID, + WebhookSecret: repo.WebhookSecret, + EntityType: params.GithubEntityTypeRepository, + } + poolManager, err := pool.NewEntityPoolManager(ctx, entity, cfgInternal, providers, store) if err != nil { return nil, errors.Wrap(err, "creating repo pool manager") } @@ -175,7 +182,13 @@ func (p *poolManagerCtrl) CreateOrgPoolManager(ctx context.Context, org params.O if err != nil { return nil, errors.Wrap(err, "fetching internal config") } - poolManager, err := pool.NewOrganizationPoolManager(ctx, org, cfgInternal, providers, store) + entity := params.GithubEntity{ + Owner: org.Name, + ID: org.ID, + WebhookSecret: org.WebhookSecret, + EntityType: params.GithubEntityTypeOrganization, + } + poolManager, err := pool.NewEntityPoolManager(ctx, entity, cfgInternal, providers, store) if err != nil { return nil, errors.Wrap(err, "creating org pool manager") } @@ -241,7 +254,14 @@ func (p *poolManagerCtrl) CreateEnterprisePoolManager(ctx context.Context, enter if err != nil { return nil, errors.Wrap(err, "fetching internal config") } - poolManager, err := pool.NewEnterprisePoolManager(ctx, enterprise, cfgInternal, providers, store) + + entity := params.GithubEntity{ + Owner: enterprise.Name, + ID: enterprise.ID, + WebhookSecret: enterprise.WebhookSecret, + EntityType: params.GithubEntityTypeEnterprise, + } + poolManager, err := pool.NewEntityPoolManager(ctx, entity, cfgInternal, providers, store) if err != nil { return nil, errors.Wrap(err, "creating enterprise pool manager") }