From e8fa6dba6e3cd621043ec4627e225756f511732a Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 20 Sep 2022 14:25:52 +0300 Subject: [PATCH] Attempt to fetch runner name from API In some cases, runner information is not sent via webhook by Github when a workflow job transitions to in_progress. We need to know the runner name in order to update the state in the database. Attempt to fetch the runner name from the API using the workflow job ID. Signed-off-by: Gabriel Adrian Samfira --- runner/common/util.go | 3 +++ runner/pool/interfaces.go | 1 + runner/pool/organization.go | 13 +++++++++- runner/pool/pool.go | 49 ++++++++++++++++++++++++------------- runner/pool/repository.go | 11 +++++++++ 5 files changed, 59 insertions(+), 18 deletions(-) diff --git a/runner/common/util.go b/runner/common/util.go index 2304543d..4a848ee3 100644 --- a/runner/common/util.go +++ b/runner/common/util.go @@ -9,6 +9,8 @@ import ( // GithubClient that describes the minimum list of functions we need to interact with github. // Allows for easier testing. type GithubClient interface { + // GetWorkflowJobByID gets details about a single workflow job. + GetWorkflowJobByID(ctx context.Context, owner, repo string, jobID int64) (*github.WorkflowJob, *github.Response, error) // ListRunners lists all runners within a repository. ListRunners(ctx context.Context, owner, repo string, opts *github.ListOptions) (*github.Runners, *github.Response, error) // ListRunnerApplicationDownloads returns a list of github runner application downloads for the @@ -18,6 +20,7 @@ type GithubClient interface { RemoveRunner(ctx context.Context, owner, repo string, runnerID int64) (*github.Response, error) // CreateRegistrationToken creates a runner registration token for one repository. CreateRegistrationToken(ctx context.Context, owner, repo string) (*github.RegistrationToken, *github.Response, error) + // ListOrganizationRunners lists all runners within an organization. 
ListOrganizationRunners(ctx context.Context, owner string, opts *github.ListOptions) (*github.Runners, *github.Response, error) // ListOrganizationRunnerApplicationDownloads returns a list of github runner application downloads for the diff --git a/runner/pool/interfaces.go b/runner/pool/interfaces.go index 6cc522a9..182dc452 100644 --- a/runner/pool/interfaces.go +++ b/runner/pool/interfaces.go @@ -37,5 +37,6 @@ type poolHelper interface { ValidateOwner(job params.WorkflowJob) error UpdateState(param params.UpdatePoolStateParams) error WebhookSecret() string + GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error) ID() string } diff --git a/runner/pool/organization.go b/runner/pool/organization.go index 3f984e53..675ce85c 100644 --- a/runner/pool/organization.go +++ b/runner/pool/organization.go @@ -71,6 +71,17 @@ type organization struct { mux sync.Mutex } +func (r *organization) GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error) { + workflow, _, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Organization.Login, job.Repository.Name, job.WorkflowJob.ID) + if err != nil { + return "", errors.Wrap(err, "fetching workflow info") + } + if workflow.RunnerName != nil { + return *workflow.RunnerName, nil + } + return "", fmt.Errorf("failed to find runner name from workflow") +} + func (r *organization) UpdateState(param params.UpdatePoolStateParams) error { r.mux.Lock() defer r.mux.Unlock() @@ -143,7 +154,7 @@ func (r *organization) GetGithubRegistrationToken() (string, error) { } func (r *organization) String() string { - return fmt.Sprintf("%s", r.cfg.Name) + return r.cfg.Name } func (r *organization) WebhookSecret() string { diff --git a/runner/pool/pool.go b/runner/pool/pool.go index 100577fb..38c7e74d 100644 --- a/runner/pool/pool.go +++ b/runner/pool/pool.go @@ -106,7 +106,6 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro // of "running" in the provider, but that has not registered with Github, and has // 
received no new updates in the configured timeout interval. func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error { - log.Printf("Checking for timed out runners") dbInstances, err := r.helper.FetchDbInstances() if err != nil { return errors.Wrap(err, "fetching instances from db") @@ -236,12 +235,12 @@ func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) { return runner, nil } -func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error { +func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error { updateParams := params.UpdateInstanceParams{ RunnerStatus: status, } - if err := r.updateInstance(job.WorkflowJob.RunnerName, updateParams); err != nil { + if err := r.updateInstance(runnerName, updateParams); err != nil { return errors.Wrap(err, "updating runner state") } return nil @@ -473,13 +472,20 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error { return nil } -func (r *basePool) poolIDFromStringLabels(labels []string) (string, error) { - for _, lbl := range labels { - if strings.HasPrefix(lbl, poolIDLabelprefix) { - return lbl[len(poolIDLabelprefix):], nil - } +func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error) { + if job.WorkflowJob.RunnerName != "" { + return job.WorkflowJob.RunnerName, nil } - return "", runnerErrors.ErrNotFound + + // Runner name was not set in WorkflowJob by github. We can still attempt to + // fetch the info we need, using the workflow job ID, from the API. 
+ log.Printf("runner name not found in workflow job, attempting to fetch from API") + runnerName, err := r.helper.GetRunnerNameFromWorkflow(job) + if err != nil { + return "", errors.Wrap(err, "fetching runner name from API") + } + + return runnerName, nil } func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { @@ -496,29 +502,38 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { // exclude that runner from available runners and attempt to ensure // the needed number of runners. if err := r.acquireNewInstance(job); err != nil { - log.Printf("failed to add instance") + log.Printf("failed to add instance: %s", err) } case "completed": + // ignore the error here. A completed job may not have a runner name set + // if it was never assigned to a runner, and was canceled. + runnerName, _ := r.getRunnerNameFromJob(job) // Set instance in database to pending delete. - if job.WorkflowJob.RunnerName == "" { + if runnerName == "" { // Unassigned jobs will have an empty runner_name. // There is nothing to to in this case. log.Printf("no runner was assigned. Skipping.") return nil } // update instance workload state. 
- if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerTerminated); err != nil { - log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) + if err := r.setInstanceRunnerStatus(runnerName, providerCommon.RunnerTerminated); err != nil { + log.Printf("failed to update runner %s status", runnerName) return errors.Wrap(err, "updating runner") } - log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName) - if err := r.setInstanceStatus(job.WorkflowJob.RunnerName, providerCommon.InstancePendingDelete, nil); err != nil { - log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) + log.Printf("marking instance %s as pending_delete", runnerName) + if err := r.setInstanceStatus(runnerName, providerCommon.InstancePendingDelete, nil); err != nil { + log.Printf("failed to update runner %s status", runnerName) return errors.Wrap(err, "updating runner") } case "in_progress": + // in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without + // a runner set. In such cases, we attempt to fetch it from the API. + runnerName, err := r.getRunnerNameFromJob(job) + if err != nil { + return errors.Wrap(err, "determining runner name") + } // update instance workload state. 
- if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil { + if err := r.setInstanceRunnerStatus(runnerName, providerCommon.RunnerActive); err != nil { log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) return errors.Wrap(err, "updating runner") } diff --git a/runner/pool/repository.go b/runner/pool/repository.go index b17216f4..59dae638 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -73,6 +73,17 @@ type repository struct { mux sync.Mutex } +func (r *repository) GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error) { + workflow, _, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID) + if err != nil { + return "", errors.Wrap(err, "fetching workflow info") + } + if workflow.RunnerName != nil { + return *workflow.RunnerName, nil + } + return "", fmt.Errorf("failed to find runner name from workflow") +} + func (r *repository) UpdateState(param params.UpdatePoolStateParams) error { r.mux.Lock() defer r.mux.Unlock()