From d5f5524934823043e0502afd9c6c079421dff466 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 30 Dec 2022 15:07:40 +0000 Subject: [PATCH 1/2] Wait for loop exit and some fixes * Wait for http server graceful shutdown and for pool managers to properly exit. * Fix potential nil pointer dereference when checking response code from github API. --- cmd/garm/main.go | 21 +++++++++++++++++++-- runner/pool/enterprise.go | 8 ++++---- runner/pool/organization.go | 8 ++++---- runner/pool/pool.go | 2 +- runner/pool/repository.go | 8 ++++---- runner/runner.go | 15 +++++++++++++++ 6 files changed, 47 insertions(+), 15 deletions(-) diff --git a/cmd/garm/main.go b/cmd/garm/main.go index 255fc5e6..b6ee0884 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -22,7 +22,9 @@ import ( "log" "net" "net/http" + "os" "os/signal" + "time" "garm/apiserver/controllers" "garm/apiserver/routers" @@ -159,10 +161,25 @@ func main() { } go func() { - if err := srv.Serve(listener); err != nil { - log.Fatalf("Listening: %+v", err) + if err := srv.Serve(listener); err != http.ErrServerClosed { + log.Printf("Listening: %+v", err) } }() <-ctx.Done() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 60*time.Second) + defer shutdownCancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("graceful api server shutdown failed: %+v", err) + } + log.Printf("stopping runner loop") + if err := runner.Stop(); err != nil { + log.Printf("failed to shutdown workers: %+v", err) + os.Exit(1) + } + log.Printf("waiting for runner to stop") + if err := runner.Wait(); err != nil { + log.Printf("failed to shutdown workers: %+v", err) + os.Exit(1) + } } diff --git a/runner/pool/enterprise.go b/runner/pool/enterprise.go index 45931842..7ff82125 100644 --- a/runner/pool/enterprise.go +++ b/runner/pool/enterprise.go @@ -67,7 +67,7 @@ func (r *enterprise) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.R } workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info") } return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info") @@ -110,7 +110,7 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcEnterpriseCli.ListRunners(r.ctx, r.cfg.Name, &opts) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") } return nil, errors.Wrap(err, "fetching runners") @@ -129,7 +129,7 @@ func (r *enterprise) FetchTools() ([]*github.RunnerApplicationDownload, error) { defer r.mux.Unlock() tools, ghResp, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") } return nil, errors.Wrap(err, "fetching runner tools") @@ -166,7 +166,7 @@ func (r *enterprise) GetGithubRegistrationToken() (string, error) { tk, ghResp, err := r.ghcEnterpriseCli.CreateRegistrationToken(r.ctx, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching registration token") } return "", errors.Wrap(err, "creating runner token") diff --git a/runner/pool/organization.go b/runner/pool/organization.go index 9371b4d5..68d69255 100644 --- a/runner/pool/organization.go +++ b/runner/pool/organization.go @@ -79,7 +79,7 @@ func (r *organization) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params } workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Organization.Login, job.Repository.Name, job.WorkflowJob.ID) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info") } return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info") @@ -121,7 +121,7 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcli.ListOrganizationRunners(r.ctx, r.cfg.Name, &opts) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") } return nil, errors.Wrap(err, "fetching runners") @@ -141,7 +141,7 @@ func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error) defer r.mux.Unlock() tools, ghResp, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools") } return nil, errors.Wrap(err, "fetching runner tools") @@ -178,7 +178,7 @@ func (r *organization) GetGithubRegistrationToken() (string, error) { tk, ghResp, err := r.ghcli.CreateOrganizationRegistrationToken(r.ctx, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching token") } diff --git a/runner/pool/pool.go b/runner/pool/pool.go index a6d4f7c4..bcae2fdd 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -989,7 +989,7 @@ func (r *basePoolManager) consolidate() { func (r *basePoolManager) Wait() error { select { case <-r.done: - case <-time.After(20 * time.Second): + case <-time.After(60 * time.Second): return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop") } return nil diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 9c69915a..34e7b07c 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -81,7 +81,7 @@ func (r *repository) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.R } workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info") } return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info") @@ -123,7 +123,7 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) { for { runners, ghResp, err := r.ghcli.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, &opts) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners") } return nil, errors.Wrap(err, "fetching runners") @@ -143,7 +143,7 @@ func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) { defer r.mux.Unlock() tools, ghResp, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools") } return nil, errors.Wrap(err, "fetching runner tools") @@ -180,7 +180,7 @@ func (r *repository) GetGithubRegistrationToken() (string, error) { tk, ghResp, err := r.ghcli.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name) if err != nil { - if ghResp.StatusCode == http.StatusUnauthorized { + if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized { return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching token") } return "", errors.Wrap(err, "creating runner token") diff --git a/runner/runner.go b/runner/runner.go index b550a2ea..8218656d 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -439,6 +439,11 @@ func (r *Runner) Stop() error { for _, repo := range repos { go func(poolMgr common.PoolManager) { err := poolMgr.Stop() + if err != nil { + errChan <- err + return + } + err = poolMgr.Wait() errChan <- err }(repo) } @@ -446,6 +451,11 @@ func (r *Runner) Stop() error { for _, org := range orgs { go func(poolMgr common.PoolManager) { err := poolMgr.Stop() + if err != nil { + errChan <- err + return + } + err = poolMgr.Wait() errChan <- err }(org) } @@ -453,6 +463,11 @@ func (r *Runner) Stop() error { for _, enterprise := range enterprises { go func(poolMgr common.PoolManager) { err := poolMgr.Stop() + if err != nil { + errChan <- err + return + } + err = poolMgr.Wait() errChan <- err }(enterprise) } From 72ec1bf68f4c2f6b0f61111a6e5d33db71ccf3fd Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 30 Dec 2022 15:13:49 +0000 Subject: [PATCH 2/2] No need to explicitly call Stop() on runner loops Once the context is canceled, the loops will exit, so there is no need to explicitly Stop(). We just need to Wait() for the loops to exit. --- cmd/garm/main.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cmd/garm/main.go b/cmd/garm/main.go index b6ee0884..00d6c114 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -172,11 +172,7 @@ func main() { if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("graceful api server shutdown failed: %+v", err) } - log.Printf("stopping runner loop") - if err := runner.Stop(); err != nil { - log.Printf("failed to shutdown workers: %+v", err) - os.Exit(1) - } + log.Printf("waiting for runner to stop") if err := runner.Wait(); err != nil { log.Printf("failed to shutdown workers: %+v", err)