From 05057e37fd4c848671b030b440019aaaba3abee6 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Thu, 20 Oct 2022 17:22:47 +0300 Subject: [PATCH] Start pool managers in the background Garm no longer fails on startup if a pool manager cannot be started. It will attempt to start the pool manager in the background. If it fails due to an unauthorized error, it will sleep for 3 hours. It is unlikely it will work a second time if credentials are not updated in the config and garm is restarted, so no point in getting rate limited. Signed-off-by: Gabriel Adrian Samfira --- cmd/garm-cli/cmd/credentials.go | 2 +- cmd/garm-cli/cmd/enterprise.go | 16 ++- cmd/garm-cli/cmd/org_instances.go | 2 +- cmd/garm-cli/cmd/org_pool.go | 10 +- cmd/garm-cli/cmd/organization.go | 17 ++- cmd/garm-cli/cmd/pool.go | 10 +- cmd/garm-cli/cmd/profile.go | 8 +- cmd/garm-cli/cmd/provider.go | 2 +- cmd/garm-cli/cmd/repo_instances.go | 2 +- cmd/garm-cli/cmd/repo_pool.go | 36 +---- cmd/garm-cli/cmd/repository.go | 16 ++- cmd/garm-cli/cmd/root.go | 14 +- cmd/garm-cli/cmd/runner.go | 6 +- params/params.go | 34 +++-- runner/common/mocks/PoolManager.go | 14 ++ runner/common/pool.go | 7 + runner/enterprises.go | 22 ++- runner/organizations.go | 24 +++- runner/organizations_test.go | 4 + runner/pool/enterprise.go | 11 +- runner/pool/organization.go | 11 +- runner/pool/pool.go | 212 ++++++++++++++++++++--------- runner/pool/repository.go | 11 +- runner/repositories.go | 23 +++- runner/repositories_test.go | 4 + runner/runner.go | 70 ++++++++-- 26 files changed, 408 insertions(+), 180 deletions(-) diff --git a/cmd/garm-cli/cmd/credentials.go b/cmd/garm-cli/cmd/credentials.go index a4986b8b..447d2957 100644 --- a/cmd/garm-cli/cmd/credentials.go +++ b/cmd/garm-cli/cmd/credentials.go @@ -46,7 +46,7 @@ func init() { SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } creds, err := cli.ListCredentials() diff --git a/cmd/garm-cli/cmd/enterprise.go b/cmd/garm-cli/cmd/enterprise.go index 82c0b6fe..0a58519b 100644 --- a/cmd/garm-cli/cmd/enterprise.go +++ b/cmd/garm-cli/cmd/enterprise.go @@ -50,7 +50,7 @@ var enterpriseAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } newEnterpriseReq := params.CreateEnterpriseParams{ @@ -75,7 +75,7 @@ var enterpriseListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } enterprises, err := cli.ListEnterprises() @@ -94,7 +94,7 @@ var enterpriseShowCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a enterprise ID") @@ -119,7 +119,7 @@ var enterpriseDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a enterprise ID") @@ -154,10 +154,10 @@ func init() { func formatEnterprises(enterprises []params.Enterprise) { t := table.NewWriter() - header := table.Row{"ID", "Name", "Credentials name"} + header := table.Row{"ID", "Name", "Credentials name", "Pool mgr running"} t.AppendHeader(header) for _, val := range enterprises { - t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName}) + t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning}) t.AppendSeparator() } fmt.Println(t.Render()) @@ -171,6 +171,10 @@ func formatOneEnterprise(enterprise params.Enterprise) { t.AppendRow(table.Row{"ID", enterprise.ID}) t.AppendRow(table.Row{"Name", enterprise.Name}) t.AppendRow(table.Row{"Credentials", enterprise.CredentialsName}) + t.AppendRow(table.Row{"Pool manager running", enterprise.PoolManagerStatus.IsRunning}) + if !enterprise.PoolManagerStatus.IsRunning { + t.AppendRow(table.Row{"Failure reason", enterprise.PoolManagerStatus.FailureReason}) + } if len(enterprise.Pools) > 0 { for _, pool := range enterprise.Pools { diff --git a/cmd/garm-cli/cmd/org_instances.go b/cmd/garm-cli/cmd/org_instances.go index 4c8445d7..002b022c 100644 --- a/cmd/garm-cli/cmd/org_instances.go +++ b/cmd/garm-cli/cmd/org_instances.go @@ -37,7 +37,7 @@ var orgRunnerListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { diff --git a/cmd/garm-cli/cmd/org_pool.go b/cmd/garm-cli/cmd/org_pool.go index 976f5c79..1ad3ac1b 100644 --- a/cmd/garm-cli/cmd/org_pool.go +++ b/cmd/garm-cli/cmd/org_pool.go @@ -48,7 +48,7 @@ var orgPoolAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -91,7 +91,7 @@ var orgPoolListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -117,7 +117,7 @@ var orgPoolShowCmd = &cobra.Command{ Long: `Displays detailed information about a single pool.`, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { @@ -142,7 +142,7 @@ var orgPoolDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { return fmt.Errorf("command requires orgID and poolID") @@ -167,7 +167,7 @@ explicitly remove them using the runner delete command. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { diff --git a/cmd/garm-cli/cmd/organization.go b/cmd/garm-cli/cmd/organization.go index 432d3263..184a9a18 100644 --- a/cmd/garm-cli/cmd/organization.go +++ b/cmd/garm-cli/cmd/organization.go @@ -50,7 +50,7 @@ var orgAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } newOrgReq := params.CreateOrgParams{ @@ -75,7 +75,7 @@ var orgListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } orgs, err := cli.ListOrganizations() @@ -94,7 +94,7 @@ var orgShowCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a organization ID") @@ -119,7 +119,7 @@ var orgDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a organization ID") @@ -154,10 +154,10 @@ func init() { func formatOrganizations(orgs []params.Organization) { t := table.NewWriter() - header := table.Row{"ID", "Name", "Credentials name"} + header := table.Row{"ID", "Name", "Credentials name", "Pool mgr running"} t.AppendHeader(header) for _, val := range orgs { - t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName}) + t.AppendRow(table.Row{val.ID, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning}) t.AppendSeparator() } fmt.Println(t.Render()) @@ -171,7 +171,10 @@ func formatOneOrganization(org params.Organization) { t.AppendRow(table.Row{"ID", org.ID}) t.AppendRow(table.Row{"Name", org.Name}) t.AppendRow(table.Row{"Credentials", org.CredentialsName}) - + t.AppendRow(table.Row{"Pool manager running", org.PoolManagerStatus.IsRunning}) + if !org.PoolManagerStatus.IsRunning { + t.AppendRow(table.Row{"Failure reason", org.PoolManagerStatus.FailureReason}) + } if len(org.Pools) > 0 { for _, pool := range org.Pools { t.AppendRow(table.Row{"Pools", pool.ID}, rowConfigAutoMerge) diff --git a/cmd/garm-cli/cmd/pool.go b/cmd/garm-cli/cmd/pool.go index 2d0c8802..ec5f3964 100644 --- a/cmd/garm-cli/cmd/pool.go +++ b/cmd/garm-cli/cmd/pool.go @@ -68,7 +68,7 @@ Example: SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } var pools []params.Pool @@ -108,7 +108,7 @@ var poolShowCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -136,7 +136,7 @@ var poolDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -162,7 +162,7 @@ var poolAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } tags := strings.Split(poolTags, ",") @@ -216,7 +216,7 @@ explicitly remove them using the runner delete command. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { diff --git a/cmd/garm-cli/cmd/profile.go b/cmd/garm-cli/cmd/profile.go index ca9b6689..64ca54f7 100644 --- a/cmd/garm-cli/cmd/profile.go +++ b/cmd/garm-cli/cmd/profile.go @@ -55,7 +55,7 @@ file of the garm client. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if cfg == nil { @@ -76,7 +76,7 @@ var profileDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -101,7 +101,7 @@ var poolSwitchCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -177,7 +177,7 @@ installation, by performing a login. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if cfg == nil { diff --git a/cmd/garm-cli/cmd/provider.go b/cmd/garm-cli/cmd/provider.go index 562f9462..95404364 100644 --- a/cmd/garm-cli/cmd/provider.go +++ b/cmd/garm-cli/cmd/provider.go @@ -45,7 +45,7 @@ func init() { SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } providers, err := cli.ListProviders() diff --git a/cmd/garm-cli/cmd/repo_instances.go b/cmd/garm-cli/cmd/repo_instances.go index ee3c513a..9094bc2b 100644 --- a/cmd/garm-cli/cmd/repo_instances.go +++ b/cmd/garm-cli/cmd/repo_instances.go @@ -37,7 +37,7 @@ var repoRunnerListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { diff --git a/cmd/garm-cli/cmd/repo_pool.go b/cmd/garm-cli/cmd/repo_pool.go index 30d64b99..d09bf106 100644 --- a/cmd/garm-cli/cmd/repo_pool.go +++ b/cmd/garm-cli/cmd/repo_pool.go @@ -62,7 +62,7 @@ var repoPoolAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -97,41 +97,13 @@ var repoPoolAddCmd = &cobra.Command{ }, } -var repoPoolListCmd = &cobra.Command{ - Use: "list", - Aliases: []string{"ls"}, - Short: "List repository pools", - Long: `List all configured pools for a given repository.`, - SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - if needsInit { - return needsInitError - } - - if len(args) == 0 { - return fmt.Errorf("requires a repository ID") - } - - if len(args) > 1 { - return fmt.Errorf("too many arguments") - } - - pools, err := cli.ListRepoPools(args[0]) - if err != nil { - return err - } - formatPools(pools) - return nil - }, -} - var repoPoolShowCmd = &cobra.Command{ Use: "show", Short: "Show details for one pool", Long: `Displays detailed information about a single pool.`, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { @@ -156,7 +128,7 @@ var repoPoolDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { return fmt.Errorf("command requires repoID and poolID") @@ -181,7 +153,7 @@ explicitly remove them using the runner delete command. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) < 2 || len(args) > 2 { diff --git a/cmd/garm-cli/cmd/repository.go b/cmd/garm-cli/cmd/repository.go index bcb33db9..52896ced 100644 --- a/cmd/garm-cli/cmd/repository.go +++ b/cmd/garm-cli/cmd/repository.go @@ -51,7 +51,7 @@ var repoAddCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } newRepoReq := params.CreateRepoParams{ @@ -77,7 +77,7 @@ var repoListCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } repos, err := cli.ListRepositories() @@ -96,7 +96,7 @@ var repoShowCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a repository ID") @@ -121,7 +121,7 @@ var repoDeleteCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { return fmt.Errorf("requires a repository ID") @@ -158,10 +158,10 @@ func init() { func formatRepositories(repos []params.Repository) { t := table.NewWriter() - header := table.Row{"ID", "Owner", "Name", "Credentials name"} + header := table.Row{"ID", "Owner", "Name", "Credentials name", "Pool mgr running"} t.AppendHeader(header) for _, val := range repos { - t.AppendRow(table.Row{val.ID, val.Owner, val.Name, val.CredentialsName}) + t.AppendRow(table.Row{val.ID, val.Owner, val.Name, val.CredentialsName, val.PoolManagerStatus.IsRunning}) t.AppendSeparator() } fmt.Println(t.Render()) @@ -176,6 +176,10 @@ func formatOneRepository(repo params.Repository) { t.AppendRow(table.Row{"Owner", repo.Owner}) t.AppendRow(table.Row{"Name", repo.Name}) t.AppendRow(table.Row{"Credentials", repo.CredentialsName}) + t.AppendRow(table.Row{"Pool manager running", repo.PoolManagerStatus.IsRunning}) + if !repo.PoolManagerStatus.IsRunning { + t.AppendRow(table.Row{"Failure reason", repo.PoolManagerStatus.FailureReason}) + } if len(repo.Pools) > 0 { for _, pool := range repo.Pools { diff --git a/cmd/garm-cli/cmd/root.go b/cmd/garm-cli/cmd/root.go index a8b24321..a3fb0197 100644 --- a/cmd/garm-cli/cmd/root.go +++ b/cmd/garm-cli/cmd/root.go @@ -26,13 +26,13 @@ import ( var Version string var ( - cfg *config.Config - mgr config.Manager - cli *client.Client - active string - needsInit bool - debug bool - needsInitError = fmt.Errorf("Please log into a garm installation first") + cfg *config.Config + mgr config.Manager + cli *client.Client + active string + needsInit bool + debug bool + errNeedsInitError = fmt.Errorf("please log into a garm installation first") ) // rootCmd represents the base command when called without any subcommands diff --git a/cmd/garm-cli/cmd/runner.go b/cmd/garm-cli/cmd/runner.go index f509d641..2f36233b 100644 --- a/cmd/garm-cli/cmd/runner.go +++ b/cmd/garm-cli/cmd/runner.go @@ -72,7 +72,7 @@ Example: SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } var instances []params.Instance @@ -121,7 +121,7 @@ var runnerShowCmd = &cobra.Command{ SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { @@ -158,7 +158,7 @@ to either cancel the workflow or wait for it to finish. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { if needsInit { - return needsInitError + return errNeedsInitError } if len(args) == 0 { diff --git a/params/params.go b/params/params.go index b2807db0..b0a07450 100644 --- a/params/params.go +++ b/params/params.go @@ -151,29 +151,32 @@ type Internal struct { } type Repository struct { - ID string `json:"id"` - Owner string `json:"owner"` - Name string `json:"name"` - Pools []Pool `json:"pool,omitempty"` - CredentialsName string `json:"credentials_name"` + ID string `json:"id"` + Owner string `json:"owner"` + Name string `json:"name"` + Pools []Pool `json:"pool,omitempty"` + CredentialsName string `json:"credentials_name"` + PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"` // Do not serialize sensitive info. WebhookSecret string `json:"-"` } type Organization struct { - ID string `json:"id"` - Name string `json:"name"` - Pools []Pool `json:"pool,omitempty"` - CredentialsName string `json:"credentials_name"` + ID string `json:"id"` + Name string `json:"name"` + Pools []Pool `json:"pool,omitempty"` + CredentialsName string `json:"credentials_name"` + PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"` // Do not serialize sensitive info. WebhookSecret string `json:"-"` } type Enterprise struct { - ID string `json:"id"` - Name string `json:"name"` - Pools []Pool `json:"pool,omitempty"` - CredentialsName string `json:"credentials_name"` + ID string `json:"id"` + Name string `json:"name"` + Pools []Pool `json:"pool,omitempty"` + CredentialsName string `json:"credentials_name"` + PoolManagerStatus PoolManagerStatus `json:"pool_manager_status,omitempty"` // Do not serialize sensitive info. WebhookSecret string `json:"-"` } @@ -219,3 +222,8 @@ type Provider struct { type UpdatePoolStateParams struct { WebhookSecret string } + +type PoolManagerStatus struct { + IsRunning bool `json:"running"` + FailureReason string `json:"failure_reason,omitempty"` +} diff --git a/runner/common/mocks/PoolManager.go b/runner/common/mocks/PoolManager.go index be95d2cb..4d55c339 100644 --- a/runner/common/mocks/PoolManager.go +++ b/runner/common/mocks/PoolManager.go @@ -83,6 +83,20 @@ func (_m *PoolManager) Start() error { return r0 } +// Status provides a mock function with given fields: +func (_m *PoolManager) Status() params.PoolManagerStatus { + ret := _m.Called() + + var r0 params.PoolManagerStatus + if rf, ok := ret.Get(0).(func() params.PoolManagerStatus); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(params.PoolManagerStatus) + } + + return r0 +} + // Stop provides a mock function with given fields: func (_m *PoolManager) Stop() error { ret := _m.Called() diff --git a/runner/common/pool.go b/runner/common/pool.go index 73dc54cd..113125e9 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -31,6 +31,12 @@ const ( // Set this to 15 minutes. This should allow enough time even on slow // clouds for the instance to spin up, download the tools and join gh. PoolToolUpdateInterval = 15 * time.Minute + + // UnauthorizedBackoffTimer is the time we wait before making another request + // after getting an unauthorized error from github. It is unlikely that a second + // request will not receive the same error, unless the config is changed with new + // credentials and garm is restarted. + UnauthorizedBackoffTimer = 3 * time.Hour ) //go:generate mockery --all @@ -45,5 +51,6 @@ type PoolManager interface { // PoolManager lifecycle functions. Start/stop pool. Start() error Stop() error + Status() params.PoolManagerStatus Wait() error } diff --git a/runner/enterprises.go b/runner/enterprises.go index 7750b91e..4fdd4b0c 100644 --- a/runner/enterprises.go +++ b/runner/enterprises.go @@ -2,6 +2,7 @@ package runner import ( "context" + "fmt" "garm/auth" "garm/config" runnerErrors "garm/errors" @@ -72,7 +73,20 @@ func (r *Runner) ListEnterprises(ctx context.Context) ([]params.Enterprise, erro return nil, errors.Wrap(err, "listing enterprises") } - return enterprises, nil + var allEnterprises []params.Enterprise + + for _, enterprise := range enterprises { + poolMgr, err := r.poolManagerCtrl.GetEnterprisePoolManager(enterprise) + if err != nil { + enterprise.PoolManagerStatus.IsRunning = false + enterprise.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } else { + enterprise.PoolManagerStatus = poolMgr.Status() + } + allEnterprises = append(allEnterprises, enterprise) + } + + return allEnterprises, nil } func (r *Runner) GetEnterpriseByID(ctx context.Context, enterpriseID string) (params.Enterprise, error) { @@ -84,6 +98,12 @@ func (r *Runner) GetEnterpriseByID(ctx context.Context, enterpriseID string) (pa if err != nil { return params.Enterprise{}, errors.Wrap(err, "fetching enterprise") } + poolMgr, err := r.poolManagerCtrl.GetEnterprisePoolManager(enterprise) + if err != nil { + enterprise.PoolManagerStatus.IsRunning = false + enterprise.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } + enterprise.PoolManagerStatus = poolMgr.Status() return enterprise, nil } diff --git a/runner/organizations.go b/runner/organizations.go index e9f7e264..eef1e65f 100644 --- a/runner/organizations.go +++ b/runner/organizations.go @@ -16,6 +16,7 @@ package runner import ( "context" + "fmt" "log" "strings" @@ -85,7 +86,21 @@ func (r *Runner) ListOrganizations(ctx context.Context) ([]params.Organization, return nil, errors.Wrap(err, "listing organizations") } - return orgs, nil + var allOrgs []params.Organization + + for _, org := range orgs { + poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org) + if err != nil { + org.PoolManagerStatus.IsRunning = false + org.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } else { + org.PoolManagerStatus = poolMgr.Status() + } + + allOrgs = append(allOrgs, org) + } + + return allOrgs, nil } func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.Organization, error) { @@ -97,6 +112,13 @@ func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params. if err != nil { return params.Organization{}, errors.Wrap(err, "fetching organization") } + + poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org) + if err != nil { + org.PoolManagerStatus.IsRunning = false + org.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } + org.PoolManagerStatus = poolMgr.Status() return org, nil } diff --git a/runner/organizations_test.go b/runner/organizations_test.go index 6a28d190..e87cc255 100644 --- a/runner/organizations_test.go +++ b/runner/organizations_test.go @@ -262,6 +262,8 @@ func (s *OrgTestSuite) TestCreateOrganizationStartPoolMgrFailed() { } func (s *OrgTestSuite) TestListOrganizations() { + s.Fixtures.PoolMgrCtrlMock.On("GetOrgPoolManager", mock.AnythingOfType("params.Organization")).Return(s.Fixtures.PoolMgrMock, nil) + s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil) orgs, err := s.Runner.ListOrganizations(s.Fixtures.AdminContext) s.Require().Nil(err) @@ -275,6 +277,8 @@ func (s *OrgTestSuite) TestListOrganizationsErrUnauthorized() { } func (s *OrgTestSuite) TestGetOrganizationByID() { + s.Fixtures.PoolMgrCtrlMock.On("GetOrgPoolManager", mock.AnythingOfType("params.Organization")).Return(s.Fixtures.PoolMgrMock, nil) + s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil) org, err := s.Runner.GetOrganizationByID(s.Fixtures.AdminContext, s.Fixtures.StoreOrgs["test-org-1"].ID) s.Require().Nil(err) diff --git a/runner/pool/enterprise.go b/runner/pool/enterprise.go index c2de3cd8..6bf8f789 100644 --- a/runner/pool/enterprise.go +++ b/runner/pool/enterprise.go @@ -3,6 +3,7 @@ package pool import ( "context" "fmt" + "net/http" "strings" "sync" @@ -35,7 +36,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt store: store, } - repo := &basePool{ + repo := &basePoolManager{ ctx: ctx, store: store, providers: providers, @@ -99,6 +100,9 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcEnterpriseCli.ListRunners(r.ctx, r.cfg.Name, &opts) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") + } return nil, errors.Wrap(err, "fetching runners") } allRunners = append(allRunners, runners.Runners...) @@ -113,8 +117,11 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) { func (r *enterprise) FetchTools() ([]*github.RunnerApplicationDownload, error) { r.mux.Lock() defer r.mux.Unlock() - tools, _, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name) + tools, ghResp, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") + } return nil, errors.Wrap(err, "fetching runner tools") } diff --git a/runner/pool/organization.go b/runner/pool/organization.go index 3ab36c62..e3cd2ebd 100644 --- a/runner/pool/organization.go +++ b/runner/pool/organization.go @@ -17,6 +17,7 @@ package pool import ( "context" "fmt" + "net/http" "strings" "sync" @@ -48,7 +49,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf store: store, } - repo := &basePool{ + repo := &basePoolManager{ ctx: ctx, store: store, providers: providers, @@ -110,6 +111,9 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcli.ListOrganizationRunners(r.ctx, r.cfg.Name, &opts) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") + } return nil, errors.Wrap(err, "fetching runners") } allRunners = append(allRunners, runners.Runners...) @@ -125,8 +129,11 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) { func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error) { r.mux.Lock() defer r.mux.Unlock() - tools, _, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name) + tools, ghResp, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools") + } return nil, errors.Wrap(err, "fetching runner tools") } diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 8898d51d..385908b0 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -47,7 +47,7 @@ const ( maxCreateAttempts = 5 ) -type basePool struct { +type basePoolManager struct { ctx context.Context controllerID string @@ -61,6 +61,9 @@ type basePool struct { helper poolHelper credsDetails params.GithubCredentials + managerIsRunning bool + managerErrorReason string + mux sync.Mutex } @@ -81,7 +84,7 @@ func controllerIDFromLabels(labels []*github.RunnerLabels) string { // happens, github will remove the ephemeral worker and send a webhook our way. // If we were offline and did not process the webhook, the instance will linger. // We need to remove it from the provider and database. -func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) error { +func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error { dbInstances, err := r.helper.FetchDbInstances() if err != nil { return errors.Wrap(err, "fetching instances from db") @@ -116,7 +119,7 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro // reapTimedOutRunners will mark as pending_delete any runner that has a status // of "running" in the provider, but that has not registered with Github, and has // received no new updates in the configured timeout interval. -func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error { +func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error { dbInstances, err := r.helper.FetchDbInstances() if err != nil { return errors.Wrap(err, "fetching instances from db") @@ -152,7 +155,7 @@ func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error { // as offline and for which we no longer have a local instance. // This may happen if someone manually deletes the instance in the provider. We need to // first remove the instance from github, and then from our database. -func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error { +func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error { for _, runner := range runners { runnerControllerID := controllerIDFromLabels(runner.Labels) if runnerControllerID != r.controllerID { @@ -243,7 +246,7 @@ func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error return nil } -func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) { +func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, error) { runner, err := r.store.GetInstanceByName(r.ctx, runnerName) if err != nil { return params.Instance{}, errors.Wrap(err, "fetching instance") @@ -252,7 +255,7 @@ func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) { return runner, nil } -func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error { +func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error { updateParams := params.UpdateInstanceParams{ RunnerStatus: status, } @@ -263,7 +266,7 @@ func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCom return nil } -func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanceParams) error { +func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) error { runner, err := r.fetchInstance(runnerName) if err != nil { return errors.Wrap(err, "fetching instance") @@ -275,7 +278,7 @@ func (r *basePool) updateInstance(runnerName string, update params.UpdateInstanc return nil } -func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error { +func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error { updateParams := params.UpdateInstanceParams{ Status: status, ProviderFault: providerFault, @@ -287,7 +290,7 @@ func (r *basePool) setInstanceStatus(runnerName string, status providerCommon.In return nil } -func (r *basePool) acquireNewInstance(job params.WorkflowJob) error { +func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error { requestedLabels := job.WorkflowJob.Labels if len(requestedLabels) == 0 { // no labels were requested. @@ -322,7 +325,7 @@ func (r *basePool) acquireNewInstance(job params.WorkflowJob) error { return nil } -func (r *basePool) AddRunner(ctx context.Context, poolID string) error { +func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error { pool, err := r.helper.GetPoolByID(poolID) if err != nil { return errors.Wrap(err, "fetching pool") @@ -348,7 +351,7 @@ func (r *basePool) AddRunner(ctx context.Context, poolID string) error { return nil } -func (r *basePool) loop() { +func (r *basePoolManager) loop() { consolidateTimer := time.NewTicker(common.PoolConsilitationInterval) reapTimer := time.NewTicker(common.PoolReapTimeoutInterval) toolUpdateTimer := time.NewTicker(common.PoolToolUpdateInterval) @@ -360,7 +363,8 @@ func (r *basePool) loop() { close(r.done) }() log.Printf("starting loop for %s", r.helper.String()) - // TODO: Consolidate runners on loop start. Provider runners must match runners + + // Consolidate runners on loop start. Provider runners must match runners // in github and DB. When a Workflow job is received, we will first create/update // an entity in the database, before sending the request to the provider to create/delete // an instance. If a "queued" job is received, we create an entity in the db with @@ -370,45 +374,118 @@ func (r *basePool) loop() { // in the database. // We also ensure we have runners created based on pool characteristics. This is where // we spin up "MinWorkers" for each runner type. - for { - select { - case <-reapTimer.C: - runners, err := r.helper.GetGithubRunners() - if err != nil { - log.Printf("error fetching github runners: %s", err) - continue - } - if err := r.reapTimedOutRunners(runners); err != nil { - log.Printf("failed to reap timed out runners: %q", err) - } + switch r.managerIsRunning { + case true: + select { + case <-reapTimer.C: + runners, err := r.helper.GetGithubRunners() + if err != nil { + failureReason := fmt.Sprintf("error fetching github runners for %s: %s", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } else { + r.waitForTimeoutOrCanceled(60 * time.Second) + } + continue + } + if err := r.reapTimedOutRunners(runners); err != nil { + log.Printf("failed to reap timed out runners: %q", err) + } - if err := r.cleanupOrphanedGithubRunners(runners); err != nil { - log.Printf("failed to clean orphaned github runners: %q", err) + if err := r.cleanupOrphanedGithubRunners(runners); err != nil { + log.Printf("failed to clean orphaned github runners: %q", err) + } + case <-consolidateTimer.C: + // consolidate. + r.consolidate() + case <-toolUpdateTimer.C: + // Update tools cache. + tools, err := r.helper.FetchTools() + if err != nil { + failureReason := fmt.Sprintf("failed to update tools for repo %s: %s", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } else { + r.waitForTimeoutOrCanceled(60 * time.Second) + } + continue + } + r.mux.Lock() + r.tools = tools + r.mux.Unlock() + case <-r.ctx.Done(): + // daemon is shutting down. + return + case <-r.quit: + // this worker was stopped. + return } - case <-consolidateTimer.C: - // consolidate. - r.consolidate() - case <-toolUpdateTimer.C: - // Update tools cache. + default: + log.Printf("attempting to start pool manager for %s", r.helper.String()) tools, err := r.helper.FetchTools() + var failureReason string if err != nil { - log.Printf("failed to update tools for repo %s: %s", r.helper.String(), err) + failureReason = fmt.Sprintf("failed to fetch tools from github for %s: %q", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } else { + r.waitForTimeoutOrCanceled(60 * time.Second) + } + continue } r.mux.Lock() r.tools = tools r.mux.Unlock() - case <-r.ctx.Done(): - // daemon is shutting down. - return - case <-r.quit: - // this worker was stopped. - return + + if err := r.runnerCleanup(); err != nil { + failureReason = fmt.Sprintf("failed to clean runners for %s: %q", r.helper.String(), err) + r.setPoolRunningState(false, failureReason) + log.Print(failureReason) + if errors.Is(err, runnerErrors.ErrUnauthorized) { + r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer) + } else { + r.waitForTimeoutOrCanceled(60 * time.Second) + } + continue + } + r.setPoolRunningState(true, "") } } } -func (r *basePool) addInstanceToProvider(instance params.Instance) error { +func (r *basePoolManager) Status() params.PoolManagerStatus { + r.mux.Lock() + defer r.mux.Unlock() + return params.PoolManagerStatus{ + IsRunning: r.managerIsRunning, + FailureReason: r.managerErrorReason, + } +} + +func (r *basePoolManager) waitForTimeoutOrCanceled(timeout time.Duration) { + log.Printf("sleeping for %.2f minutes", timeout.Minutes()) + select { + case <-time.After(timeout): + case <-r.ctx.Done(): + case <-r.quit: + } +} + +func (r *basePoolManager) setPoolRunningState(isRunning bool, failureReason string) { + r.mux.Lock() + r.managerErrorReason = failureReason + r.managerIsRunning = isRunning + r.mux.Unlock() +} + +func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error { pool, err := r.helper.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") @@ -416,7 +493,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error { provider, ok := r.providers[pool.ProviderName] if !ok { - return runnerErrors.NewNotFoundError("invalid provider ID") + return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) } labels := []string{} @@ -490,7 +567,7 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error { return nil } -func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error) { +func (r *basePoolManager) getRunnerNameFromJob(job params.WorkflowJob) (string, error) { if job.WorkflowJob.RunnerName != "" { return job.WorkflowJob.RunnerName, nil } @@ -506,7 +583,7 @@ func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error) return runnerName, nil } -func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { +func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error { if err := r.helper.ValidateOwner(job); err != nil { return errors.Wrap(err, "validating owner") } @@ -559,15 +636,15 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { return nil } -func (r *basePool) poolLabel(poolID string) string { +func (r *basePoolManager) poolLabel(poolID string) string { return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID) } -func (r *basePool) controllerLabel() string { +func (r *basePoolManager) controllerLabel() string { return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID) } -func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { +func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { return params.UpdateInstanceParams{ ProviderID: providerInstance.ProviderID, OSName: providerInstance.OSName, @@ -579,7 +656,7 @@ func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instan } } -func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) { +func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) { if !pool.Enabled { return } @@ -622,7 +699,7 @@ func (r *basePool) ensureIdleRunnersForOnePool(pool params.Pool) { } } -func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) { +func (r *basePoolManager) retryFailedInstancesForOnePool(pool params.Pool) { if !pool.Enabled { return } @@ -664,7 +741,7 @@ func (r *basePool) retryFailedInstancesForOnePool(pool params.Pool) { } } -func (r *basePool) retryFailedInstances() { +func (r *basePoolManager) retryFailedInstances() { pools, err := r.helper.ListPools() if err != nil { log.Printf("error listing pools: %s", err) @@ -681,7 +758,7 @@ func (r *basePool) retryFailedInstances() { wg.Wait() } -func (r *basePool) ensureMinIdleRunners() { +func (r *basePoolManager) ensureMinIdleRunners() { pools, err := r.helper.ListPools() if err != nil { log.Printf("error listing pools: %s", err) @@ -698,7 +775,7 @@ func (r *basePool) ensureMinIdleRunners() { wg.Wait() } -func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error { +func (r *basePoolManager) deleteInstanceFromProvider(instance params.Instance) error { pool, err := r.helper.GetPoolByID(instance.PoolID) if err != nil { return errors.Wrap(err, "fetching pool") @@ -706,7 +783,7 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error { provider, ok := r.providers[pool.ProviderName] if !ok { - return runnerErrors.NewNotFoundError("invalid provider ID") + return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) } identifier := instance.ProviderID @@ -726,7 +803,7 @@ func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error { return nil } -func (r *basePool) deletePendingInstances() { +func (r *basePoolManager) deletePendingInstances() { instances, err := r.helper.FetchDbInstances() if err != nil { log.Printf("failed to fetch instances from store: %s", err) @@ -764,7 +841,7 @@ func (r *basePool) deletePendingInstances() { } } -func (r *basePool) addPendingInstances() { +func (r *basePoolManager) addPendingInstances() { // TODO: filter instances by status. instances, err := r.helper.FetchDbInstances() if err != nil { @@ -796,7 +873,7 @@ func (r *basePool) addPendingInstances() { } } -func (r *basePool) consolidate() { +func (r *basePoolManager) consolidate() { // TODO(gabriel-samfira): replace this with something more efficient. r.mux.Lock() defer r.mux.Unlock() @@ -826,7 +903,7 @@ func (r *basePool) consolidate() { wg.Wait() } -func (r *basePool) Wait() error { +func (r *basePoolManager) Wait() error { select { case <-r.done: case <-time.After(20 * time.Second): @@ -835,15 +912,7 @@ func (r *basePool) Wait() error { return nil } -func (r *basePool) Start() error { - tools, err := r.helper.FetchTools() - if err != nil { - return errors.Wrap(err, "initializing tools") - } - r.mux.Lock() - r.tools = tools - r.mux.Unlock() - +func (r *basePoolManager) runnerCleanup() error { runners, err := r.helper.GetGithubRunners() if err != nil { return errors.Wrap(err, "fetching github runners") @@ -855,28 +924,35 @@ func (r *basePool) Start() error { if err := r.cleanupOrphanedGithubRunners(runners); err != nil { return errors.Wrap(err, "cleaning orphaned github runners") } + return nil +} + +func (r *basePoolManager) Start() error { go r.loop() return nil } -func (r *basePool) Stop() error { +func (r *basePoolManager) Stop() error { close(r.quit) return nil } -func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error { +func (r *basePoolManager) RefreshState(param params.UpdatePoolStateParams) error { return r.helper.UpdateState(param) } -func (r *basePool) WebhookSecret() string { +func (r *basePoolManager) WebhookSecret() string { return r.helper.WebhookSecret() } -func (r *basePool) ID() string { +func (r *basePoolManager) ID() string { return r.helper.ID() } -func (r *basePool) ForceDeleteRunner(runner params.Instance) error { +func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error { + if !r.managerIsRunning { + return runnerErrors.NewConflictError("pool manager is not running for %s", r.helper.String()) + } if runner.AgentID != 0 { resp, err := r.helper.RemoveGithubRunner(runner.AgentID) if err != nil { diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 864bcab3..597bbf05 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -17,6 +17,7 @@ package pool import ( "context" "fmt" + "net/http" "strings" "sync" @@ -48,7 +49,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt store: store, } - repo := &basePool{ + repo := &basePoolManager{ ctx: ctx, store: store, providers: providers, @@ -112,6 +113,9 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcli.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, &opts) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") + } return nil, errors.Wrap(err, "fetching runners") } allRunners = append(allRunners, runners.Runners...) @@ -127,8 +131,11 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) { func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) { r.mux.Lock() defer r.mux.Unlock() - tools, _, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name) + tools, ghResp, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name) if err != nil { + if ghResp.StatusCode == http.StatusUnauthorized { + return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools") + } return nil, errors.Wrap(err, "fetching runner tools") } diff --git a/runner/repositories.go b/runner/repositories.go index 3cc11aad..f9a33137 100644 --- a/runner/repositories.go +++ b/runner/repositories.go @@ -16,6 +16,7 @@ package runner import ( "context" + "fmt" "log" "strings" @@ -85,7 +86,20 @@ func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, err return nil, errors.Wrap(err, "listing repositories") } - return repos, nil + var allRepos []params.Repository + + for _, repo := range repos { + poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo) + if err != nil { + repo.PoolManagerStatus.IsRunning = false + repo.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } else { + repo.PoolManagerStatus = poolMgr.Status() + } + allRepos = append(allRepos, repo) + } + + return allRepos, nil } func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) { @@ -97,6 +111,13 @@ func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.R if err != nil { return params.Repository{}, errors.Wrap(err, "fetching repository") } + + poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo) + if err != nil { + repo.PoolManagerStatus.IsRunning = false + repo.PoolManagerStatus.FailureReason = fmt.Sprintf("failed to get pool manager: %q", err) + } + repo.PoolManagerStatus = poolMgr.Status() return repo, nil } diff --git a/runner/repositories_test.go b/runner/repositories_test.go index 93eaa8ba..2466d006 100644 --- a/runner/repositories_test.go +++ b/runner/repositories_test.go @@ -265,6 +265,8 @@ func (s *RepoTestSuite) TestCreateRepositoryStartPoolMgrFailed() { } func (s *RepoTestSuite) TestListRepositories() { + s.Fixtures.PoolMgrCtrlMock.On("GetRepoPoolManager", mock.AnythingOfType("params.Repository")).Return(s.Fixtures.PoolMgrMock, nil) + s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil) repos, err := s.Runner.ListRepositories(s.Fixtures.AdminContext) s.Require().Nil(err) @@ -278,6 +280,8 @@ func (s *RepoTestSuite) TestListRepositoriesErrUnauthorized() { } func (s *RepoTestSuite) TestGetRepositoryByID() { + s.Fixtures.PoolMgrCtrlMock.On("GetRepoPoolManager", mock.AnythingOfType("params.Repository")).Return(s.Fixtures.PoolMgrMock, nil) + s.Fixtures.PoolMgrMock.On("Status").Return(params.PoolManagerStatus{IsRunning: true}, nil) repo, err := s.Runner.GetRepositoryByID(s.Fixtures.AdminContext, s.Fixtures.StoreRepos["test-repo-1"].ID) s.Require().Nil(err) diff --git a/runner/runner.go b/runner/runner.go index 42b33119..3c7b00ba 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -421,19 +421,49 @@ func (r *Runner) Stop() error { if err != nil { return errors.Wrap(err, "fetch repo pool managers") } - for _, repo := range repos { - if err := repo.Stop(); err != nil { - return errors.Wrap(err, "stopping repo pool manager") - } - } orgs, err := r.poolManagerCtrl.GetOrgPoolManagers() if err != nil { return errors.Wrap(err, "fetch org pool managers") } + + enterprises, err := r.poolManagerCtrl.GetEnterprisePoolManagers() + if err != nil { + return errors.Wrap(err, "fetch enterprise pool managers") + } + + expectedReplies := len(repos) + len(orgs) + len(enterprises) + errChan := make(chan error, expectedReplies) + + for _, repo := range repos { + go func(poolMgr common.PoolManager) { + err := poolMgr.Stop() + errChan <- err + }(repo) + } + for _, org := range orgs { - if err := org.Stop(); err != nil { - return errors.Wrap(err, "stopping org pool manager") + go func(poolMgr common.PoolManager) { + err := poolMgr.Stop() + errChan <- err + }(org) + } + + for _, enterprise := range enterprises { + go func(poolMgr common.PoolManager) { + err := poolMgr.Stop() + errChan <- err + }(enterprise) + } + + for i := 0; i < expectedReplies; i++ { + select { + case err := <-errChan: + if err != nil { + return errors.Wrap(err, "stopping pool manager") + } + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for pool mamager stop") } } return nil @@ -449,6 +479,17 @@ func (r *Runner) Wait() error { if err != nil { return errors.Wrap(err, "fetch repo pool managers") } + + orgs, err := r.poolManagerCtrl.GetOrgPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch org pool managers") + } + + enterprises, err := r.poolManagerCtrl.GetEnterprisePoolManagers() + if err != nil { + return errors.Wrap(err, "fetch enterprise pool managers") + } + for poolId, repo := range repos { wg.Add(1) go func(id string, poolMgr common.PoolManager) { @@ -459,10 +500,6 @@ func (r *Runner) Wait() error { }(poolId, repo) } - orgs, err := r.poolManagerCtrl.GetOrgPoolManagers() - if err != nil { - return errors.Wrap(err, "fetch org pool managers") - } for poolId, org := range orgs { wg.Add(1) go func(id string, poolMgr common.PoolManager) { @@ -472,6 +509,17 @@ func (r *Runner) Wait() error { } }(poolId, org) } + + for poolId, enterprise := range enterprises { + wg.Add(1) + go func(id string, poolMgr common.PoolManager) { + defer wg.Done() + if err := poolMgr.Wait(); err != nil { + log.Printf("timed out waiting for pool manager %s to exit", id) + } + }(poolId, enterprise) + } + wg.Wait() return nil }