From ee207b0b54a38d103c5a9b0b3764ff3d5df24230 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Sat, 23 Apr 2022 13:05:40 +0000 Subject: [PATCH] Validate webhook and add some pool logic --- apiserver/controllers/controllers.go | 21 ++--- config/config.go | 6 ++ errors/errors.go | 2 + {github => params}/github.go | 2 +- runner/common/pool.go | 17 +++- runner/pool/repository.go | 136 ++++++++++++++++++++++++++- runner/runner.go | 123 ++++++++++++++++++++---- runner/types.go | 8 ++ 8 files changed, 276 insertions(+), 39 deletions(-) rename {github => params}/github.go (99%) create mode 100644 runner/types.go diff --git a/apiserver/controllers/controllers.go b/apiserver/controllers/controllers.go index 91eb2b28..b72596e4 100644 --- a/apiserver/controllers/controllers.go +++ b/apiserver/controllers/controllers.go @@ -9,7 +9,7 @@ import ( "runner-manager/apiserver/params" gErrors "runner-manager/errors" - "runner-manager/github" + runnerParams "runner-manager/params" "runner-manager/runner" "github.com/pkg/errors" @@ -56,7 +56,7 @@ func handleError(w http.ResponseWriter, err error) { func (a *APIController) authenticateHook(body []byte, headers http.Header) error { // signature := headers.Get("X-Hub-Signature-256") hookType := headers.Get("X-Github-Hook-Installation-Target-Type") - var workflowJob github.WorkflowJob + var workflowJob runnerParams.WorkflowJob if err := json.Unmarshal(body, &workflowJob); err != nil { return gErrors.NewBadRequestError("invalid post body: %s", err) } @@ -83,19 +83,10 @@ func (a *APIController) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Re fmt.Printf(">>> Signature: %s\n", signature) fmt.Printf(">>> HookType: %s\n", hookType) - var workflowJob github.WorkflowJob - if err := json.Unmarshal(body, &workflowJob); err != nil { - handleError(w, gErrors.ErrBadRequest) + if err := a.r.DispatchWorkflowJob(hookType, signature, body); err != nil { + handleError(w, err) return } - // entity := workflowJob.Repository.Owner.Login - - asJs, err := json.MarshalIndent(workflowJob, "", " ") - if err != nil { - fmt.Println(err) - return - } - fmt.Printf("%s\n", string(asJs)) } func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) { @@ -103,9 +94,9 @@ func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) { for key, val := range headers { fmt.Printf("%s --> %v\n", key, val) } - event := github.Event(headers.Get("X-Github-Event")) + event := runnerParams.Event(headers.Get("X-Github-Event")) switch event { - case github.WorkflowJobEvent: + case runnerParams.WorkflowJobEvent: a.handleWorkflowJobEvent(w, r) default: log.Printf("ignoring unknown event %s", event) diff --git a/config/config.go b/config/config.go index e12656c6..79dff5f5 100644 --- a/config/config.go +++ b/config/config.go @@ -42,6 +42,9 @@ const ( DefaultUser = "runner" // DefaultUserShell is the shell for the default user. DefaultUserShell = "/bin/bash" + + // DefaultPoolQueueSize is the default size for a pool queue. + DefaultPoolQueueSize = 10 ) var ( @@ -280,6 +283,9 @@ type Pool struct { // the Providers array in the main config. ProviderName string `toml:"provider_name" json:"provider-name"` + // QueueSize defines the number of jobs this pool can handle simultaneously. + QueueSize uint `toml:"queue_size" json:"queue-size"` + // Runners represents a list of runner types defined for this pool. Runners []Runner `toml:"runners" json:"runners"` } diff --git a/errors/errors.go b/errors/errors.go index 6e3c4081..c7e99c2c 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -14,6 +14,8 @@ var ( ErrDuplicateEntity = NewDuplicateUserError("duplicate") // ErrBadRequest is returned is a malformed request is sent ErrBadRequest = NewBadRequestError("invalid request") + // ErrTimeout is returned when a timeout occurs. + ErrTimeout = fmt.Errorf("timed out") ) type baseError struct { diff --git a/github/github.go b/params/github.go similarity index 99% rename from github/github.go rename to params/github.go index 94a30e1d..df894aba 100644 --- a/github/github.go +++ b/params/github.go @@ -1,4 +1,4 @@ -package github +package params import "time" diff --git a/runner/common/pool.go b/runner/common/pool.go index 3ce42127..0e6c8e41 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -1,3 +1,18 @@ package common -type PoolManager interface{} +import "runner-manager/params" + +type PoolManager interface { + WebhookSecret() string + HandleWorkflowJob(job params.WorkflowJob) error + ListInstances() ([]params.Instance, error) + GetInstance() (params.Instance, error) + DeleteInstance() error + StopInstance() error + StartInstance() error + + // Pool lifecycle functions. Start/stop pool. + Start() error + Stop() error + Wait() error +} diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 1846c9de..f4c95233 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -2,20 +2,42 @@ package pool import ( "context" + "fmt" + "log" + "sync" + "time" + "runner-manager/config" + runnerErrors "runner-manager/errors" "runner-manager/params" "runner-manager/runner/common" "github.com/google/go-github/v43/github" + "github.com/pkg/errors" ) +// test that we implement PoolManager +var _ common.PoolManager = &Repository{} + func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, ghcli *github.Client, provider common.Provider) (common.PoolManager, error) { - return &Repository{ + queueSize := cfg.Pool.QueueSize + if queueSize == 0 { + queueSize = config.DefaultPoolQueueSize + } + repo := &Repository{ ctx: ctx, cfg: cfg, ghcli: ghcli, provider: provider, - }, nil + jobQueue: make(chan params.WorkflowJob, queueSize), + quit: make(chan struct{}), + done: make(chan struct{}), + } + + if err := repo.fetchTools(); err != nil { + return nil, errors.Wrap(err, "initializing tools") + } + return repo, nil } type Repository struct { @@ -23,6 +45,11 @@ type Repository struct { cfg config.Repository ghcli *github.Client provider common.Provider + tools []*github.RunnerApplicationDownload + jobQueue chan params.WorkflowJob + quit chan struct{} + done chan struct{} + mux sync.Mutex } func (r *Repository) getGithubRunners() ([]github.Runner, error) { @@ -34,13 +61,118 @@ func (r *Repository) getProviderInstances() ([]params.Instance, error) { } func (r *Repository) Start() error { + go r.loop() return nil } func (r *Repository) Stop() error { + close(r.quit) + return nil +} + +func (r *Repository) fetchTools() error { + r.mux.Lock() + defer r.mux.Unlock() + tools, _, err := r.ghcli.Actions.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name) + if err != nil { + return errors.Wrap(err, "fetching runner tools") + } + r.tools = tools + return nil +} + +func (r *Repository) Wait() error { + select { + case <-r.done: + case <-time.After(20 * time.Second): + return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop") + } return nil } func (r *Repository) loop() { + defer close(r.done) + // TODO: Consolidate runners on loop start. Local runners must match runners + // in github and DB. When a Workflow job is received, we will first create/update + // an entity in the database, before sending the request to the provider to create/delete + // an instance. If a "queued" job is received, we create an entity in the db with + // a state of "pending_create". Once that instance is up and calls home, it is marked + // as "active". If a "completed" job is received from github, we mark the instance + // as "pending_delete". Once the provider deletes the instance, we mark it as "deleted" + // in the database. + // We also ensure we have runners created based on pool characteristics. This is where + // we spin up "MinWorkers" for each runner type. + for { + select { + case job, ok := <-r.jobQueue: + if !ok { + // queue was closed. return. + return + } + // We handle jobs synchronously (for now) + switch job.Action { + case "queued": + // Create instance. + case "completed": + // Remove instance. + } + fmt.Println(job) + case <-time.After(3 * time.Hour): + // Update tools cache. + if err := r.fetchTools(); err != nil { + log.Printf("failed to update tools for repo %s: %s", r.cfg.String(), err) + } + case <-r.ctx.Done(): + // daemon is shutting down. + return + case <-r.quit: + // this worker was stopped. + return + } + } +} + +// addJobToQueue adds a new workflow job to the queue of jobs that need to be +// processed by this pool. Jobs are added by github webhooks, so it makes no sense +// to return an error when that happens. But we do need to log any error that comes +// up. The queue size is configurable. If we hit that limit, new jobs will be discarded +// and logged. +// TODO: setup a state pipeline that will send back updates to the runner and update the +// database as needed. +func (r *Repository) addJobToQueue(job params.WorkflowJob) { + select { + case r.jobQueue <- job: + case <-time.After(1 * time.Second): + log.Printf("timed out accepting job. Queue is full.") + } +} + +func (r *Repository) WebhookSecret() string { + return r.cfg.WebhookSecret +} + +func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error { + if job.Repository.FullName != r.cfg.String() { + return runnerErrors.NewBadRequestError("job not meant for this pool") + } + r.addJobToQueue(job) + return nil +} + +func (r *Repository) ListInstances() ([]params.Instance, error) { + return nil, nil +} + +func (r *Repository) GetInstance() (params.Instance, error) { + return params.Instance{}, nil +} +func (r *Repository) DeleteInstance() error { + return nil +} +func (r *Repository) StopInstance() error { + return nil +} +func (r *Repository) StartInstance() error { + return nil } diff --git a/runner/runner.go b/runner/runner.go index 036293ed..289749be 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -2,15 +2,22 @@ package runner import ( "context" - "fmt" + "crypto/hmac" + "crypto/sha1" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "hash" "io/ioutil" "os" "path/filepath" "runner-manager/config" gErrors "runner-manager/errors" + "runner-manager/params" "runner-manager/runner/common" "runner-manager/runner/providers" "runner-manager/util" + "strings" "sync" "github.com/google/go-github/v43/github" @@ -57,26 +64,102 @@ type Runner struct { providers map[string]common.Provider } -func (r *Runner) getRepoSecret(repoName string) (string, error) { - return "", nil -} - -func (r *Runner) getOrgSecret(orgName string) (string, error) { - return "", nil -} - -func (r *Runner) ValidateHookBody(hookTargetType, signature, entity string, body []byte) error { - var secret string - var err error - switch hookTargetType { - case "repository": - secret, err = r.getRepoSecret(entity) - case "organization": - secret, err = r.getOrgSecret(entity) - default: - return gErrors.NewBadRequestError("invalid hook type: %s", hookTargetType) +func (r *Runner) findRepoPool(name string) (common.PoolManager, error) { + if pool, ok := r.repositories[name]; ok { + return pool, nil } - fmt.Println(secret, err) + return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s not configured", name) +} + +func (r *Runner) findOrgPool(name string) (common.PoolManager, error) { + if pool, ok := r.organizations[name]; ok { + return pool, nil + } + return nil, errors.Wrapf(gErrors.ErrNotFound, "organization %s not configured", name) +} + +func (r *Runner) validateHookBody(signature, secret string, body []byte) error { + if secret == "" { + // A secret was not set. Skip validation of body. + return nil + } + + if signature == "" { + // A secret was set in our config, but a signature was not received + // from Github. Authentication of the body cannot be done. + return gErrors.NewUnauthorizedError("missing github signature") + } + + sigParts := strings.SplitN(signature, "=", 2) + if len(sigParts) != 2 { + // We expect the signature from github to be of the format: + // hashType=hashValue + // ie: sha256=1fc917c7ad66487470e466c0ad40ddd45b9f7730a4b43e1b2542627f0596bbdc + return gErrors.NewBadRequestError("invalid signature format") + } + + var hashFunc func() hash.Hash + switch sigParts[0] { + case "sha256": + hashFunc = sha256.New + case "sha1": + hashFunc = sha1.New + default: + return gErrors.NewBadRequestError("unknown signature type") + } + + mac := hmac.New(hashFunc, []byte(secret)) + _, err := mac.Write(body) + if err != nil { + return errors.Wrap(err, "failed to compute sha256") + } + expectedMAC := hex.EncodeToString(mac.Sum(nil)) + + if !hmac.Equal([]byte(sigParts[1]), []byte(expectedMAC)) { + return gErrors.NewUnauthorizedError("signature missmatch") + } + + return nil +} + +func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error { + if jobData == nil || len(jobData) == 0 { + return gErrors.NewBadRequestError("missing job data") + } + + var job params.WorkflowJob + if err := json.Unmarshal(jobData, &job); err != nil { + return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err) + } + + var entity string + var pool common.PoolManager + var err error + + switch HookTargetType(hookTargetType) { + case RepoHook: + entity = job.Repository.FullName + pool, err = r.findRepoPool(entity) + case OrganizationHook: + entity = job.Organization.Login + pool, err = r.findOrgPool(entity) + default: + return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType) + } + + if err != nil { + // We don't have a repository or organization configured that + // can handle this workflow job. + return errors.Wrap(err, "fetching pool") + } + + // We found a pool. Validate the webhook job. If a secret is configured, + // we make sure that the source of this workflow job is valid. + secret := pool.WebhookSecret() + if err := r.validateHookBody(signature, secret, jobData); err != nil { + return errors.Wrap(err, "validating webhook data") + } + return nil } diff --git a/runner/types.go b/runner/types.go new file mode 100644 index 00000000..44ef5bb5 --- /dev/null +++ b/runner/types.go @@ -0,0 +1,8 @@ +package runner + +type HookTargetType string + +const ( + RepoHook HookTargetType = "repository" + OrganizationHook HookTargetType = "organization" +)