Start pool managers in the background

Garm no longer fails on startup if a pool manager cannot be started.
Instead, it attempts to start the pool manager in the background. If the
attempt fails with an unauthorized error, it sleeps for 3 hours before
trying again. A retry is unlikely to succeed unless the credentials are
updated in the config and garm is restarted, so there is no point in
getting rate limited.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2022-10-20 17:22:47 +03:00
parent 80452aac39
commit 05057e37fd
No known key found for this signature in database
GPG key ID: 7D073DCC2C074CB5
26 changed files with 408 additions and 180 deletions

View file

@ -46,7 +46,7 @@ func init() {
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
creds, err := cli.ListCredentials()

View file

@ -50,7 +50,7 @@ var enterpriseAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
newEnterpriseReq := params.CreateEnterpriseParams{
@ -75,7 +75,7 @@ var enterpriseListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
enterprises, err := cli.ListEnterprises()
@ -94,7 +94,7 @@ var enterpriseShowCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a enterprise ID")
@ -119,7 +119,7 @@ var enterpriseDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a enterprise ID")
@ -154,10 +154,10 @@ func init() {
func formatEnterprises(enterprises []params.Enterprise) {
t := table.NewWriter()
header := table.Row{"ID", "Name", "Credentials name"}
header := table.Row{"ID", "Name", "Credentials name", "Pool mgr running"}
t.AppendHeader(header)
for _, val := range enterprises {
t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName})
t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning})
t.AppendSeparator()
}
fmt.Println(t.Render())
@ -171,6 +171,10 @@ func formatOneEnterprise(enterprise params.Enterprise) {
t.AppendRow(table.Row{"ID", enterprise.ID})
t.AppendRow(table.Row{"Name", enterprise.Name})
t.AppendRow(table.Row{"Credentials", enterprise.CredentialsName})
t.AppendRow(table.Row{"Pool manager running", enterprise.PoolManagerStatus.IsRunning})
if !enterprise.PoolManagerStatus.IsRunning {
t.AppendRow(table.Row{"Failure reason", enterprise.PoolManagerStatus.FailureReason})
}
if len(enterprise.Pools) > 0 {
for _, pool := range enterprise.Pools {

View file

@ -37,7 +37,7 @@ var orgRunnerListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {

View file

@ -48,7 +48,7 @@ var orgPoolAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -91,7 +91,7 @@ var orgPoolListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -117,7 +117,7 @@ var orgPoolShowCmd = &cobra.Command{
Long: `Displays detailed information about a single pool.`,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {
@ -142,7 +142,7 @@ var orgPoolDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {
return fmt.Errorf("command requires orgID and poolID")
@ -167,7 +167,7 @@ explicitly remove them using the runner delete command.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {

View file

@ -50,7 +50,7 @@ var orgAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
newOrgReq := params.CreateOrgParams{
@ -75,7 +75,7 @@ var orgListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
orgs, err := cli.ListOrganizations()
@ -94,7 +94,7 @@ var orgShowCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a organization ID")
@ -119,7 +119,7 @@ var orgDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a organization ID")
@ -154,10 +154,10 @@ func init() {
func formatOrganizations(orgs []params.Organization) {
t := table.NewWriter()
header := table.Row{"ID", "Name", "Credentials name"}
header := table.Row{"ID", "Name", "Credentials name", "Pool mgr running"}
t.AppendHeader(header)
for _, val := range orgs {
t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName})
t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning})
t.AppendSeparator()
}
fmt.Println(t.Render())
@ -171,7 +171,10 @@ func formatOneOrganization(org params.Organization) {
t.AppendRow(table.Row{"ID", org.ID})
t.AppendRow(table.Row{"Name", org.Name})
t.AppendRow(table.Row{"Credentials", org.CredentialsName})
t.AppendRow(table.Row{"Pool manager running", org.PoolManagerStatus.IsRunning})
if !org.PoolManagerStatus.IsRunning {
t.AppendRow(table.Row{"Failure reason", org.PoolManagerStatus.FailureReason})
}
if len(org.Pools) > 0 {
for _, pool := range org.Pools {
t.AppendRow(table.Row{"Pools", pool.ID}, rowConfigAutoMerge)

View file

@ -68,7 +68,7 @@ Example:
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
var pools []params.Pool
@ -108,7 +108,7 @@ var poolShowCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -136,7 +136,7 @@ var poolDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -162,7 +162,7 @@ var poolAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
tags := strings.Split(poolTags, ",")
@ -216,7 +216,7 @@ explicitly remove them using the runner delete command.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {

View file

@ -55,7 +55,7 @@ file of the garm client.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if cfg == nil {
@ -76,7 +76,7 @@ var profileDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -101,7 +101,7 @@ var poolSwitchCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -177,7 +177,7 @@ installation, by performing a login.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if cfg == nil {

View file

@ -45,7 +45,7 @@ func init() {
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
providers, err := cli.ListProviders()

View file

@ -37,7 +37,7 @@ var repoRunnerListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {

View file

@ -62,7 +62,7 @@ var repoPoolAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -97,41 +97,13 @@ var repoPoolAddCmd = &cobra.Command{
},
}
var repoPoolListCmd = &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List repository pools",
Long: `List all configured pools for a given repository.`,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a repository ID")
}
if len(args) > 1 {
return fmt.Errorf("too many arguments")
}
pools, err := cli.ListRepoPools(args[0])
if err != nil {
return err
}
formatPools(pools)
return nil
},
}
var repoPoolShowCmd = &cobra.Command{
Use: "show",
Short: "Show details for one pool",
Long: `Displays detailed information about a single pool.`,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {
@ -156,7 +128,7 @@ var repoPoolDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {
return fmt.Errorf("command requires repoID and poolID")
@ -181,7 +153,7 @@ explicitly remove them using the runner delete command.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) < 2 || len(args) > 2 {

View file

@ -51,7 +51,7 @@ var repoAddCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
newRepoReq := params.CreateRepoParams{
@ -77,7 +77,7 @@ var repoListCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
repos, err := cli.ListRepositories()
@ -96,7 +96,7 @@ var repoShowCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a repository ID")
@ -121,7 +121,7 @@ var repoDeleteCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
return fmt.Errorf("requires a repository ID")
@ -158,10 +158,10 @@ func init() {
func formatRepositories(repos []params.Repository) {
t := table.NewWriter()
header := table.Row{"ID", "Owner", "Name", "Credentials name"}
header := table.Row{"ID", "Owner", "Name", "Credentials name", "Pool mgr running"}
t.AppendHeader(header)
for _, val := range repos {
t.AppendRow(table.Row{val.ID, val.Owner, val.Name, val.CredentialsName})
t.AppendRow(table.Row{val.ID, val.Owner, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning})
t.AppendSeparator()
}
fmt.Println(t.Render())
@ -176,6 +176,10 @@ func formatOneRepository(repo params.Repository) {
t.AppendRow(table.Row{"Owner", repo.Owner})
t.AppendRow(table.Row{"Name", repo.Name})
t.AppendRow(table.Row{"Credentials", repo.CredentialsName})
t.AppendRow(table.Row{"Pool manager running", repo.PoolManagerStatus.IsRunning})
if !repo.PoolManagerStatus.IsRunning {
t.AppendRow(table.Row{"Failure reason", repo.PoolManagerStatus.FailureReason})
}
if len(repo.Pools) > 0 {
for _, pool := range repo.Pools {

View file

@ -26,13 +26,13 @@ import (
var Version string
var (
cfg *config.Config
mgr config.Manager
cli *client.Client
active string
needsInit bool
debug bool
needsInitError = fmt.Errorf("Please log into a garm installation first")
cfg *config.Config
mgr config.Manager
cli *client.Client
active string
needsInit bool
debug bool
errNeedsInitError = fmt.Errorf("please log into a garm installation first")
)
// rootCmd represents the base command when called without any subcommands

View file

@ -72,7 +72,7 @@ Example:
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
var instances []params.Instance
@ -121,7 +121,7 @@ var runnerShowCmd = &cobra.Command{
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {
@ -158,7 +158,7 @@ to either cancel the workflow or wait for it to finish.
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return needsInitError
return errNeedsInitError
}
if len(args) == 0 {

View file

@ -151,29 +151,32 @@ type Internal struct {
}
type Repository struct {
ID string `json:"id"`
Owner string `json:"owner"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
ID string `json:"id"`
Owner string `json:"owner"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
}
type Organization struct {
ID string `json:"id"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
ID string `json:"id"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
}
type Enterprise struct {
ID string `json:"id"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
ID string `json:"id"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
CredentialsName string `json:"credentials_name"`
PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
}
@ -219,3 +222,8 @@ type Provider struct {
type UpdatePoolStateParams struct {
WebhookSecret string
}
// PoolManagerStatus carries the running flag of a pool manager together with
// an optional failure reason.
type PoolManagerStatus struct {
// IsRunning is serialized under the JSON key "running".
IsRunning bool `json:"running"`
// FailureReason is serialized under the JSON key "failure_reason" and is
// left out of the JSON output when it is the empty string.
FailureReason string `json:"failure_reason,omitempty"`
}

View file

@ -83,6 +83,20 @@ func (_m *PoolManager) Start() error {
return r0
}
// Status provides a mock function with given fields:
//
// The first configured return value may be either a
// func() params.PoolManagerStatus, which is invoked to produce the result,
// or a params.PoolManagerStatus value, which is returned as is.
func (_m *PoolManager) Status() params.PoolManagerStatus {
	ret := _m.Called()

	if rf, ok := ret.Get(0).(func() params.PoolManagerStatus); ok {
		return rf()
	}
	return ret.Get(0).(params.PoolManagerStatus)
}
// Stop provides a mock function with given fields:
func (_m *PoolManager) Stop() error {
ret := _m.Called()

View file

@ -31,6 +31,12 @@ const (
// Set this to 15 minutes. This should allow enough time even on slow
// clouds for the instance to spin up, download the tools and join gh.
PoolToolUpdateInterval = 15 * time.Minute
// UnauthorizedBackoffTimer is the time we wait before making another request
// after getting an unauthorized error from github. It is unlikely that a second
// request will not receive the same error, unless the config is changed with new
// credentials and garm is restarted.
UnauthorizedBackoffTimer = 3 * time.Hour
)
//go:generate mockery --all
@ -45,5 +51,6 @@ type PoolManager interface {
// PoolManager lifecycle functions. Start/stop pool.
Start() error
Stop() error
Status() params.PoolManagerStatus
Wait() error
}

View file

@ -2,6 +2,7 @@ package runner
import (
"context"
"fmt"
"garm/auth"
"garm/config"
runnerErrors "garm/errors"
@ -72,7 +73,20 @@ func (r *Runner) ListEnterprises(ctx context.Context) ([]params.Enterprise, erro
return nil, errors.Wrap(err, "listing enterprises")
}
return enterprises, nil
var allEnterprises []params.Enterprise
for _, enterprise := range enterprises {
poolMgr, err := r.poolManagerCtrl.GetEnterprisePoolManager(enterprise)
if err != nil {
enterprise.PoolManagerStatus.IsRunning = false
enterprise.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
enterprise.PoolManagerStatus = poolMgr.Status()
}
allEnterprises = append(allEnterprises, enterprise)
}
return allEnterprises, nil
}
func (r *Runner) GetEnterpriseByID(ctx context.Context, enterpriseID string) (params.Enterprise, error) {
@ -84,6 +98,12 @@ func (r *Runner) GetEnterpriseByID(ctx context.Context, enterpriseID string) (pa
if err != nil {
return params.Enterprise{}, errors.Wrap(err, "fetching enterprise")
}
poolMgr, err := r.poolManagerCtrl.GetEnterprisePoolManager(enterprise)
if err != nil {
	// No usable pool manager was returned: report the lookup failure and do
	// not call Status() on the value that came back alongside the error.
	enterprise.PoolManagerStatus.IsRunning = false
	enterprise.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
	enterprise.PoolManagerStatus = poolMgr.Status()
}
return enterprise, nil
}

View file

@ -16,6 +16,7 @@ package runner
import (
"context"
"fmt"
"log"
"strings"
@ -85,7 +86,21 @@ func (r *Runner) ListOrganizations(ctx context.Context) ([]params.Organization,
return nil, errors.Wrap(err, "listing organizations")
}
return orgs, nil
var allOrgs []params.Organization
for _, org := range orgs {
poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org)
if err != nil {
org.PoolManagerStatus.IsRunning = false
org.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
org.PoolManagerStatus = poolMgr.Status()
}
allOrgs = append(allOrgs, org)
}
return allOrgs, nil
}
func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.Organization, error) {
@ -97,6 +112,13 @@ func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching organization")
}
poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org)
if err != nil {
	// No usable pool manager was returned: report the lookup failure and do
	// not call Status() on the value that came back alongside the error.
	org.PoolManagerStatus.IsRunning = false
	org.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
	org.PoolManagerStatus = poolMgr.Status()
}
return org, nil
}

View file

@ -262,6 +262,8 @@ func (s *OrgTestSuite) TestCreateOrganizationStartPoolMgrFailed() {
}
func (s *OrgTestSuite) TestListOrganizations() {
s.Fixtures.PoolMgrCtrlMock.On("GetOrgPoolManager", mock.AnythingOfType("params.Organization")).Return(s.Fixtures.PoolMgrMock, nil)
s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil)
orgs, err := s.Runner.ListOrganizations(s.Fixtures.AdminContext)
s.Require().Nil(err)
@ -275,6 +277,8 @@ func (s *OrgTestSuite) TestListOrganizationsErrUnauthorized() {
}
func (s *OrgTestSuite) TestGetOrganizationByID() {
s.Fixtures.PoolMgrCtrlMock.On("GetOrgPoolManager", mock.AnythingOfType("params.Organization")).Return(s.Fixtures.PoolMgrMock, nil)
s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil)
org, err := s.Runner.GetOrganizationByID(s.Fixtures.AdminContext, s.Fixtures.StoreOrgs["test-org-1"].ID)
s.Require().Nil(err)

View file

@ -3,6 +3,7 @@ package pool
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
@ -35,7 +36,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
store: store,
}
repo := &basePool{
repo := &basePoolManager{
ctx: ctx,
store: store,
providers: providers,
@ -99,6 +100,9 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) {
for {
runners, ghResp, err := r.ghcEnterpriseCli.ListRunners(r.ctx, r.cfg.Name, &opts)
if err != nil {
	// ghResp may be nil when the call failed before any HTTP response was
	// received, so check it before reading the status code.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
	}
	return nil, errors.Wrap(err, "fetching runners")
}
allRunners = append(allRunners, runners.Runners...)
@ -113,8 +117,11 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) {
func (r *enterprise) FetchTools() ([]*github.RunnerApplicationDownload, error) {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name)
tools, ghResp, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name)
if err != nil {
	// ghResp may be nil when the call failed before any HTTP response was
	// received, so check it before reading the status code.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		// This path fetches tools, not runners; say so in the wrapped error.
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
	}
	return nil, errors.Wrap(err, "fetching runner tools")
}

View file

@ -17,6 +17,7 @@ package pool
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
@ -48,7 +49,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
store: store,
}
repo := &basePool{
repo := &basePoolManager{
ctx: ctx,
store: store,
providers: providers,
@ -110,6 +111,9 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) {
for {
runners, ghResp, err := r.ghcli.ListOrganizationRunners(r.ctx, r.cfg.Name, &opts)
if err != nil {
	// ghResp may be nil when the call failed before any HTTP response was
	// received, so check it before reading the status code.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
	}
	return nil, errors.Wrap(err, "fetching runners")
}
allRunners = append(allRunners, runners.Runners...)
@ -125,8 +129,11 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) {
func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error) {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name)
tools, ghResp, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name)
if err != nil {
	// ghResp may be nil when the call failed before any HTTP response was
	// received, so check it before reading the status code.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
	}
	return nil, errors.Wrap(err, "fetching runner tools")
}

View file

@ -47,7 +47,7 @@ const (
maxCreateAttempts = 5
)
type basePool struct {
type basePoolManager struct {
ctx context.Context
controllerID string
@ -61,6 +61,9 @@ type basePool struct {
helper poolHelper
credsDetails params.GithubCredentials
managerIsRunning bool
managerErrorReason string
mux sync.Mutex
}
@ -81,7 +84,7 @@ func controllerIDFromLabels(labels []*github.RunnerLabels) string {
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
@ -116,7 +119,7 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro
// reapTimedOutRunners will mark as pending_delete any runner that has a status
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error {
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
@ -152,7 +155,7 @@ func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error {
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
for _, runner := range runners {
runnerControllerID := controllerIDFromLabels(runner.Labels)
if runnerControllerID != r.controllerID {
@ -243,7 +246,7 @@ func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error
return nil
}
func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) {
func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, error) {
runner, err := r.store.GetInstanceByName(r.ctx, runnerName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
@ -252,7 +255,7 @@ func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) {
return runner, nil
}
func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error {
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error {
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
@ -263,7 +266,7 @@ func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCom
return nil
}
func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanceParams) error {
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) error {
runner, err := r.fetchInstance(runnerName)
if err != nil {
return errors.Wrap(err, "fetching instance")
@ -275,7 +278,7 @@ func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanc
return nil
}
func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error {
func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error {
updateParams := params.UpdateInstanceParams{
Status: status,
ProviderFault: providerFault,
@ -287,7 +290,7 @@ func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.In
return nil
}
func (r *basePool) acquireNewInstance(job params.WorkflowJob) error {
func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error {
requestedLabels := job.WorkflowJob.Labels
if len(requestedLabels) == 0 {
// no labels were requested.
@ -322,7 +325,7 @@ func (r *basePool) acquireNewInstance(job params.WorkflowJob) error {
return nil
}
func (r *basePool) AddRunner(ctx context.Context, poolID string) error {
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
@ -348,7 +351,7 @@ func (r *basePool) AddRunner(ctx context.Context, poolID string) error {
return nil
}
func (r *basePool) loop() {
func (r *basePoolManager) loop() {
consolidateTimer := time.NewTicker(common.PoolConsilitationInterval)
reapTimer := time.NewTicker(common.PoolReapTimeoutInterval)
toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval)
@ -360,7 +363,8 @@ func (r *basePool) loop() {
close(r.done)
}()
log.Printf("starting loop for %s", r.helper.String())
// TODO: Consolidate runners on loop start. Provider runners must match runners
// Consolidate runners on loop start. Provider runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
@ -370,45 +374,118 @@ func (r *basePool) loop() {
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
select {
case <-reapTimer.C:
runners, err := r.helper.GetGithubRunners()
if err != nil {
log.Printf("error fetching github runners: %s", err)
continue
}
if err := r.reapTimedOutRunners(runners); err != nil {
log.Printf("failed to reap timed out runners: %q", err)
}
switch r.managerIsRunning {
case true:
select {
case <-reapTimer.C:
runners, err := r.helper.GetGithubRunners()
if err != nil {
failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
if err := r.reapTimedOutRunners(runners); err != nil {
log.Printf("failed to reap timed out runners: %q", err)
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
log.Printf("failed to clean orphaned github runners: %q", err)
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
log.Printf("failed to clean orphaned github runners: %q", err)
}
case <-consolidateTimer.C:
// consolidate.
r.consolidate()
case <-toolUpdateTimer.C:
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
case <-consolidateTimer.C:
// consolidate.
r.consolidate()
case <-toolUpdateTimer.C:
// Update tools cache.
default:
log.Printf("attempting to start pool manager for %s", r.helper.String())
tools, err := r.helper.FetchTools()
var failureReason string
if err != nil {
log.Printf("failed to update tools for repo %s: %s", r.helper.String(), err)
failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
if err := r.runnerCleanup(); err != nil {
failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err)
r.setPoolRunningState(false, failureReason)
log.Print(failureReason)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
continue
}
r.setPoolRunningState(true, "")
}
}
}
func (r *basePool) addInstanceToProvider(instance params.Instance) error {
// Status returns the manager's current running flag and recorded failure
// reason. Both fields are read while r.mux is held.
func (r *basePoolManager) Status() params.PoolManagerStatus {
	r.mux.Lock()
	defer r.mux.Unlock()

	var status params.PoolManagerStatus
	status.IsRunning = r.managerIsRunning
	status.FailureReason = r.managerErrorReason
	return status
}
// waitForTimeoutOrCanceled logs the sleep duration and then blocks until the
// first of these happens: timeout elapses, r.ctx is done, or a receive on
// r.quit succeeds.
func (r *basePoolManager) waitForTimeoutOrCanceled(timeout time.Duration) {
	log.Printf("sleeping for %.2f minutes", timeout.Minutes())

	// An explicit timer (rather than time.After) lets us release it as soon as
	// we return. Callers pass backoffs of up to several hours, and an abandoned
	// time.After timer would otherwise linger until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-r.ctx.Done():
	case <-r.quit:
	}
}
// setPoolRunningState stores the running flag and the failure reason on the
// manager. Both fields are written while r.mux is held.
func (r *basePoolManager) setPoolRunningState(isRunning bool, failureReason string) {
	r.mux.Lock()
	defer r.mux.Unlock()

	r.managerIsRunning = isRunning
	r.managerErrorReason = failureReason
}
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
@ -416,7 +493,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
labels := []string{}
@ -490,7 +567,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
return nil
}
func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error) {
func (r *basePoolManager) getRunnerNameFromJob(job params.WorkflowJob) (string, error) {
if job.WorkflowJob.RunnerName != "" {
return job.WorkflowJob.RunnerName, nil
}
@ -506,7 +583,7 @@ func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error)
return runnerName, nil
}
func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
@ -559,15 +636,15 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
return nil
}
func (r *basePool) poolLabel(poolID string) string {
// poolLabel returns the label string for the given pool: poolIDLabelprefix
// immediately followed by poolID.
func (r *basePoolManager) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
func (r *basePool) controllerLabel() string {
// controllerLabel returns the label string for this controller:
// controllerLabelPrefix immediately followed by r.controllerID.
func (r *basePoolManager) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
}
func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
@ -579,7 +656,7 @@ func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instan
}
}
func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) {
func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) {
if !pool.Enabled {
return
}
@ -622,7 +699,7 @@ func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) {
}
}
func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) {
func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) {
if !pool.Enabled {
return
}
@ -664,7 +741,7 @@ func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) {
}
}
func (r *basePool) retryFailedInstances() {
func (r *basePoolManager) retryFailedInstances() {
pools, err := r.helper.ListPools()
if err != nil {
log.Printf("error listing pools: %s", err)
@ -681,7 +758,7 @@ func (r *basePool) retryFailedInstances() {
wg.Wait()
}
func (r *basePool) ensureMinIdleRunners() {
func (r *basePoolManager) ensureMinIdleRunners() {
pools, err := r.helper.ListPools()
if err != nil {
log.Printf("error listing pools: %s", err)
@ -698,7 +775,7 @@ func (r *basePool) ensureMinIdleRunners() {
wg.Wait()
}
func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error {
func (r *basePoolManager) deleteInstanceFromProvider(instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
@ -706,7 +783,7 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error {
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
identifier := instance.ProviderID
@ -726,7 +803,7 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error {
return nil
}
func (r *basePool) deletePendingInstances() {
func (r *basePoolManager) deletePendingInstances() {
instances, err := r.helper.FetchDbInstances()
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
@ -764,7 +841,7 @@ func (r *basePool) deletePendingInstances() {
}
}
func (r *basePool) addPendingInstances() {
func (r *basePoolManager) addPendingInstances() {
// TODO: filter instances by status.
instances, err := r.helper.FetchDbInstances()
if err != nil {
@ -796,7 +873,7 @@ func (r *basePool) addPendingInstances() {
}
}
func (r *basePool) consolidate() {
func (r *basePoolManager) consolidate() {
// TODO(gabriel-samfira): replace this with something more efficient.
r.mux.Lock()
defer r.mux.Unlock()
@ -826,7 +903,7 @@ func (r *basePool) consolidate() {
wg.Wait()
}
func (r *basePool) Wait() error {
func (r *basePoolManager) Wait() error {
select {
case <-r.done:
case <-time.After(20 * time.Second):
@ -835,15 +912,7 @@ func (r *basePool) Wait() error {
return nil
}
func (r *basePool) Start() error {
tools, err := r.helper.FetchTools()
if err != nil {
return errors.Wrap(err, "initializing tools")
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
func (r *basePoolManager) runnerCleanup() error {
runners, err := r.helper.GetGithubRunners()
if err != nil {
return errors.Wrap(err, "fetching github runners")
@ -855,28 +924,35 @@ func (r *basePool) Start() error {
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned github runners")
}
return nil
}
// Start launches r.loop in a new goroutine and returns immediately. It
// always returns nil; nothing is reported back from the goroutine through
// this return value.
func (r *basePoolManager) Start() error {
go r.loop()
return nil
}
func (r *basePool) Stop() error {
// Stop signals the manager to stop by closing r.quit. It always returns nil.
// NOTE(review): closing an already-closed channel panics, so this appears to
// assume Stop is called at most once per manager — confirm against callers.
func (r *basePoolManager) Stop() error {
close(r.quit)
return nil
}
func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error {
// RefreshState forwards param to the helper's UpdateState and returns its
// error unchanged.
func (r *basePoolManager) RefreshState(param params.UpdatePoolStateParams) error {
return r.helper.UpdateState(param)
}
func (r *basePool) WebhookSecret() string {
// WebhookSecret returns the webhook secret reported by the helper.
func (r *basePoolManager) WebhookSecret() string {
return r.helper.WebhookSecret()
}
func (r *basePool) ID() string {
// ID returns the identifier reported by the helper.
func (r *basePoolManager) ID() string {
return r.helper.ID()
}
func (r *basePool) ForceDeleteRunner(runner params.Instance) error {
func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error {
if !r.managerIsRunning {
return runnerErrors.NewConflictError("pool manager is not running for %s", r.helper.String())
}
if runner.AgentID != 0 {
resp, err := r.helper.RemoveGithubRunner(runner.AgentID)
if err != nil {

View file

@ -17,6 +17,7 @@ package pool
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
@ -48,7 +49,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
store: store,
}
repo := &basePool{
repo := &basePoolManager{
ctx: ctx,
store: store,
providers: providers,
@ -112,6 +113,9 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) {
for {
runners, ghResp, err := r.ghcli.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, &opts)
if err != nil {
	// ghResp may be nil when the request failed before any HTTP response
	// was received (e.g. a transport error), so check it before reading
	// the status code to avoid a nil pointer dereference.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
	}
	return nil, errors.Wrap(err, "fetching runners")
}
allRunners = append(allRunners, runners.Runners...)
@ -127,8 +131,11 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) {
func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name)
tools, ghResp, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name)
if err != nil {
	// ghResp may be nil when the request failed before any HTTP response
	// was received (e.g. a transport error), so check it before reading
	// the status code to avoid a nil pointer dereference.
	if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
		return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
	}
	return nil, errors.Wrap(err, "fetching runner tools")
}

View file

@ -16,6 +16,7 @@ package runner
import (
"context"
"fmt"
"log"
"strings"
@ -85,7 +86,20 @@ func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, err
return nil, errors.Wrap(err, "listing repositories")
}
return repos, nil
var allRepos []params.Repository
for _, repo := range repos {
poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo)
if err != nil {
repo.PoolManagerStatus.IsRunning = false
repo.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
repo.PoolManagerStatus = poolMgr.Status()
}
allRepos = append(allRepos, repo)
}
return allRepos, nil
}
func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) {
@ -97,6 +111,13 @@ func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.R
if err != nil {
return params.Repository{}, errors.Wrap(err, "fetching repository")
}
poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo)
if err != nil {
	// The lookup failed, so poolMgr must not be used. Report the failure
	// through the repository's status instead.
	repo.PoolManagerStatus.IsRunning = false
	repo.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err)
} else {
	// Only query the manager when the lookup succeeded. Calling Status()
	// unconditionally would dereference poolMgr after a failed lookup and
	// overwrite the failure reason recorded above.
	repo.PoolManagerStatus = poolMgr.Status()
}
return repo, nil
}

View file

@ -265,6 +265,8 @@ func (s *RepoTestSuite) TestCreateRepositoryStartPoolMgrFailed() {
}
func (s *RepoTestSuite) TestListRepositories() {
s.Fixtures.PoolMgrCtrlMock.On("GetRepoPoolManager", mock.AnythingOfType("params.Repository")).Return(s.Fixtures.PoolMgrMock, nil)
s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil)
repos, err := s.Runner.ListRepositories(s.Fixtures.AdminContext)
s.Require().Nil(err)
@ -278,6 +280,8 @@ func (s *RepoTestSuite) TestListRepositoriesErrUnauthorized() {
}
func (s *RepoTestSuite) TestGetRepositoryByID() {
s.Fixtures.PoolMgrCtrlMock.On("GetRepoPoolManager", mock.AnythingOfType("params.Repository")).Return(s.Fixtures.PoolMgrMock, nil)
s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil)
repo, err := s.Runner.GetRepositoryByID(s.Fixtures.AdminContext, s.Fixtures.StoreRepos["test-repo-1"].ID)
s.Require().Nil(err)

View file

@ -421,19 +421,49 @@ func (r *Runner) Stop() error {
if err != nil {
return errors.Wrap(err, "fetch repo pool managers")
}
for _, repo := range repos {
if err := repo.Stop(); err != nil {
return errors.Wrap(err, "stopping repo pool manager")
}
}
orgs, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
enterprises, err := r.poolManagerCtrl.GetEnterprisePoolManagers()
if err != nil {
return errors.Wrap(err, "fetch enterprise pool managers")
}
expectedReplies := len(repos) + len(orgs) + len(enterprises)
errChan := make(chan error, expectedReplies)
for _, repo := range repos {
go func(poolMgr common.PoolManager) {
err := poolMgr.Stop()
errChan <- err
}(repo)
}
for _, org := range orgs {
if err := org.Stop(); err != nil {
return errors.Wrap(err, "stopping org pool manager")
go func(poolMgr common.PoolManager) {
err := poolMgr.Stop()
errChan <- err
}(org)
}
for _, enterprise := range enterprises {
go func(poolMgr common.PoolManager) {
err := poolMgr.Stop()
errChan <- err
}(enterprise)
}
for i := 0; i < expectedReplies; i++ {
select {
case err := <-errChan:
if err != nil {
return errors.Wrap(err, "stopping pool manager")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager stop")
}
}
return nil
@ -449,6 +479,17 @@ func (r *Runner) Wait() error {
if err != nil {
return errors.Wrap(err, "fetch repo pool managers")
}
orgs, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
enterprises, err := r.poolManagerCtrl.GetEnterprisePoolManagers()
if err != nil {
return errors.Wrap(err, "fetch enterprise pool managers")
}
for poolId, repo := range repos {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
@ -459,10 +500,6 @@ func (r *Runner) Wait() error {
}(poolId, repo)
}
orgs, err := r.poolManagerCtrl.GetOrgPoolManagers()
if err != nil {
return errors.Wrap(err, "fetch org pool managers")
}
for poolId, org := range orgs {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
@ -472,6 +509,17 @@ func (r *Runner) Wait() error {
}
}(poolId, org)
}
for poolId, enterprise := range enterprises {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
defer wg.Done()
if err := poolMgr.Wait(); err != nil {
log.Printf("timed out waiting for pool manager %s to exit", id)
}
}(poolId, enterprise)
}
wg.Wait()
return nil
}