Wait for loop exit and some fixes
* Wait for http server graceful shutdown and for pool managers to properly exit. * Fix potential nil pointer dereference when checking response code from github API.
This commit is contained in:
parent
6fc950e513
commit
d5f5524934
6 changed files with 47 additions and 15 deletions
|
|
@ -22,7 +22,9 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"garm/apiserver/controllers"
|
||||
"garm/apiserver/routers"
|
||||
|
|
@ -159,10 +161,25 @@ func main() {
|
|||
}
|
||||
|
||||
go func() {
|
||||
if err := srv.Serve(listener); err != nil {
|
||||
log.Fatalf("Listening: %+v", err)
|
||||
if err := srv.Serve(listener); err != http.ErrServerClosed {
|
||||
log.Printf("Listening: %+v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer shutdownCancel()
|
||||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||||
log.Printf("graceful api server shutdown failed: %+v", err)
|
||||
}
|
||||
log.Printf("stopping runner loop")
|
||||
if err := runner.Stop(); err != nil {
|
||||
log.Printf("failed to shutdown workers: %+v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Printf("waiting for runner to stop")
|
||||
if err := runner.Wait(); err != nil {
|
||||
log.Printf("failed to shutdown workers: %+v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ func (r *enterprise) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.R
|
|||
}
|
||||
workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info")
|
||||
}
|
||||
return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info")
|
||||
|
|
@ -110,7 +110,7 @@ func (r *enterprise) GetGithubRunners() ([]*github.Runner, error) {
|
|||
for {
|
||||
runners, ghResp, err := r.ghcEnterpriseCli.ListRunners(r.ctx, r.cfg.Name, &opts)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runners")
|
||||
|
|
@ -129,7 +129,7 @@ func (r *enterprise) FetchTools() ([]*github.RunnerApplicationDownload, error) {
|
|||
defer r.mux.Unlock()
|
||||
tools, ghResp, err := r.ghcEnterpriseCli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Name)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runner tools")
|
||||
|
|
@ -166,7 +166,7 @@ func (r *enterprise) GetGithubRegistrationToken() (string, error) {
|
|||
tk, ghResp, err := r.ghcEnterpriseCli.CreateRegistrationToken(r.ctx, r.cfg.Name)
|
||||
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching registration token")
|
||||
}
|
||||
return "", errors.Wrap(err, "creating runner token")
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ func (r *organization) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params
|
|||
}
|
||||
workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Organization.Login, job.Repository.Name, job.WorkflowJob.ID)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info")
|
||||
}
|
||||
return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info")
|
||||
|
|
@ -121,7 +121,7 @@ func (r *organization) GetGithubRunners() ([]*github.Runner, error) {
|
|||
for {
|
||||
runners, ghResp, err := r.ghcli.ListOrganizationRunners(r.ctx, r.cfg.Name, &opts)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runners")
|
||||
|
|
@ -141,7 +141,7 @@ func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error)
|
|||
defer r.mux.Unlock()
|
||||
tools, ghResp, err := r.ghcli.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runner tools")
|
||||
|
|
@ -178,7 +178,7 @@ func (r *organization) GetGithubRegistrationToken() (string, error) {
|
|||
tk, ghResp, err := r.ghcli.CreateOrganizationRegistrationToken(r.ctx, r.cfg.Name)
|
||||
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching token")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -989,7 +989,7 @@ func (r *basePoolManager) consolidate() {
|
|||
func (r *basePoolManager) Wait() error {
|
||||
select {
|
||||
case <-r.done:
|
||||
case <-time.After(20 * time.Second):
|
||||
case <-time.After(60 * time.Second):
|
||||
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ func (r *repository) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.R
|
|||
}
|
||||
workflow, ghResp, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return params.RunnerInfo{}, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching workflow info")
|
||||
}
|
||||
return params.RunnerInfo{}, errors.Wrap(err, "fetching workflow info")
|
||||
|
|
@ -123,7 +123,7 @@ func (r *repository) GetGithubRunners() ([]*github.Runner, error) {
|
|||
for {
|
||||
runners, ghResp, err := r.ghcli.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, &opts)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching runners")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runners")
|
||||
|
|
@ -143,7 +143,7 @@ func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) {
|
|||
defer r.mux.Unlock()
|
||||
tools, ghResp, err := r.ghcli.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name)
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, errors.Wrap(runnerErrors.ErrUnauthorized, "fetching tools")
|
||||
}
|
||||
return nil, errors.Wrap(err, "fetching runner tools")
|
||||
|
|
@ -180,7 +180,7 @@ func (r *repository) GetGithubRegistrationToken() (string, error) {
|
|||
tk, ghResp, err := r.ghcli.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name)
|
||||
|
||||
if err != nil {
|
||||
if ghResp.StatusCode == http.StatusUnauthorized {
|
||||
if ghResp != nil && ghResp.StatusCode == http.StatusUnauthorized {
|
||||
return "", errors.Wrap(runnerErrors.ErrUnauthorized, "fetching token")
|
||||
}
|
||||
return "", errors.Wrap(err, "creating runner token")
|
||||
|
|
|
|||
|
|
@ -439,6 +439,11 @@ func (r *Runner) Stop() error {
|
|||
for _, repo := range repos {
|
||||
go func(poolMgr common.PoolManager) {
|
||||
err := poolMgr.Stop()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
err = poolMgr.Wait()
|
||||
errChan <- err
|
||||
}(repo)
|
||||
}
|
||||
|
|
@ -446,6 +451,11 @@ func (r *Runner) Stop() error {
|
|||
for _, org := range orgs {
|
||||
go func(poolMgr common.PoolManager) {
|
||||
err := poolMgr.Stop()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
err = poolMgr.Wait()
|
||||
errChan <- err
|
||||
}(org)
|
||||
}
|
||||
|
|
@ -453,6 +463,11 @@ func (r *Runner) Stop() error {
|
|||
for _, enterprise := range enterprises {
|
||||
go func(poolMgr common.PoolManager) {
|
||||
err := poolMgr.Stop()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
err = poolMgr.Wait()
|
||||
errChan <- err
|
||||
}(enterprise)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue