Merge pull request #19 from gabriel-samfira/fetch-runner-name-from-api

Attempt to fetch runner name from API
This commit is contained in:
Gabriel 2022-09-20 16:50:13 +03:00 committed by GitHub
commit 93619b8d4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 18 deletions

View file

@ -9,6 +9,8 @@ import (
// GithubClient that describes the minimum list of functions we need to interact with github.
// Allows for easier testing.
type GithubClient interface {
// GetWorkflowJobByID gets details about a single workflow job.
GetWorkflowJobByID(ctx context.Context, owner, repo string, jobID int64) (*github.WorkflowJob, *github.Response, error)
// ListRunners lists all runners within a repository.
ListRunners(ctx context.Context, owner, repo string, opts *github.ListOptions) (*github.Runners, *github.Response, error)
// ListRunnerApplicationDownloads returns a list of github runner application downloads for the
@ -18,6 +20,7 @@ type GithubClient interface {
RemoveRunner(ctx context.Context, owner, repo string, runnerID int64) (*github.Response, error)
// CreateRegistrationToken creates a runner registration token for one repository.
CreateRegistrationToken(ctx context.Context, owner, repo string) (*github.RegistrationToken, *github.Response, error)
// ListOrganizationRunners lists all runners within an organization.
ListOrganizationRunners(ctx context.Context, owner string, opts *github.ListOptions) (*github.Runners, *github.Response, error)
// ListOrganizationRunnerApplicationDownloads returns a list of github runner application downloads for the

View file

@ -37,5 +37,6 @@ type poolHelper interface {
ValidateOwner(job params.WorkflowJob) error
UpdateState(param params.UpdatePoolStateParams) error
WebhookSecret() string
GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error)
ID() string
}

View file

@ -71,6 +71,17 @@ type organization struct {
mux sync.Mutex
}
func (r *organization) GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error) {
workflow, _, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Organization.Login, job.Repository.Name, job.WorkflowJob.ID)
if err != nil {
return "", errors.Wrap(err, "fetching workflow info")
}
if workflow.RunnerName != nil {
return *workflow.RunnerName, nil
}
return "", fmt.Errorf("failed to find runner name from workflow")
}
func (r *organization) UpdateState(param params.UpdatePoolStateParams) error {
r.mux.Lock()
defer r.mux.Unlock()
@ -143,7 +154,7 @@ func (r *organization) GetGithubRegistrationToken() (string, error) {
}
func (r *organization) String() string {
return fmt.Sprintf("%s", r.cfg.Name)
return r.cfg.Name
}
func (r *organization) WebhookSecret() string {

View file

@ -106,7 +106,6 @@ func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) erro
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePool) reapTimedOutRunners(runners []*github.Runner) error {
log.Printf("Checking for timed out runners")
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
@ -236,12 +235,12 @@ func (r *basePool) fetchInstance(runnerName string) (params.Instance, error) {
return runner, nil
}
func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error {
func (r *basePool) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error {
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
if err := r.updateInstance(job.WorkflowJob.RunnerName, updateParams); err != nil {
if err := r.updateInstance(runnerName, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
@ -473,13 +472,20 @@ func (r *basePool) addInstanceToProvider(instance params.Instance) error {
return nil
}
func (r *basePool) poolIDFromStringLabels(labels []string) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(lbl, poolIDLabelprefix) {
return lbl[len(poolIDLabelprefix):], nil
}
func (r *basePool) getRunnerNameFromJob(job params.WorkflowJob) (string, error) {
if job.WorkflowJob.RunnerName != "" {
return job.WorkflowJob.RunnerName, nil
}
return "", runnerErrors.ErrNotFound
// Runner name was not set in WorkflowJob by github. We can still attempt to
// fetch the info we need, using the workflow run ID, from the API.
log.Printf("runner name not found in workflow job, attempting to fetch from API")
runnerName, err := r.helper.GetRunnerNameFromWorkflow(job)
if err != nil {
return "", errors.Wrap(err, "fetching runner name from API")
}
return runnerName, nil
}
func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
@ -496,29 +502,38 @@ func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
// exclude that runner from available runners and attempt to ensure
// the needed number of runners.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance")
log.Printf("failed to add instance: %s", err)
}
case "completed":
// ignore the error here. A completed job may not have a runner name set
// if it was never assigned to a runner, and was canceled.
runnerName, _ := r.getRunnerNameFromJob(job)
// Set instance in database to pending delete.
if job.WorkflowJob.RunnerName == "" {
if runnerName == "" {
// Unassigned jobs will have an empty runner_name.
// There is nothing to to in this case.
log.Printf("no runner was assigned. Skipping.")
return nil
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerTerminated); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
if err := r.setInstanceRunnerStatus(runnerName, providerCommon.RunnerTerminated); err != nil {
log.Printf("failed to update runner %s status", runnerName)
return errors.Wrap(err, "updating runner")
}
log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName)
if err := r.setInstanceStatus(job.WorkflowJob.RunnerName, providerCommon.InstancePendingDelete, nil); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
log.Printf("marking instance %s as pending_delete", runnerName)
if err := r.setInstanceStatus(runnerName, providerCommon.InstancePendingDelete, nil); err != nil {
log.Printf("failed to update runner %s status", runnerName)
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without
// a runner set. In such cases, we attemt to fetch it from the API.
runnerName, err := r.getRunnerNameFromJob(job)
if err != nil {
return errors.Wrap(err, "determining runner name")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil {
if err := r.setInstanceRunnerStatus(runnerName, providerCommon.RunnerActive); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}

View file

@ -73,6 +73,17 @@ type repository struct {
mux sync.Mutex
}
func (r *repository) GetRunnerNameFromWorkflow(job params.WorkflowJob) (string, error) {
workflow, _, err := r.ghcli.GetWorkflowJobByID(r.ctx, job.Repository.Owner.Login, job.Repository.Name, job.WorkflowJob.ID)
if err != nil {
return "", errors.Wrap(err, "fetching workflow info")
}
if workflow.RunnerName != nil {
return *workflow.RunnerName, nil
}
return "", fmt.Errorf("failed to find runner name from workflow")
}
func (r *repository) UpdateState(param params.UpdatePoolStateParams) error {
r.mux.Lock()
defer r.mux.Unlock()