Validate webhook and add some pool logic

This commit is contained in:
Gabriel Adrian Samfira 2022-04-23 13:05:40 +00:00
parent ebec0dda52
commit ee207b0b54
8 changed files with 276 additions and 39 deletions

View file

@ -9,7 +9,7 @@ import (
"runner-manager/apiserver/params"
gErrors "runner-manager/errors"
"runner-manager/github"
runnerParams "runner-manager/params"
"runner-manager/runner"
"github.com/pkg/errors"
@ -56,7 +56,7 @@ func handleError(w http.ResponseWriter, err error) {
func (a *APIController) authenticateHook(body []byte, headers http.Header) error {
// signature := headers.Get("X-Hub-Signature-256")
hookType := headers.Get("X-Github-Hook-Installation-Target-Type")
var workflowJob github.WorkflowJob
var workflowJob runnerParams.WorkflowJob
if err := json.Unmarshal(body, &workflowJob); err != nil {
return gErrors.NewBadRequestError("invalid post body: %s", err)
}
@ -83,19 +83,10 @@ func (a *APIController) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Re
fmt.Printf(">>> Signature: %s\n", signature)
fmt.Printf(">>> HookType: %s\n", hookType)
var workflowJob github.WorkflowJob
if err := json.Unmarshal(body, &workflowJob); err != nil {
handleError(w, gErrors.ErrBadRequest)
if err := a.r.DispatchWorkflowJob(hookType, signature, body); err != nil {
handleError(w, err)
return
}
// entity := workflowJob.Repository.Owner.Login
asJs, err := json.MarshalIndent(workflowJob, "", " ")
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("%s\n", string(asJs))
}
func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {
@ -103,9 +94,9 @@ func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {
for key, val := range headers {
fmt.Printf("%s --> %v\n", key, val)
}
event := github.Event(headers.Get("X-Github-Event"))
event := runnerParams.Event(headers.Get("X-Github-Event"))
switch event {
case github.WorkflowJobEvent:
case runnerParams.WorkflowJobEvent:
a.handleWorkflowJobEvent(w, r)
default:
log.Printf("ignoring unknown event %s", event)

View file

@ -42,6 +42,9 @@ const (
DefaultUser = "runner"
// DefaultUserShell is the shell for the default user.
DefaultUserShell = "/bin/bash"
// DefaultPoolQueueSize is the default size for a pool queue.
DefaultPoolQueueSize = 10
)
var (
@ -280,6 +283,9 @@ type Pool struct {
// the Providers array in the main config.
ProviderName string `toml:"provider_name" json:"provider-name"`
// QueueSize defines the number of jobs this pool can handle simultaneously.
QueueSize uint `toml:"queue_size" json:"queue-size"`
// Runners represents a list of runner types defined for this pool.
Runners []Runner `toml:"runners" json:"runners"`
}

View file

@ -14,6 +14,8 @@ var (
ErrDuplicateEntity = NewDuplicateUserError("duplicate")
// ErrBadRequest is returned is a malformed request is sent
ErrBadRequest = NewBadRequestError("invalid request")
// ErrTimeout is returned when a timeout occurs.
ErrTimeout = fmt.Errorf("timed out")
)
type baseError struct {

View file

@ -1,4 +1,4 @@
package github
package params
import "time"

View file

@ -1,3 +1,18 @@
package common
type PoolManager interface{}
import "runner-manager/params"
type PoolManager interface {
WebhookSecret() string
HandleWorkflowJob(job params.WorkflowJob) error
ListInstances() ([]params.Instance, error)
GetInstance() (params.Instance, error)
DeleteInstance() error
StopInstance() error
StartInstance() error
// Pool lifecycle functions. Start/stop pool.
Start() error
Stop() error
Wait() error
}

View file

@ -2,20 +2,42 @@ package pool
import (
"context"
"fmt"
"log"
"sync"
"time"
"runner-manager/config"
runnerErrors "runner-manager/errors"
"runner-manager/params"
"runner-manager/runner/common"
"github.com/google/go-github/v43/github"
"github.com/pkg/errors"
)
// test that we implement PoolManager
var _ common.PoolManager = &Repository{}
func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, ghcli *github.Client, provider common.Provider) (common.PoolManager, error) {
return &Repository{
queueSize := cfg.Pool.QueueSize
if queueSize == 0 {
queueSize = config.DefaultPoolQueueSize
}
repo := &Repository{
ctx: ctx,
cfg: cfg,
ghcli: ghcli,
provider: provider,
}, nil
jobQueue: make(chan params.WorkflowJob, queueSize),
quit: make(chan struct{}),
done: make(chan struct{}),
}
if err := repo.fetchTools(); err != nil {
return nil, errors.Wrap(err, "initializing tools")
}
return repo, nil
}
type Repository struct {
@ -23,6 +45,11 @@ type Repository struct {
cfg config.Repository
ghcli *github.Client
provider common.Provider
tools []*github.RunnerApplicationDownload
jobQueue chan params.WorkflowJob
quit chan struct{}
done chan struct{}
mux sync.Mutex
}
func (r *Repository) getGithubRunners() ([]github.Runner, error) {
@ -34,13 +61,118 @@ func (r *Repository) getProviderInstances() ([]params.Instance, error) {
}
func (r *Repository) Start() error {
go r.loop()
return nil
}
func (r *Repository) Stop() error {
close(r.quit)
return nil
}
func (r *Repository) fetchTools() error {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcli.Actions.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name)
if err != nil {
return errors.Wrap(err, "fetching runner tools")
}
r.tools = tools
return nil
}
func (r *Repository) Wait() error {
select {
case <-r.done:
case <-time.After(20 * time.Second):
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
}
return nil
}
func (r *Repository) loop() {
defer close(r.done)
// TODO: Consolidate runners on loop start. Local runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
// a state of "pending_create". Once that instance is up and calls home, it is marked
// as "active". If a "completed" job is received from github, we mark the instance
// as "pending_delete". Once the provider deletes the instance, we mark it as "deleted"
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
select {
case job, ok := <-r.jobQueue:
if !ok {
// queue was closed. return.
return
}
// We handle jobs synchronously (for now)
switch job.Action {
case "queued":
// Create instance.
case "completed":
// Remove instance.
}
fmt.Println(job)
case <-time.After(3 * time.Hour):
// Update tools cache.
if err := r.fetchTools(); err != nil {
log.Printf("failed to update tools for repo %s: %s", r.cfg.String(), err)
}
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
}
}
// addJobToQueue adds a new workflow job to the queue of jobs that need to be
// processed by this pool. Jobs are added by github webhooks, so it makes no sense
// to return an error when that happens. But we do need to log any error that comes
// up. The queue size is configurable. If we hit that limit, new jobs will be discarded
// and logged.
// TODO: setup a state pipeline that will send back updates to the runner and update the
// database as needed.
func (r *Repository) addJobToQueue(job params.WorkflowJob) {
select {
case r.jobQueue <- job:
case <-time.After(1 * time.Second):
log.Printf("timed out accepting job. Queue is full.")
}
}
func (r *Repository) WebhookSecret() string {
return r.cfg.WebhookSecret
}
func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error {
if job.Repository.FullName != r.cfg.String() {
return runnerErrors.NewBadRequestError("job not meant for this pool")
}
r.addJobToQueue(job)
return nil
}
func (r *Repository) ListInstances() ([]params.Instance, error) {
return nil, nil
}
func (r *Repository) GetInstance() (params.Instance, error) {
return params.Instance{}, nil
}
func (r *Repository) DeleteInstance() error {
return nil
}
func (r *Repository) StopInstance() error {
return nil
}
func (r *Repository) StartInstance() error {
return nil
}

View file

@ -2,15 +2,22 @@ package runner
import (
"context"
"fmt"
"crypto/hmac"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"hash"
"io/ioutil"
"os"
"path/filepath"
"runner-manager/config"
gErrors "runner-manager/errors"
"runner-manager/params"
"runner-manager/runner/common"
"runner-manager/runner/providers"
"runner-manager/util"
"strings"
"sync"
"github.com/google/go-github/v43/github"
@ -57,26 +64,102 @@ type Runner struct {
providers map[string]common.Provider
}
func (r *Runner) getRepoSecret(repoName string) (string, error) {
return "", nil
}
func (r *Runner) getOrgSecret(orgName string) (string, error) {
return "", nil
}
func (r *Runner) ValidateHookBody(hookTargetType, signature, entity string, body []byte) error {
var secret string
var err error
switch hookTargetType {
case "repository":
secret, err = r.getRepoSecret(entity)
case "organization":
secret, err = r.getOrgSecret(entity)
default:
return gErrors.NewBadRequestError("invalid hook type: %s", hookTargetType)
func (r *Runner) findRepoPool(name string) (common.PoolManager, error) {
if pool, ok := r.repositories[name]; ok {
return pool, nil
}
fmt.Println(secret, err)
return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s not configured", name)
}
func (r *Runner) findOrgPool(name string) (common.PoolManager, error) {
if pool, ok := r.organizations[name]; ok {
return pool, nil
}
return nil, errors.Wrapf(gErrors.ErrNotFound, "organization %s not configured", name)
}
func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
if secret == "" {
// A secret was not set. Skip validation of body.
return nil
}
if signature == "" {
// A secret was set in our config, but a signature was not received
// from Github. Authentication of the body cannot be done.
return gErrors.NewUnauthorizedError("missing github signature")
}
sigParts := strings.SplitN(signature, "=", 2)
if len(sigParts) != 2 {
// We expect the signature from github to be of the format:
// hashType=hashValue
// ie: sha256=1fc917c7ad66487470e466c0ad40ddd45b9f7730a4b43e1b2542627f0596bbdc
return gErrors.NewBadRequestError("invalid signature format")
}
var hashFunc func() hash.Hash
switch sigParts[0] {
case "sha256":
hashFunc = sha256.New
case "sha1":
hashFunc = sha1.New
default:
return gErrors.NewBadRequestError("unknown signature type")
}
mac := hmac.New(hashFunc, []byte(secret))
_, err := mac.Write(body)
if err != nil {
return errors.Wrap(err, "failed to compute sha256")
}
expectedMAC := hex.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(sigParts[1]), []byte(expectedMAC)) {
return gErrors.NewUnauthorizedError("signature missmatch")
}
return nil
}
func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error {
if jobData == nil || len(jobData) == 0 {
return gErrors.NewBadRequestError("missing job data")
}
var job params.WorkflowJob
if err := json.Unmarshal(jobData, &job); err != nil {
return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err)
}
var entity string
var pool common.PoolManager
var err error
switch HookTargetType(hookTargetType) {
case RepoHook:
entity = job.Repository.FullName
pool, err = r.findRepoPool(entity)
case OrganizationHook:
entity = job.Organization.Login
pool, err = r.findOrgPool(entity)
default:
return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType)
}
if err != nil {
// We don't have a repository or organization configured that
// can handle this workflow job.
return errors.Wrap(err, "fetching pool")
}
// We found a pool. Validate the webhook job. If a secret is configured,
// we make sure that the source of this workflow job is valid.
secret := pool.WebhookSecret()
if err := r.validateHookBody(signature, secret, jobData); err != nil {
return errors.Wrap(err, "validating webhook data")
}
return nil
}

8
runner/types.go Normal file
View file

@ -0,0 +1,8 @@
package runner
type HookTargetType string
const (
RepoHook HookTargetType = "repository"
OrganizationHook HookTargetType = "organization"
)