garm/runner/runner.go

465 lines
12 KiB
Go
Raw Normal View History

package runner
import (
2022-04-15 15:22:47 +00:00
"context"
"crypto/hmac"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"hash"
2022-04-18 17:26:13 +00:00
"io/ioutil"
2022-04-26 20:29:58 +00:00
"log"
2022-04-18 17:26:13 +00:00
"os"
"path/filepath"
2022-04-26 20:29:58 +00:00
"strings"
"sync"
2022-05-04 16:27:24 +00:00
"garm/auth"
"garm/config"
"garm/database"
dbCommon "garm/database/common"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/providers"
"garm/util"
2022-04-18 17:26:13 +00:00
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
)
2022-04-22 14:46:27 +00:00
func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
2022-04-29 16:08:31 +00:00
db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
return nil, errors.Wrap(err, "creating db connection")
}
2022-04-22 14:46:27 +00:00
2022-04-29 16:08:31 +00:00
ctrlId, err := db.ControllerInfo()
2022-04-22 14:46:27 +00:00
if err != nil {
2022-04-29 16:08:31 +00:00
return nil, errors.Wrap(err, "fetching controller info")
2022-04-22 14:46:27 +00:00
}
2022-04-29 16:08:31 +00:00
providers, err := providers.LoadProvidersFromConfig(ctx, cfg, ctrlId.ControllerID.String())
2022-04-26 20:29:58 +00:00
if err != nil {
2022-04-29 16:08:31 +00:00
return nil, errors.Wrap(err, "loading providers")
2022-04-26 20:29:58 +00:00
}
2022-04-22 14:46:27 +00:00
2022-04-29 14:18:22 +00:00
creds := map[string]config.Github{}
for _, ghcreds := range cfg.Github {
creds[ghcreds.Name] = ghcreds
}
2022-04-18 17:26:13 +00:00
runner := &Runner{
2022-04-29 16:08:31 +00:00
ctx: ctx,
config: cfg,
store: db,
2022-04-28 16:13:20 +00:00
repositories: map[string]common.PoolManager{},
organizations: map[string]common.PoolManager{},
providers: providers,
controllerID: ctrlId.ControllerID.String(),
2022-04-29 14:18:22 +00:00
credentials: creds,
2022-04-18 17:26:13 +00:00
}
if err := runner.ensureSSHKeys(); err != nil {
return nil, errors.Wrap(err, "ensuring SSH keys")
}
2022-04-28 16:13:20 +00:00
if err := runner.loadReposAndOrgs(); err != nil {
return nil, errors.Wrap(err, "loading pool managers")
}
2022-04-18 17:26:13 +00:00
return runner, nil
2022-04-15 15:22:47 +00:00
}
type Runner struct {
2022-04-18 17:26:13 +00:00
mux sync.Mutex
2022-04-29 16:08:31 +00:00
config config.Config
2022-04-22 14:46:27 +00:00
controllerID string
2022-04-29 16:08:31 +00:00
ctx context.Context
store dbCommon.Store
2022-04-22 14:46:27 +00:00
repositories map[string]common.PoolManager
organizations map[string]common.PoolManager
providers map[string]common.Provider
2022-04-29 14:18:22 +00:00
credentials map[string]config.Github
2022-04-22 14:46:27 +00:00
}
2022-04-28 16:13:20 +00:00
// ListCredentials returns the name and description of every GitHub credential
// set present in the configuration. Only those two fields are copied into the
// result. The returned error is always nil.
func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {
	creds := make([]params.GithubCredentials, 0, len(r.config.Github))
	for _, gh := range r.config.Github {
		entry := params.GithubCredentials{
			Name:        gh.Name,
			Description: gh.Description,
		}
		creds = append(creds, entry)
	}
	return creds, nil
}
// ListProviders returns the params representation of every loaded provider.
// Providers are kept in a map, so the order of the result is unspecified.
// The returned error is always nil.
func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
	out := make([]params.Provider, 0, len(r.providers))
	for _, provider := range r.providers {
		out = append(out, provider.AsParams())
	}
	return out, nil
}
2022-04-29 16:08:31 +00:00
func (r *Runner) getInternalConfig(credsName string) (params.Internal, error) {
creds, ok := r.credentials[credsName]
2022-04-29 14:18:22 +00:00
if !ok {
2022-04-29 16:08:31 +00:00
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName)
2022-04-29 14:18:22 +00:00
}
return params.Internal{
OAuth2Token: creds.OAuth2Token,
ControllerID: r.controllerID,
InstanceCallbackURL: r.config.Default.CallbackURL,
JWTSecret: r.config.JWTAuth.Secret,
}, nil
}
2022-04-28 16:13:20 +00:00
func (r *Runner) loadReposAndOrgs() error {
2022-04-26 20:29:58 +00:00
r.mux.Lock()
defer r.mux.Unlock()
2022-04-28 16:13:20 +00:00
repos, err := r.store.ListRepositories(r.ctx)
if err != nil {
return errors.Wrap(err, "fetching repositories")
}
for _, repo := range repos {
log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name)
2022-04-29 14:18:22 +00:00
poolManager, err := r.loadRepoPoolManager(repo)
2022-04-28 16:13:20 +00:00
if err != nil {
2022-04-29 14:18:22 +00:00
return errors.Wrap(err, "loading repo pool manager")
2022-04-28 16:13:20 +00:00
}
r.repositories[repo.ID] = poolManager
}
2022-05-04 16:27:24 +00:00
orgs, err := r.store.ListOrganizations(r.ctx)
if err != nil {
return errors.Wrap(err, "fetching repositories")
}
for _, org := range orgs {
log.Printf("creating pool manager for organization %s", org.Name)
poolManager, err := r.loadOrgPoolManager(org)
if err != nil {
return errors.Wrap(err, "loading repo pool manager")
}
r.organizations[org.ID] = poolManager
}
2022-04-28 16:13:20 +00:00
return nil
}
func (r *Runner) Start() error {
2022-04-29 14:18:22 +00:00
r.mux.Lock()
defer r.mux.Unlock()
2022-04-28 16:13:20 +00:00
for _, repo := range r.repositories {
if err := repo.Start(); err != nil {
return errors.Wrap(err, "starting repo pool manager")
}
}
for _, org := range r.organizations {
if err := org.Start(); err != nil {
return errors.Wrap(err, "starting org pool manager")
}
}
return nil
}
func (r *Runner) Stop() error {
2022-04-29 14:18:22 +00:00
r.mux.Lock()
defer r.mux.Unlock()
2022-04-28 16:13:20 +00:00
for _, repo := range r.repositories {
if err := repo.Stop(); err != nil {
return errors.Wrap(err, "starting repo pool manager")
}
}
2022-04-26 20:29:58 +00:00
2022-04-28 16:13:20 +00:00
for _, org := range r.organizations {
if err := org.Stop(); err != nil {
return errors.Wrap(err, "starting org pool manager")
}
}
return nil
}
func (r *Runner) Wait() error {
2022-04-29 14:18:22 +00:00
r.mux.Lock()
defer r.mux.Unlock()
2022-04-28 16:13:20 +00:00
var wg sync.WaitGroup
for poolId, repo := range r.repositories {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
defer wg.Done()
if err := poolMgr.Wait(); err != nil {
log.Printf("timed out waiting for pool manager %s to exit", id)
}
}(poolId, repo)
}
for poolId, org := range r.organizations {
wg.Add(1)
go func(id string, poolMgr common.PoolManager) {
defer wg.Done()
if err := poolMgr.Wait(); err != nil {
log.Printf("timed out waiting for pool manager %s to exit", id)
}
}(poolId, org)
}
wg.Wait()
2022-04-26 20:29:58 +00:00
return nil
}
func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
if secret == "" {
// A secret was not set. Skip validation of body.
return nil
}
if signature == "" {
// A secret was set in our config, but a signature was not received
// from Github. Authentication of the body cannot be done.
2022-04-29 14:18:22 +00:00
return runnerErrors.NewUnauthorizedError("missing github signature")
}
sigParts := strings.SplitN(signature, "=", 2)
if len(sigParts) != 2 {
// We expect the signature from github to be of the format:
// hashType=hashValue
// ie: sha256=1fc917c7ad66487470e466c0ad40ddd45b9f7730a4b43e1b2542627f0596bbdc
2022-04-29 14:18:22 +00:00
return runnerErrors.NewBadRequestError("invalid signature format")
}
var hashFunc func() hash.Hash
switch sigParts[0] {
case "sha256":
hashFunc = sha256.New
case "sha1":
hashFunc = sha1.New
default:
2022-04-29 14:18:22 +00:00
return runnerErrors.NewBadRequestError("unknown signature type")
}
mac := hmac.New(hashFunc, []byte(secret))
_, err := mac.Write(body)
if err != nil {
return errors.Wrap(err, "failed to compute sha256")
}
expectedMAC := hex.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(sigParts[1]), []byte(expectedMAC)) {
2022-04-29 14:18:22 +00:00
return runnerErrors.NewUnauthorizedError("signature missmatch")
}
return nil
2022-04-22 14:46:27 +00:00
}
func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error {
if jobData == nil || len(jobData) == 0 {
2022-04-29 14:18:22 +00:00
return runnerErrors.NewBadRequestError("missing job data")
}
var job params.WorkflowJob
if err := json.Unmarshal(jobData, &job); err != nil {
2022-04-29 14:18:22 +00:00
return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err)
}
2022-04-26 20:29:58 +00:00
var poolManager common.PoolManager
2022-04-22 14:46:27 +00:00
var err error
switch HookTargetType(hookTargetType) {
case RepoHook:
2022-04-28 16:13:20 +00:00
poolManager, err = r.findRepoPoolManager(job.Repository.Owner.Login, job.Repository.Name)
case OrganizationHook:
2022-04-28 16:13:20 +00:00
poolManager, err = r.findOrgPoolManager(job.Organization.Login)
2022-04-22 14:46:27 +00:00
default:
2022-04-29 14:18:22 +00:00
return runnerErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType)
}
if err != nil {
// We don't have a repository or organization configured that
// can handle this workflow job.
2022-04-26 20:29:58 +00:00
return errors.Wrap(err, "fetching poolManager")
2022-04-22 14:46:27 +00:00
}
// We found a pool. Validate the webhook job. If a secret is configured,
// we make sure that the source of this workflow job is valid.
2022-04-26 20:29:58 +00:00
secret := poolManager.WebhookSecret()
if err := r.validateHookBody(signature, secret, jobData); err != nil {
return errors.Wrap(err, "validating webhook data")
}
2022-04-28 16:13:20 +00:00
if err := poolManager.HandleWorkflowJob(job); err != nil {
return errors.Wrap(err, "handling workflow job")
}
2022-04-22 14:46:27 +00:00
return nil
}
2022-04-18 17:26:13 +00:00
func (r *Runner) sshDir() string {
2022-04-27 16:56:28 +00:00
return filepath.Join(r.config.Default.ConfigDir, "ssh")
2022-04-18 17:26:13 +00:00
}
// sshKeyPath returns the path of the runner's private key inside sshDir.
func (r *Runner) sshKeyPath() string {
	return filepath.Join(r.sshDir(), "runner_rsa_key")
}
// sshPubKeyPath returns the path of the runner's public key inside sshDir.
func (r *Runner) sshPubKeyPath() string {
	return filepath.Join(r.sshDir(), "runner_rsa_key.pub")
}
// parseSSHKey reads the private key file at sshKeyPath and parses it into an
// ssh.Signer. r.mux is held while the file is read and parsed.
func (r *Runner) parseSSHKey() (ssh.Signer, error) {
	r.mux.Lock()
	defer r.mux.Unlock()

	keyPath := r.sshKeyPath()

	raw, err := ioutil.ReadFile(keyPath)
	if err != nil {
		return nil, errors.Wrapf(err, "reading private key %s", keyPath)
	}

	signer, err := ssh.ParsePrivateKey(raw)
	if err != nil {
		return nil, errors.Wrapf(err, "parsing private key %s", keyPath)
	}

	return signer, nil
}
// sshPubKey returns the raw contents of the public key file at sshPubKeyPath.
func (r *Runner) sshPubKey() ([]byte, error) {
	pubPath := r.sshPubKeyPath()

	contents, err := ioutil.ReadFile(pubPath)
	if err != nil {
		return nil, errors.Wrapf(err, "reading public key %s", pubPath)
	}

	return contents, nil
}
// ensureSSHKeys makes sure the SSH directory and the runner key pair exist.
//
// The directory is created with mode 0700 when it is missing. If the private
// key file can already be stat'ed nothing else is done; otherwise a new key
// pair is generated and both files are written with mode 0600.
//
// An error is returned when the directory cannot be checked or created, when
// key generation fails, or when either key file cannot be written.
func (r *Runner) ensureSSHKeys() error {
	sshDir := r.sshDir()

	if _, err := os.Stat(sshDir); err != nil {
		if !errors.Is(err, os.ErrNotExist) {
			return errors.Wrapf(err, "checking SSH dir %s", sshDir)
		}
		if err := os.MkdirAll(sshDir, 0o700); err != nil {
			return errors.Wrapf(err, "creating ssh dir %s", sshDir)
		}
	}

	privKeyFile := r.sshKeyPath()
	pubKeyFile := r.sshPubKeyPath()

	if _, err := os.Stat(privKeyFile); err == nil {
		// A private key is already in place; keep it.
		return nil
	}

	pubKey, privKey, err := util.GenerateSSHKeyPair()
	if err != nil {
		// The wrapped error used to be discarded here, which let execution
		// continue and write empty key material to disk.
		return errors.Wrap(err, "generating keypair")
	}

	if err := ioutil.WriteFile(privKeyFile, privKey, 0o600); err != nil {
		return errors.Wrap(err, "writing private key")
	}

	if err := ioutil.WriteFile(pubKeyFile, pubKey, 0o600); err != nil {
		return errors.Wrap(err, "writing public key")
	}

	return nil
}
2022-05-04 16:27:24 +00:00
// appendTagsToCreatePoolParams validates param and returns a copy whose Tags
// additionally contain the default labels "self-hosted", the GitHub
// architecture label and the GitHub OS type label.
//
// A bad request error is returned when param fails validation, when the OS
// type or architecture is unsupported, or when the named provider is not
// loaded. An empty params.CreatePoolParams accompanies every error.
func (r *Runner) appendTagsToCreatePoolParams(param params.CreatePoolParams) (params.CreatePoolParams, error) {
	var empty params.CreatePoolParams

	if validationErr := param.Validate(); validationErr != nil {
		return empty, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", validationErr)
	}

	switch {
	case !IsSupportedOSType(param.OSType):
		return empty, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType)
	case !IsSupportedArch(param.OSArch):
		return empty, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch)
	}

	if _, known := r.providers[param.ProviderName]; !known {
		return empty, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName)
	}

	// GitHub adds "self-hosted", the OS type (linux, windows, ...) and the
	// architecture (arm, x64, ...) as labels to every self hosted runner.
	// Incoming workflow jobs are matched to pools by label, so a pool that
	// does not carry these defaults would never match a job that targets
	// them. The flip side is that all pools sharing an OS and architecture
	// share these labels too: users should give each pool distinct labels
	// and target those, or a job may land on the wrong worker type.
	ghArch, err := util.ResolveToGithubArch(string(param.OSArch))
	if err != nil {
		return empty, errors.Wrap(err, "invalid arch")
	}

	ghOSType, err := util.ResolveToGithubOSType(string(param.OSType))
	if err != nil {
		return empty, errors.Wrap(err, "invalid os type")
	}

	param.Tags = append(param.Tags, "self-hosted", ghArch, ghOSType)
	return param, nil
}
// GetInstance looks up the instance called instanceName in the store.
// The context must belong to an admin, otherwise ErrUnauthorized is returned.
func (r *Runner) GetInstance(ctx context.Context, instanceName string) (params.Instance, error) {
	if isAdmin := auth.IsAdmin(ctx); !isAdmin {
		return params.Instance{}, runnerErrors.ErrUnauthorized
	}

	found, err := r.store.GetInstanceByName(ctx, instanceName)
	if err != nil {
		return params.Instance{}, errors.Wrap(err, "fetching instance")
	}

	return found, nil
}
// AddInstanceStatusMessage records param.Message as a status message for the
// instance identified by ctx and then sets that instance's runner status to
// param.Status. ErrUnauthorized is returned when ctx carries no instance ID.
func (r *Runner) AddInstanceStatusMessage(ctx context.Context, param params.InstanceUpdateMessage) error {
	id := auth.InstanceID(ctx)
	if id == "" {
		return runnerErrors.ErrUnauthorized
	}

	err := r.store.AddInstanceStatusMessage(ctx, id, param.Message)
	if err != nil {
		return errors.Wrap(err, "adding status update")
	}

	update := params.UpdateInstanceParams{
		RunnerStatus: param.Status,
	}

	// NOTE(review): this call uses the runner's own context rather than the
	// request ctx used above - confirm whether that is intentional.
	_, err = r.store.UpdateInstance(r.ctx, id, update)
	if err != nil {
		return errors.Wrap(err, "updating runner state")
	}

	return nil
}