Refactor code to allow more unit testing

In order to allow mocking for some of the `runner` functions, we created a
separate interface (called `PoolManagerController`) with `Create`, `Get`,
`Delete` operations for the `organization` / `repository` pool managers.

Furthermore, a new runner struct (`poolManagerCtrl`) implements this new
interface. The existing code is refactored to use the `poolManagerCtrl`
whenever the pool managers for `org` / `repo` are handled.

This allows more unit testing for the runner functions since `poolManagerCtrl`
field can be mocked now.

Besides this, there are some typos fixed as well.
This commit is contained in:
Ionut Balutoiu 2022-08-10 12:15:12 +03:00 committed by mihaelabalutoiu
parent ad97baac70
commit 7b6c2e6106
8 changed files with 305 additions and 191 deletions

View file

@ -150,8 +150,7 @@ type Repository struct {
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
Internal Internal `json:"-"`
WebhookSecret string `json:"-"`
}
type Organization struct {
@ -160,8 +159,7 @@ type Organization struct {
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
Internal Internal `json:"-"`
WebhookSecret string `json:"-"`
}
// Users holds information about a particular user
@ -200,5 +198,4 @@ type Provider struct {
type UpdatePoolStateParams struct {
WebhookSecret string
Internal Internal
}

View file

@ -57,7 +57,7 @@ type CreateOrgParams struct {
func (c *CreateOrgParams) Validate() error {
if c.Name == "" {
return errors.NewBadRequestError("missing repo name")
return errors.NewBadRequestError("missing org name")
}
if c.CredentialsName == "" {

34
runner/interfaces.go Normal file
View file

@ -0,0 +1,34 @@
// Copyright 2022 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package runner
import (
"context"
dbCommon "garm/database/common"
"garm/params"
"garm/runner/common"
)
type PoolManagerController interface {
CreateRepoPoolManager(ctx context.Context, repo params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error)
GetRepoPoolManager(repo params.Repository) (common.PoolManager, error)
DeleteRepoPoolManager(repo params.Repository) error
GetRepoPoolManagers() (map[string]common.PoolManager, error)
CreateOrgPoolManager(ctx context.Context, org params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error)
GetOrgPoolManager(org params.Organization) (common.PoolManager, error)
DeleteOrgPoolManager(org params.Organization) error
GetOrgPoolManagers() (map[string]common.PoolManager, error)
}

View file

@ -24,7 +24,6 @@ import (
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/pool"
"github.com/pkg/errors"
)
@ -46,7 +45,7 @@ func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgP
_, err = r.store.GetOrganization(ctx, param.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return params.Organization{}, errors.Wrap(err, "fetching repo")
return params.Organization{}, errors.Wrap(err, "fetching org")
}
} else {
return params.Organization{}, runnerErrors.NewConflictError("organization %s already exists", param.Name)
@ -63,11 +62,16 @@ func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgP
}
}()
poolMgr, err := r.loadOrgPoolManager(org)
if err := poolMgr.Start(); err != nil {
return params.Organization{}, errors.Wrap(err, "starting pool manager")
poolMgr, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store)
if err != nil {
return params.Organization{}, errors.Wrap(err, "creating org pool manager")
}
if err := poolMgr.Start(); err != nil {
if deleteErr := r.poolManagerCtrl.DeleteOrgPoolManager(org); deleteErr != nil {
log.Printf("failed to cleanup pool manager for org %s", org.ID)
}
return params.Organization{}, errors.Wrap(err, "starting org pool manager")
}
r.organizations[org.ID] = poolMgr
return org, nil
}
@ -91,7 +95,7 @@ func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching repository")
return params.Organization{}, errors.Wrap(err, "fetching organization")
}
return org, nil
}
@ -103,20 +107,12 @@ func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error {
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return errors.Wrap(err, "fetching repo")
}
poolMgr, ok := r.organizations[org.ID]
if ok {
if err := poolMgr.Stop(); err != nil {
log.Printf("failed to stop pool for repo %s", org.ID)
}
delete(r.organizations, orgID)
return errors.Wrap(err, "fetching org")
}
pools, err := r.store.ListOrgPools(ctx, orgID)
if err != nil {
return errors.Wrap(err, "fetching repo pools")
return errors.Wrap(err, "fetching org pools")
}
if len(pools) > 0 {
@ -125,7 +121,11 @@ func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error {
poolIds = append(poolIds, pool.ID)
}
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
return runnerErrors.NewBadRequestError("org has pools defined (%s)", strings.Join(poolIds, ", "))
}
if err := r.poolManagerCtrl.DeleteOrgPoolManager(org); err != nil {
return errors.Wrap(err, "deleting org pool manager")
}
if err := r.store.DeleteOrganization(ctx, orgID); err != nil {
@ -159,27 +159,19 @@ func (r *Runner) UpdateOrganization(ctx context.Context, orgID string, param par
return params.Organization{}, errors.Wrap(err, "updating org")
}
poolMgr, ok := r.organizations[org.ID]
if ok {
internalCfg, err := r.getInternalConfig(org.CredentialsName)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching internal config")
}
poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org)
if err != nil {
newState := params.UpdatePoolStateParams{
WebhookSecret: org.WebhookSecret,
Internal: internalCfg,
}
org.Internal = internalCfg
// stop the pool mgr
if err := poolMgr.RefreshState(newState); err != nil {
return params.Organization{}, errors.Wrap(err, "updating pool manager")
return params.Organization{}, errors.Wrap(err, "updating org pool manager")
}
} else {
poolMgr, err := r.loadOrgPoolManager(org)
if err != nil {
return params.Organization{}, errors.Wrap(err, "loading pool manager")
if _, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store); err != nil {
return params.Organization{}, errors.Wrap(err, "creating org pool manager")
}
r.organizations[org.ID] = poolMgr
}
return org, nil
@ -193,8 +185,12 @@ func (r *Runner) CreateOrgPool(ctx context.Context, orgID string, param params.C
r.mux.Lock()
defer r.mux.Unlock()
_, ok := r.organizations[orgID]
if !ok {
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching org")
}
if _, err := r.poolManagerCtrl.GetOrgPoolManager(org); err != nil {
return params.Pool{}, runnerErrors.ErrNotFound
}
@ -313,30 +309,18 @@ func (r *Runner) ListOrgInstances(ctx context.Context, orgID string) ([]params.I
return instances, nil
}
func (r *Runner) loadOrgPoolManager(org params.Organization) (common.PoolManager, error) {
cfg, err := r.getInternalConfig(org.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
org.Internal = cfg
poolManager, err := pool.NewOrganizationPoolManager(r.ctx, org, r.providers, r.store)
if err != nil {
return nil, errors.Wrap(err, "creating pool manager")
}
return poolManager, nil
}
func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
org, err := r.store.GetOrganization(r.ctx, name)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
return nil, errors.Wrap(err, "fetching org")
}
if orgPoolMgr, ok := r.organizations[org.ID]; ok {
return orgPoolMgr, nil
poolManager, err := r.poolManagerCtrl.GetOrgPoolManager(org)
if err != nil {
return nil, errors.Wrap(err, "fetching pool manager for org")
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name)
return poolManager, nil
}

View file

@ -33,25 +33,26 @@ import (
// test that we implement PoolManager
var _ poolHelper = &organization{}
func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token)
func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfgInternal.OAuth2Token)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
helper := &organization{
cfg: cfg,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
cfg: cfg,
cfgInternal: cfgInternal,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
}
repo := &basePool{
ctx: ctx,
store: store,
providers: providers,
controllerID: cfg.Internal.ControllerID,
controllerID: cfgInternal.ControllerID,
quit: make(chan struct{}),
done: make(chan struct{}),
helper: helper,
@ -60,11 +61,12 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, pr
}
type organization struct {
cfg params.Organization
ctx context.Context
ghcli common.GithubClient
id string
store dbCommon.Store
cfg params.Organization
cfgInternal params.Internal
ctx context.Context
ghcli common.GithubClient
id string
store dbCommon.Store
mux sync.Mutex
}
@ -74,7 +76,6 @@ func (r *organization) UpdateState(param params.UpdatePoolStateParams) error {
defer r.mux.Unlock()
r.cfg.WebhookSecret = param.WebhookSecret
r.cfg.Internal = param.Internal
ghc, err := util.GithubClient(r.ctx, r.GetGithubToken())
if err != nil {
@ -85,7 +86,7 @@ func (r *organization) UpdateState(param params.UpdatePoolStateParams) error {
}
func (r *organization) GetGithubToken() string {
return r.cfg.Internal.OAuth2Token
return r.cfgInternal.OAuth2Token
}
func (r *organization) GetGithubRunners() ([]*github.Runner, error) {
@ -129,7 +130,7 @@ func (r *organization) GithubURL() string {
}
func (r *organization) JwtToken() string {
return r.cfg.Internal.JWTSecret
return r.cfgInternal.JWTSecret
}
func (r *organization) GetGithubRegistrationToken() (string, error) {
@ -150,7 +151,7 @@ func (r *organization) WebhookSecret() string {
}
func (r *organization) GetCallbackURL() string {
return r.cfg.Internal.InstanceCallbackURL
return r.cfgInternal.InstanceCallbackURL
}
func (r *organization) FindPoolByTags(labels []string) (params.Pool, error) {

View file

@ -33,25 +33,26 @@ import (
// test that we implement PoolManager
var _ poolHelper = &repository{}
func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token)
func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfgInternal.OAuth2Token)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
helper := &repository{
cfg: cfg,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
cfg: cfg,
cfgInternal: cfgInternal,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
}
repo := &basePool{
ctx: ctx,
store: store,
providers: providers,
controllerID: cfg.Internal.ControllerID,
controllerID: cfgInternal.ControllerID,
quit: make(chan struct{}),
done: make(chan struct{}),
helper: helper,
@ -62,11 +63,12 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid
var _ poolHelper = &repository{}
type repository struct {
cfg params.Repository
ctx context.Context
ghcli common.GithubClient
id string
store dbCommon.Store
cfg params.Repository
cfgInternal params.Internal
ctx context.Context
ghcli common.GithubClient
id string
store dbCommon.Store
mux sync.Mutex
}
@ -76,7 +78,6 @@ func (r *repository) UpdateState(param params.UpdatePoolStateParams) error {
defer r.mux.Unlock()
r.cfg.WebhookSecret = param.WebhookSecret
r.cfg.Internal = param.Internal
ghc, err := util.GithubClient(r.ctx, r.GetGithubToken())
if err != nil {
@ -87,7 +88,7 @@ func (r *repository) UpdateState(param params.UpdatePoolStateParams) error {
}
func (r *repository) GetGithubToken() string {
return r.cfg.Internal.OAuth2Token
return r.cfgInternal.OAuth2Token
}
func (r *repository) GetGithubRunners() ([]*github.Runner, error) {
@ -131,7 +132,7 @@ func (r *repository) GithubURL() string {
}
func (r *repository) JwtToken() string {
return r.cfg.Internal.JWTSecret
return r.cfgInternal.JWTSecret
}
func (r *repository) GetGithubRegistrationToken() (string, error) {
@ -152,7 +153,7 @@ func (r *repository) WebhookSecret() string {
}
func (r *repository) GetCallbackURL() string {
return r.cfg.Internal.InstanceCallbackURL
return r.cfgInternal.InstanceCallbackURL
}
func (r *repository) FindPoolByTags(labels []string) (params.Pool, error) {

View file

@ -24,7 +24,6 @@ import (
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/pool"
"github.com/pkg/errors"
)
@ -63,11 +62,16 @@ func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoPa
}
}()
poolMgr, err := r.loadRepoPoolManager(repo)
if err := poolMgr.Start(); err != nil {
return params.Repository{}, errors.Wrap(err, "starting pool manager")
poolMgr, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store)
if err != nil {
return params.Repository{}, errors.Wrap(err, "creating repo pool manager")
}
if err := poolMgr.Start(); err != nil {
if deleteErr := r.poolManagerCtrl.DeleteRepoPoolManager(repo); deleteErr != nil {
log.Printf("failed to cleanup pool manager for repo %s", repo.ID)
}
return params.Repository{}, errors.Wrap(err, "starting repo pool manager")
}
r.repositories[repo.ID] = poolMgr
return repo, nil
}
@ -106,14 +110,6 @@ func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error {
return errors.Wrap(err, "fetching repo")
}
poolMgr, ok := r.repositories[repo.ID]
if ok {
if err := poolMgr.Stop(); err != nil {
log.Printf("failed to stop pool for repo %s", repo.ID)
}
delete(r.repositories, repoID)
}
pools, err := r.store.ListRepoPools(ctx, repoID)
if err != nil {
return errors.Wrap(err, "fetching repo pools")
@ -128,6 +124,10 @@ func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error {
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
}
if err := r.poolManagerCtrl.DeleteRepoPoolManager(repo); err != nil {
return errors.Wrap(err, "deleting repo pool manager")
}
if err := r.store.DeleteRepository(ctx, repoID); err != nil {
return errors.Wrap(err, "removing repository")
}
@ -159,27 +159,19 @@ func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param para
return params.Repository{}, errors.Wrap(err, "updating repo")
}
poolMgr, ok := r.repositories[repo.ID]
if ok {
internalCfg, err := r.getInternalConfig(repo.CredentialsName)
if err != nil {
return params.Repository{}, errors.Wrap(err, "fetching internal config")
}
poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo)
if err != nil {
newState := params.UpdatePoolStateParams{
WebhookSecret: repo.WebhookSecret,
Internal: internalCfg,
}
repo.Internal = internalCfg
// stop the pool mgr
if err := poolMgr.RefreshState(newState); err != nil {
return params.Repository{}, errors.Wrap(err, "updating pool manager")
return params.Repository{}, errors.Wrap(err, "updating repo pool manager")
}
} else {
poolMgr, err := r.loadRepoPoolManager(repo)
if err != nil {
return params.Repository{}, errors.Wrap(err, "loading pool manager")
if _, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store); err != nil {
return params.Repository{}, errors.Wrap(err, "creating repo pool manager")
}
r.repositories[repo.ID] = poolMgr
}
return repo, nil
@ -193,8 +185,12 @@ func (r *Runner) CreateRepoPool(ctx context.Context, repoID string, param params
r.mux.Lock()
defer r.mux.Unlock()
_, ok := r.repositories[repoID]
if !ok {
repo, err := r.store.GetRepositoryByID(ctx, repoID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching repo")
}
if _, err := r.poolManagerCtrl.GetRepoPoolManager(repo); err != nil {
return params.Pool{}, runnerErrors.ErrNotFound
}
@ -324,19 +320,6 @@ func (r *Runner) ListRepoInstances(ctx context.Context, repoID string) ([]params
return instances, nil
}
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
cfg, err := r.getInternalConfig(repo.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
repo.Internal = cfg
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
if err != nil {
return nil, errors.Wrap(err, "creating pool manager")
}
return poolManager, nil
}
func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
@ -346,8 +329,9 @@ func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, er
return nil, errors.Wrap(err, "fetching repo")
}
if repo, ok := r.repositories[repo.ID]; ok {
return repo, nil
poolManager, err := r.poolManagerCtrl.GetRepoPoolManager(repo)
if err != nil {
return nil, errors.Wrap(err, "fetching pool manager for repo")
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
return poolManager, nil
}

View file

@ -37,6 +37,7 @@ import (
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/pool"
"garm/runner/providers"
providerCommon "garm/runner/providers/common"
"garm/util"
@ -66,15 +67,21 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
for _, ghcreds := range cfg.Github {
creds[ghcreds.Name] = ghcreds
}
runner := &Runner{
ctx: ctx,
poolManagerCtrl := &poolManagerCtrl{
controllerID: ctrlId.ControllerID.String(),
config: cfg,
store: db,
credentials: creds,
repositories: map[string]common.PoolManager{},
organizations: map[string]common.PoolManager{},
providers: providers,
controllerID: ctrlId.ControllerID.String(),
credentials: creds,
}
runner := &Runner{
ctx: ctx,
config: cfg,
store: db,
poolManagerCtrl: poolManagerCtrl,
providers: providers,
credentials: creds,
}
if err := runner.loadReposAndOrgs(); err != nil {
@ -84,18 +91,124 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
return runner, nil
}
type Runner struct {
type poolManagerCtrl struct {
mux sync.Mutex
config config.Config
controllerID string
ctx context.Context
store dbCommon.Store
config config.Config
credentials map[string]config.Github
repositories map[string]common.PoolManager
organizations map[string]common.PoolManager
providers map[string]common.Provider
credentials map[string]config.Github
}
func (p *poolManagerCtrl) CreateRepoPoolManager(ctx context.Context, repo params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
p.mux.Lock()
defer p.mux.Unlock()
cfgInternal, err := p.getInternalConfig(repo.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
poolManager, err := pool.NewRepositoryPoolManager(ctx, repo, cfgInternal, providers, store)
if err != nil {
return nil, errors.Wrap(err, "creating repo pool manager")
}
p.repositories[repo.ID] = poolManager
return poolManager, nil
}
func (p *poolManagerCtrl) GetRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
if repoPoolMgr, ok := p.repositories[repo.ID]; ok {
return repoPoolMgr, nil
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s pool manager not loaded", repo.Owner, repo.Name)
}
func (p *poolManagerCtrl) DeleteRepoPoolManager(repo params.Repository) error {
p.mux.Lock()
defer p.mux.Unlock()
poolMgr, ok := p.repositories[repo.ID]
if ok {
if err := poolMgr.Stop(); err != nil {
return errors.Wrap(err, "stopping repo pool manager")
}
delete(p.repositories, repo.ID)
}
return nil
}
func (p *poolManagerCtrl) GetRepoPoolManagers() (map[string]common.PoolManager, error) {
return p.repositories, nil
}
func (p *poolManagerCtrl) CreateOrgPoolManager(ctx context.Context, org params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
p.mux.Lock()
defer p.mux.Unlock()
cfgInternal, err := p.getInternalConfig(org.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
poolManager, err := pool.NewOrganizationPoolManager(ctx, org, cfgInternal, providers, store)
if err != nil {
return nil, errors.Wrap(err, "creating org pool manager")
}
p.organizations[org.ID] = poolManager
return poolManager, nil
}
func (p *poolManagerCtrl) GetOrgPoolManager(org params.Organization) (common.PoolManager, error) {
if orgPoolMgr, ok := p.organizations[org.ID]; ok {
return orgPoolMgr, nil
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s pool manager not loaded", org.Name)
}
func (p *poolManagerCtrl) DeleteOrgPoolManager(org params.Organization) error {
p.mux.Lock()
defer p.mux.Unlock()
poolMgr, ok := p.organizations[org.ID]
if ok {
if err := poolMgr.Stop(); err != nil {
return errors.Wrap(err, "stopping org pool manager")
}
delete(p.organizations, org.ID)
}
return nil
}
func (p *poolManagerCtrl) GetOrgPoolManagers() (map[string]common.PoolManager, error) {
return p.organizations, nil
}
func (p *poolManagerCtrl) getInternalConfig(credsName string) (params.Internal, error) {
creds, ok := p.credentials[credsName]
if !ok {
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName)
}
return params.Internal{
OAuth2Token: creds.OAuth2Token,
ControllerID: p.controllerID,
InstanceCallbackURL: p.config.Default.CallbackURL,
JWTSecret: p.config.JWTAuth.Secret,
}, nil
}
type Runner struct {
mux sync.Mutex
config config.Config
ctx context.Context
store dbCommon.Store
poolManagerCtrl PoolManagerController
providers map[string]common.Provider
credentials map[string]config.Github
}
func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {
@ -119,20 +232,6 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
return ret, nil
}
func (r *Runner) getInternalConfig(credsName string) (params.Internal, error) {
creds, ok := r.credentials[credsName]
if !ok {
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName)
}
return params.Internal{
OAuth2Token: creds.OAuth2Token,
ControllerID: r.controllerID,
InstanceCallbackURL: r.config.Default.CallbackURL,
JWTSecret: r.config.JWTAuth.Secret,
}, nil
}
func (r *Runner) loadReposAndOrgs() error {
r.mux.Lock()
defer r.mux.Unlock()
@ -144,48 +243,36 @@ func (r *Runner) loadReposAndOrgs() error {
orgs, err := r.store.ListOrganizations(r.ctx)
if err != nil {
return errors.Wrap(err, "fetching repositories")
return errors.Wrap(err, "fetching organizations")
}
expectedReplies := len(repos) + len(orgs)
repoPoolMgrChan := make(chan common.PoolManager, len(repos))
orgPoolMgrChan := make(chan common.PoolManager, len(orgs))
errChan := make(chan error, expectedReplies)
for _, repo := range repos {
go func(repo params.Repository) {
log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name)
poolManager, err := r.loadRepoPoolManager(repo)
if err != nil {
errChan <- err
return
}
repoPoolMgrChan <- poolManager
log.Printf("creating pool manager for repo %s/%s", repo.Owner, repo.Name)
_, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store)
errChan <- err
}(repo)
}
for _, org := range orgs {
go func(org params.Organization) {
log.Printf("creating pool manager for organization %s", org.Name)
poolManager, err := r.loadOrgPoolManager(org)
if err != nil {
errChan <- err
return
}
orgPoolMgrChan <- poolManager
_, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store)
errChan <- err
}(org)
}
for i := 0; i < expectedReplies; i++ {
select {
case repoPool := <-repoPoolMgrChan:
r.repositories[repoPool.ID()] = repoPool
case orgPool := <-orgPoolMgrChan:
r.organizations[orgPool.ID()] = orgPool
case err := <-errChan:
return errors.Wrap(err, "failed to load repos and pools")
if err != nil {
return errors.Wrap(err, "failed to load pool managers for repos and orgs")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager load")
return fmt.Errorf("timed out waiting for pool manager load")
}
}
@ -196,10 +283,20 @@ func (r *Runner) Start() error {
r.mux.Lock()
defer r.mux.Unlock()
expectedReplies := len(r.repositories) + len(r.organizations)
repositories, err := r.poolManagerCtrl.GetRepoPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch repo pool managers")
}
organizations, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
expectedReplies := len(repositories) + len(organizations)
errChan := make(chan error, expectedReplies)
for _, repo := range r.repositories {
for _, repo := range repositories {
go func(repo common.PoolManager) {
err := repo.Start()
errChan <- err
@ -207,7 +304,7 @@ func (r *Runner) Start() error {
}(repo)
}
for _, org := range r.organizations {
for _, org := range organizations {
go func(org common.PoolManager) {
err := org.Start()
errChan <- err
@ -232,13 +329,21 @@ func (r *Runner) Stop() error {
r.mux.Lock()
defer r.mux.Unlock()
for _, repo := range r.repositories {
repos, err := r.poolManagerCtrl.GetRepoPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch repo pool managers")
}
for _, repo := range repos {
if err := repo.Stop(); err != nil {
return errors.Wrap(err, "stopping repo pool manager")
}
}
for _, org := range r.organizations {
orgs, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
for _, org := range orgs {
if err := org.Stop(); err != nil {
return errors.Wrap(err, "stopping org pool manager")
}
@ -252,7 +357,11 @@ func (r *Runner) Wait() error {
var wg sync.WaitGroup
for poolId, repo := range r.repositories {
repos, err := r.poolManagerCtrl.GetRepoPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch repo pool managers")
}
for poolId, repo := range repos {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
defer wg.Done()
@ -262,7 +371,11 @@ func (r *Runner) Wait() error {
}(poolId, repo)
}
for poolId, org := range r.organizations {
orgs, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
for poolId, org := range orgs {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
defer wg.Done()
@ -320,7 +433,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
}
func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error {
if jobData == nil || len(jobData) == 0 {
if len(jobData) == 0 {
return runnerErrors.NewBadRequestError("missing job data")
}
@ -480,7 +593,7 @@ func (r *Runner) ListAllInstances(ctx context.Context) ([]params.Instance, error
instances, err := r.store.ListAllInstances(ctx)
if err != nil {
return nil, errors.Wrap(err, "fetcing instances")
return nil, errors.Wrap(err, "fetching instances")
}
return instances, nil
}