From 0314fd3b67cd00af838f90ba78939dcf905ca14f Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Fri, 29 Apr 2022 14:18:22 +0000 Subject: [PATCH] Add some API resources --- apiserver/controllers/controllers.go | 127 +++++++++++++-- apiserver/routers/routers.go | 17 +- auth/jwt.go | 2 +- cmd/runner-manager/main.go | 6 + database/common/common.go | 11 +- database/sql/sql.go | 117 +++++++++++++- params/requests.go | 20 +++ runner/common/pool.go | 1 + runner/pool/repository.go | 41 ++++- runner/runner.go | 225 ++++++++++++++++++++++++--- 10 files changed, 507 insertions(+), 60 deletions(-) diff --git a/apiserver/controllers/controllers.go b/apiserver/controllers/controllers.go index 5056ef55..19012a01 100644 --- a/apiserver/controllers/controllers.go +++ b/apiserver/controllers/controllers.go @@ -13,6 +13,7 @@ import ( runnerParams "runner-manager/params" "runner-manager/runner" + "github.com/gorilla/mux" "github.com/pkg/errors" ) @@ -194,24 +195,116 @@ func (a *APIController) ListProviders(w http.ResponseWriter, r *http.Request) { } func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) { - // ctx := r.Context() + ctx := r.Context() - // var repoData runnerParams.CreateRepoParams - // if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil { - // handleError(w, gErrors.ErrBadRequest) - // return - // } + var repoData runnerParams.CreateRepoParams + if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } - // pasteInfo, err := p.paster.Create( - // ctx, pasteData.Data, pasteData.Name, - // pasteData.Language, pasteData.Description, - // pasteData.Expires, pasteData.Public, "", - // pasteData.Metadata) - // if err != nil { - // handleError(w, err) - // return - // } - // w.Header().Set("Content-Type", "application/json") - // json.NewEncoder(w).Encode(pasteInfo) + repo, err := a.r.CreateRepository(ctx, repoData) + if err != nil { + log.Printf("error creating repository: %+v", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + repos, err := a.r.ListRepositories(ctx) + if err != nil { + log.Printf("listing repos: %+v", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repos) +} + +func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + repo, err := a.r.GetRepositoryByID(ctx, repoID) + if err != nil { + log.Printf("fetching repo: %+v", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + if err := a.r.DeleteRepository(ctx, repoID); err != nil { + log.Printf("fetching repo: %+v", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) } + +func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + var updatePayload runnerParams.UpdateRepositoryParams + if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } + + repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload) + if err != nil { + log.Printf("error updating repository: %+v", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} diff --git a/apiserver/routers/routers.go b/apiserver/routers/routers.go index bc89503b..019b6858 100644 --- a/apiserver/routers/routers.go +++ b/apiserver/routers/routers.go @@ -51,15 +51,20 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") // Get repo - apiRouter.Handle("/repositories/{repoID:repoID\\/?}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") + // Update repo + apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") // Delete repo - apiRouter.Handle("/repositories/{repoID:repoID\\/?}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") // List repos - apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") // Create repo - apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") - apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") ///////////////////////////// // Organizations and pools // diff --git a/auth/jwt.go b/auth/jwt.go index 699072e7..ad52ca30 100644 --- a/auth/jwt.go +++ b/auth/jwt.go @@ -88,7 +88,7 @@ func (amw *jwtMiddleware) claimsToContext(ctx context.Context, claims *JWTClaims return nil, runnerErrors.ErrUnauthorized } - userInfo, err := amw.store.GetUser(ctx, claims.UserID) + userInfo, err := amw.store.GetUserByID(ctx, claims.UserID) if err != nil { return ctx, runnerErrors.ErrUnauthorized } diff --git a/cmd/runner-manager/main.go b/cmd/runner-manager/main.go index e7e2e694..b9abb589 100644 --- a/cmd/runner-manager/main.go +++ b/cmd/runner-manager/main.go @@ -56,6 +56,12 @@ func main() { log.Fatalf("failed to create controller: %+v", err) } + // If there are many repos/pools, this may take a long time. + // TODO: start pool managers in the background and log errors. + if err := runner.Start(); err != nil { + log.Fatal(err) + } + db, err := database.NewDatabase(ctx, cfg.Database) if err != nil { log.Fatal(err) diff --git a/database/common/common.go b/database/common/common.go index 7a4e597d..dacedda5 100644 --- a/database/common/common.go +++ b/database/common/common.go @@ -8,8 +8,10 @@ import ( type Store interface { CreateRepository(ctx context.Context, owner, name, credentialsName, webhookSecret string) (params.Repository, error) GetRepository(ctx context.Context, owner, name string) (params.Repository, error) + GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) ListRepositories(ctx context.Context) ([]params.Repository, error) - DeleteRepository(ctx context.Context, owner, name string) error + DeleteRepository(ctx context.Context, repoID string, hardDelete bool) error + UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) CreateOrganization(ctx context.Context, name, credentialsName, webhookSecret string) (params.Organization, error) GetOrganization(ctx context.Context, name string) (params.Organization, error) @@ -22,6 +24,9 @@ type Store interface { GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) + ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error) + ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) + DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error @@ -42,8 +47,10 @@ type Store interface { // GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error) GetInstanceByName(ctx context.Context, poolID string, instanceName string) (params.Instance, error) - CreateUser(ctx context.Context, user params.NewUserParams) (params.User, error) GetUser(ctx context.Context, user string) (params.User, error) + GetUserByID(ctx context.Context, userID string) (params.User, error) + + CreateUser(ctx context.Context, user params.NewUserParams) (params.User, error) UpdateUser(ctx context.Context, user string, param params.UpdateUserParams) (params.User, error) HasAdminUser(ctx context.Context) bool diff --git a/database/sql/sql.go b/database/sql/sql.go index 0ea44dfa..d21000ea 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -138,6 +138,38 @@ func (s *sqlDatabase) CreateRepository(ctx context.Context, owner, name, credent return param, nil } +func (s *sqlDatabase) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) { + repo, err := s.getRepoByID(ctx, repoID) + if err != nil { + return params.Repository{}, errors.Wrap(err, "fetching repo") + } + + if param.CredentialsName != "" { + repo.CredentialsName = param.CredentialsName + } + + if param.WebhookSecret != "" { + secret, err := util.Aes256EncodeString(param.WebhookSecret, s.cfg.Passphrase) + if err != nil { + return params.Repository{}, fmt.Errorf("failed to encrypt string") + } + repo.WebhookSecret = secret + } + + q := s.conn.Save(&repo) + if q.Error != nil { + return params.Repository{}, errors.Wrap(err, "saving repo") + } + + newParams := s.sqlToCommonRepository(repo) + secret, err := util.Aes256DecodeString(repo.WebhookSecret, s.cfg.Passphrase) + if err != nil { + return params.Repository{}, errors.Wrap(err, "decrypting secret") + } + newParams.WebhookSecret = secret + return newParams, nil +} + func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string, preloadAll bool) (Repository, error) { var repo Repository @@ -159,15 +191,20 @@ func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string, preloadAl return repo, nil } -func (s *sqlDatabase) getRepoByID(ctx context.Context, id string) (Repository, error) { +func (s *sqlDatabase) getRepoByID(ctx context.Context, id string, preload ...string) (Repository, error) { u, err := uuid.FromString(id) if err != nil { return Repository{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id") } var repo Repository - q := s.conn. - Where("id = ?", u). - First(&repo) + + q := s.conn + if len(preload) > 0 { + for _, field := range preload { + q = q.Preload(field) + } + } + q = q.Where("id = ?", u).First(&repo) if q.Error != nil { if errors.Is(q.Error, gorm.ErrRecordNotFound) { @@ -194,6 +231,22 @@ func (s *sqlDatabase) GetRepository(ctx context.Context, owner, name string) (pa return param, nil } +func (s *sqlDatabase) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) { + repo, err := s.getRepoByID(ctx, repoID, "Pools") + if err != nil { + return params.Repository{}, errors.Wrap(err, "fetching repo") + } + + param := s.sqlToCommonRepository(repo) + secret, err := util.Aes256DecodeString(repo.WebhookSecret, s.cfg.Passphrase) + if err != nil { + return params.Repository{}, errors.Wrap(err, "decrypting secret") + } + param.WebhookSecret = secret + + return param, nil +} + func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository, error) { var repos []Repository q := s.conn.Find(&repos) @@ -204,13 +257,21 @@ func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository ret := make([]params.Repository, len(repos)) for idx, val := range repos { ret[idx] = s.sqlToCommonRepository(val) + if len(val.WebhookSecret) > 0 { + secret, err := util.Aes256DecodeString(val.WebhookSecret, s.cfg.Passphrase) + if err != nil { + return nil, errors.Wrap(err, "decrypting secret") + } + ret[idx].WebhookSecret = secret + } } return ret, nil } -func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string) error { - repo, err := s.getRepo(ctx, owner, name, false) +// func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string, hardDelete bool) error { +func (s *sqlDatabase) DeleteRepository(ctx context.Context, repoID string, hardDelete bool) error { + repo, err := s.getRepoByID(ctx, repoID) if err != nil { if err == runnerErrors.ErrNotFound { return nil @@ -218,7 +279,11 @@ func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string) return errors.Wrap(err, "fetching repo") } - q := s.conn.Delete(&repo) + q := s.conn + if hardDelete { + q = q.Unscoped() + } + q = q.Delete(&repo) if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) { return errors.Wrap(q.Error, "deleting repo") } @@ -479,6 +544,34 @@ func (s *sqlDatabase) getOrgPools(ctx context.Context, orgID string, preloadAll return pools, nil } +func (s *sqlDatabase) ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error) { + pools, err := s.getRepoPools(ctx, repoID, false) + if err != nil { + return nil, errors.Wrap(err, "fetching pools") + } + + ret := make([]params.Pool, len(pools)) + for idx, pool := range pools { + ret[idx] = s.sqlToCommonPool(pool) + } + + return ret, nil +} + +func (s *sqlDatabase) ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) { + pools, err := s.getOrgPools(ctx, orgID, false) + if err != nil { + return nil, errors.Wrap(err, "fetching pools") + } + + ret := make([]params.Pool, len(pools)) + for idx, pool := range pools { + ret[idx] = s.sqlToCommonPool(pool) + } + + return ret, nil +} + func (s *sqlDatabase) getRepoPool(ctx context.Context, repoID, poolID string) (Pool, error) { repo, err := s.getRepoByID(ctx, repoID) if err != nil { @@ -990,7 +1083,15 @@ func (s *sqlDatabase) HasAdminUser(ctx context.Context) bool { } func (s *sqlDatabase) GetUser(ctx context.Context, user string) (params.User, error) { - dbUser, err := s.getUserByID(user) + dbUser, err := s.getUserByUsernameOrEmail(user) + if err != nil { + return params.User{}, errors.Wrap(err, "fetching user") + } + return s.sqlToParamsUser(dbUser), nil +} + +func (s *sqlDatabase) GetUserByID(ctx context.Context, userID string) (params.User, error) { + dbUser, err := s.getUserByID(userID) if err != nil { return params.User{}, errors.Wrap(err, "fetching user") } diff --git a/params/requests.go b/params/requests.go index c3113024..75b2f4e4 100644 --- a/params/requests.go +++ b/params/requests.go @@ -19,6 +19,21 @@ type CreateRepoParams struct { WebhookSecret string `json:"webhook_secret"` } +func (c *CreateRepoParams) Validate() error { + if c.Owner == "" { + return errors.NewBadRequestError("missing owner") + } + + if c.Name == "" { + return errors.NewBadRequestError("missing repo name") + } + + if c.CredentialsName == "" { + return errors.NewBadRequestError("missing credentials name") + } + return nil +} + // NewUserParams holds the needed information to create // a new user type NewUserParams struct { @@ -99,3 +114,8 @@ func (p PasswordLoginParams) Validate() error { } return nil } + +type UpdateRepositoryParams struct { + CredentialsName string `json:"credentials_name"` + WebhookSecret string `json:"webhook_secret"` +} diff --git a/runner/common/pool.go b/runner/common/pool.go index 26f1c337..54dfb789 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -12,6 +12,7 @@ const ( type PoolManager interface { WebhookSecret() string HandleWorkflowJob(job params.WorkflowJob) error + RefreshState(cfg params.Repository) error // PoolManager lifecycle functions. Start/stop pool. Start() error diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 5545cc39..697baa2b 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -39,6 +39,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid ctx: ctx, cfg: cfg, ghcli: ghc, + id: cfg.ID, store: store, providers: providers, pools: pools, @@ -46,6 +47,11 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid quit: make(chan struct{}), done: make(chan struct{}), } + + if err := repo.loadPools(); err != nil { + return nil, errors.Wrap(err, "loading pools") + } + return repo, nil } @@ -65,6 +71,33 @@ type Repository struct { mux sync.Mutex } +func (r *Repository) RefreshState(cfg params.Repository) error { + r.mux.Lock() + defer r.mux.Unlock() + + r.cfg = cfg + ghc, err := util.GithubClient(r.ctx, r.cfg.Internal.OAuth2Token) + if err != nil { + return errors.Wrap(err, "getting github client") + } + r.ghcli = ghc + return nil +} + +func (r *Repository) loadPools() error { + pools, err := r.store.ListRepoPools(r.ctx, r.id) + if err != nil { + return errors.Wrap(err, "fetching pools") + } + + for _, pool := range pools { + if err := r.AddPool(r.ctx, pool); err != nil { + return errors.Wrap(err, "adding pool") + } + } + return nil +} + func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error { r.mux.Lock() defer r.mux.Unlock() @@ -73,7 +106,6 @@ func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error { return nil } - // start pool loop r.pools[pool.ID] = pool return nil } @@ -140,6 +172,7 @@ func (r *Repository) consolidate() { r.mux.Lock() defer r.mux.Unlock() + log.Printf("Webhook secret: %s", r.cfg.WebhookSecret) r.deletePendingInstances() r.addPendingInstances() r.ensureMinIdleRunners() @@ -481,7 +514,11 @@ func (r *Repository) AddRunner(ctx context.Context, poolID string) error { } func (r *Repository) loop() { - defer close(r.done) + defer func() { + log.Printf("repository %s/%s loop exited", r.cfg.Owner, r.cfg.Name) + close(r.done) + }() + log.Printf("starting loop for %s/%s", r.cfg.Owner, r.cfg.Name) // TODO: Consolidate runners on loop start. Provider runners must match runners // in github and DB. When a Workflow job is received, we will first create/update // an entity in the database, before sending the request to the provider to create/delete diff --git a/runner/runner.go b/runner/runner.go index c0f93b9c..4d58706d 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -15,10 +15,11 @@ import ( "strings" "sync" + "runner-manager/auth" "runner-manager/config" "runner-manager/database" dbCommon "runner-manager/database/common" - gErrors "runner-manager/errors" + runnerErrors "runner-manager/errors" "runner-manager/params" "runner-manager/runner/common" "runner-manager/runner/pool" @@ -44,6 +45,11 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { log.Fatal(err) } + creds := map[string]config.Github{} + + for _, ghcreds := range cfg.Github { + creds[ghcreds.Name] = ghcreds + } runner := &Runner{ ctx: ctx, config: cfg, @@ -52,6 +58,7 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { repositories: map[string]common.PoolManager{}, organizations: map[string]common.PoolManager{}, providers: providers, + credentials: creds, } if err := runner.ensureSSHKeys(); err != nil { @@ -78,26 +85,160 @@ type Runner struct { repositories map[string]common.PoolManager organizations map[string]common.PoolManager providers map[string]common.Provider + credentials map[string]config.Github } -func (r *Runner) CreateRepository(ctx context.Context) error { +func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoParams) (repo params.Repository, err error) { + if !auth.IsAdmin(ctx) { + return repo, runnerErrors.ErrUnauthorized + } + + if err := param.Validate(); err != nil { + return params.Repository{}, errors.Wrap(err, "validating params") + } + + creds, ok := r.credentials[param.CredentialsName] + if !ok { + return params.Repository{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName) + } + + _, err = r.store.GetRepository(ctx, param.Owner, param.Name) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return params.Repository{}, errors.Wrap(err, "fetching repo") + } + } else { + return params.Repository{}, runnerErrors.NewConflictError("repository %s/%s already exists", param.Owner, param.Name) + } + + repo, err = r.store.CreateRepository(ctx, param.Owner, param.Name, creds.Name, param.WebhookSecret) + if err != nil { + return params.Repository{}, errors.Wrap(err, "creating repository") + } + + defer func() { + if err != nil { + r.store.DeleteRepository(ctx, repo.ID, true) + } + }() + + poolMgr, err := r.loadRepoPoolManager(repo) + if err := poolMgr.Start(); err != nil { + return params.Repository{}, errors.Wrap(err, "starting pool manager") + } + r.repositories[repo.ID] = poolMgr + return repo, nil +} + +func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, error) { + if !auth.IsAdmin(ctx) { + return nil, runnerErrors.ErrUnauthorized + } + + repos, err := r.store.ListRepositories(ctx) + if err != nil { + return nil, errors.Wrap(err, "listing repositories") + } + + return repos, nil +} + +func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) { + if !auth.IsAdmin(ctx) { + return params.Repository{}, runnerErrors.ErrUnauthorized + } + + repo, err := r.store.GetRepositoryByID(ctx, repoID) + if err != nil { + return params.Repository{}, errors.Wrap(err, "fetching repository") + } + return repo, nil +} + +func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error { + if !auth.IsAdmin(ctx) { + return runnerErrors.ErrUnauthorized + } + + repo, err := r.store.GetRepositoryByID(ctx, repoID) + if err != nil { + return errors.Wrap(err, "fetching repo") + } + + poolMgr, ok := r.repositories[repo.ID] + if ok { + if err := poolMgr.Stop(); err != nil { + log.Printf("failed to stop pool for repo %s", repo.ID) + } + delete(r.repositories, repoID) + } + + pools, err := r.store.ListRepoPools(ctx, repoID) + if err != nil { + return errors.Wrap(err, "fetching repo pools") + } + + if len(pools) > 0 { + poolIds := []string{} + for _, pool := range pools { + poolIds = append(poolIds, pool.ID) + } + + return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", ")) + } + + if err := r.store.DeleteRepository(ctx, repoID, true); err != nil { + return errors.Wrap(err, "removing repository") + } return nil } -func (r *Runner) ListRepositories(ctx context.Context) error { - return nil -} +func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) { + if !auth.IsAdmin(ctx) { + return params.Repository{}, runnerErrors.ErrUnauthorized + } -func (r *Runner) GetRepository(ctx context.Context) error { - return nil -} + r.mux.Lock() + defer r.mux.Unlock() -func (r *Runner) DeleteRepository(ctx context.Context) error { - return nil -} + repo, err := r.store.GetRepositoryByID(ctx, repoID) + if err != nil { + return params.Repository{}, errors.Wrap(err, "fetching repo") + } -func (r *Runner) UpdateRepository(ctx context.Context) error { - return nil + if param.CredentialsName != "" { + // Check that credentials are set before saving to db + if _, ok := r.credentials[param.CredentialsName]; !ok { + return params.Repository{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", param.CredentialsName, repo.Owner, repo.Name) + } + } + + repo, err = r.store.UpdateRepository(ctx, repoID, param) + if err != nil { + return params.Repository{}, errors.Wrap(err, "updating repo") + } + + log.Printf("post-update webhook secret: %s", repo.WebhookSecret) + poolMgr, ok := r.repositories[repo.ID] + if ok { + internalCfg, err := r.getInternalConfig(repo) + if err != nil { + return params.Repository{}, errors.Wrap(err, "fetching internal config") + } + repo.Internal = internalCfg + // stop the pool mgr + if err := poolMgr.RefreshState(repo); err != nil { + return params.Repository{}, errors.Wrap(err, "updating pool manager") + } + } else { + poolMgr, err := r.loadRepoPoolManager(repo) + if err != nil { + return params.Repository{}, errors.Wrap(err, "loading pool manager") + } + r.repositories[repo.ID] = poolMgr + } + + return repo, nil } func (r *Runner) CreateRepoPool(ctx context.Context) error { @@ -141,6 +282,33 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) { return ret, nil } +func (r *Runner) getInternalConfig(repo params.Repository) (params.Internal, error) { + creds, ok := r.credentials[repo.CredentialsName] + if !ok { + return params.Internal{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", repo.CredentialsName, repo.Owner, repo.Name) + } + + return params.Internal{ + OAuth2Token: creds.OAuth2Token, + ControllerID: r.controllerID, + InstanceCallbackURL: r.config.Default.CallbackURL, + JWTSecret: r.config.JWTAuth.Secret, + }, nil +} + +func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) { + cfg, err := r.getInternalConfig(repo) + if err != nil { + return nil, errors.Wrap(err, "fetching internal config") + } + repo.Internal = cfg + poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store) + if err != nil { + return nil, errors.Wrap(err, "creating pool manager") + } + return poolManager, nil +} + func (r *Runner) loadReposAndOrgs() error { r.mux.Lock() defer r.mux.Unlock() @@ -152,9 +320,9 @@ func (r *Runner) loadReposAndOrgs() error { for _, repo := range repos { log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name) - poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store) + poolManager, err := r.loadRepoPoolManager(repo) if err != nil { - return errors.Wrap(err, "creating pool manager") + return errors.Wrap(err, "loading repo pool manager") } r.repositories[repo.ID] = poolManager } @@ -163,6 +331,9 @@ func (r *Runner) loadReposAndOrgs() error { } func (r *Runner) Start() error { + r.mux.Lock() + defer r.mux.Unlock() + for _, repo := range r.repositories { if err := repo.Start(); err != nil { return errors.Wrap(err, "starting repo pool manager") @@ -178,6 +349,9 @@ func (r *Runner) Start() error { } func (r *Runner) Stop() error { + r.mux.Lock() + defer r.mux.Unlock() + for _, repo := range r.repositories { if err := repo.Stop(); err != nil { return errors.Wrap(err, "starting repo pool manager") @@ -193,6 +367,9 @@ func (r *Runner) Stop() error { } func (r *Runner) Wait() error { + r.mux.Lock() + defer r.mux.Unlock() + var wg sync.WaitGroup for poolId, repo := range r.repositories { @@ -230,7 +407,7 @@ func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, er if repo, ok := r.repositories[repo.ID]; ok { return repo, nil } - return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s/%s not configured", owner, name) + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name) } func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) { @@ -245,7 +422,7 @@ func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) { if orgPoolMgr, ok := r.organizations[org.ID]; ok { return orgPoolMgr, nil } - return nil, errors.Wrapf(gErrors.ErrNotFound, "organization %s not configured", name) + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name) } func (r *Runner) validateHookBody(signature, secret string, body []byte) error { @@ -257,7 +434,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { if signature == "" { // A secret was set in our config, but a signature was not received // from Github. Authentication of the body cannot be done. - return gErrors.NewUnauthorizedError("missing github signature") + return runnerErrors.NewUnauthorizedError("missing github signature") } sigParts := strings.SplitN(signature, "=", 2) @@ -265,7 +442,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { // We expect the signature from github to be of the format: // hashType=hashValue // ie: sha256=1fc917c7ad66487470e466c0ad40ddd45b9f7730a4b43e1b2542627f0596bbdc - return gErrors.NewBadRequestError("invalid signature format") + return runnerErrors.NewBadRequestError("invalid signature format") } var hashFunc func() hash.Hash @@ -275,7 +452,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { case "sha1": hashFunc = sha1.New default: - return gErrors.NewBadRequestError("unknown signature type") + return runnerErrors.NewBadRequestError("unknown signature type") } mac := hmac.New(hashFunc, []byte(secret)) @@ -286,7 +463,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { expectedMAC := hex.EncodeToString(mac.Sum(nil)) if !hmac.Equal([]byte(sigParts[1]), []byte(expectedMAC)) { - return gErrors.NewUnauthorizedError("signature missmatch") + return runnerErrors.NewUnauthorizedError("signature missmatch") } return nil @@ -294,12 +471,12 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error { if jobData == nil || len(jobData) == 0 { - return gErrors.NewBadRequestError("missing job data") + return runnerErrors.NewBadRequestError("missing job data") } var job params.WorkflowJob if err := json.Unmarshal(jobData, &job); err != nil { - return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err) + return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err) } var poolManager common.PoolManager @@ -311,7 +488,7 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [ case OrganizationHook: poolManager, err = r.findOrgPoolManager(job.Organization.Login) default: - return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType) + return runnerErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType) } if err != nil {