Add some API resources
This commit is contained in:
parent
75e06b36b0
commit
0314fd3b67
10 changed files with 507 additions and 60 deletions
|
|
@ -13,6 +13,7 @@ import (
|
|||
runnerParams "runner-manager/params"
|
||||
"runner-manager/runner"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
|
@ -194,24 +195,116 @@ func (a *APIController) ListProviders(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// ctx := r.Context()
|
||||
ctx := r.Context()
|
||||
|
||||
// var repoData runnerParams.CreateRepoParams
|
||||
// if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil {
|
||||
// handleError(w, gErrors.ErrBadRequest)
|
||||
// return
|
||||
// }
|
||||
var repoData runnerParams.CreateRepoParams
|
||||
if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil {
|
||||
handleError(w, gErrors.ErrBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// pasteInfo, err := p.paster.Create(
|
||||
// ctx, pasteData.Data, pasteData.Name,
|
||||
// pasteData.Language, pasteData.Description,
|
||||
// pasteData.Expires, pasteData.Public, "",
|
||||
// pasteData.Metadata)
|
||||
// if err != nil {
|
||||
// handleError(w, err)
|
||||
// return
|
||||
// }
|
||||
// w.Header().Set("Content-Type", "application/json")
|
||||
// json.NewEncoder(w).Encode(pasteInfo)
|
||||
repo, err := a.r.CreateRepository(ctx, repoData)
|
||||
if err != nil {
|
||||
log.Printf("error creating repository: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(repo)
|
||||
}
|
||||
|
||||
func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
repos, err := a.r.ListRepositories(ctx)
|
||||
if err != nil {
|
||||
log.Printf("listing repos: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(repos)
|
||||
}
|
||||
|
||||
func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
vars := mux.Vars(r)
|
||||
repoID, ok := vars["repoID"]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(params.APIErrorResponse{
|
||||
Error: "Bad Request",
|
||||
Details: "No repo ID specified",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
repo, err := a.r.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
log.Printf("fetching repo: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(repo)
|
||||
}
|
||||
|
||||
func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
vars := mux.Vars(r)
|
||||
repoID, ok := vars["repoID"]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(params.APIErrorResponse{
|
||||
Error: "Bad Request",
|
||||
Details: "No repo ID specified",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := a.r.DeleteRepository(ctx, repoID); err != nil {
|
||||
log.Printf("fetching repo: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
}
|
||||
|
||||
func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
vars := mux.Vars(r)
|
||||
repoID, ok := vars["repoID"]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(params.APIErrorResponse{
|
||||
Error: "Bad Request",
|
||||
Details: "No repo ID specified",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
var updatePayload runnerParams.UpdateRepositoryParams
|
||||
if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil {
|
||||
handleError(w, gErrors.ErrBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload)
|
||||
if err != nil {
|
||||
log.Printf("error updating repository: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(repo)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,15 +51,20 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl
|
|||
apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
|
||||
|
||||
// Get repo
|
||||
apiRouter.Handle("/repositories/{repoID:repoID\\/?}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS")
|
||||
// Update repo
|
||||
apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS")
|
||||
// Delete repo
|
||||
apiRouter.Handle("/repositories/{repoID:repoID\\/?}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS")
|
||||
// List repos
|
||||
apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS")
|
||||
// Create repo
|
||||
apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS")
|
||||
|
||||
/////////////////////////////
|
||||
// Organizations and pools //
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ func (amw *jwtMiddleware) claimsToContext(ctx context.Context, claims *JWTClaims
|
|||
return nil, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
userInfo, err := amw.store.GetUser(ctx, claims.UserID)
|
||||
userInfo, err := amw.store.GetUserByID(ctx, claims.UserID)
|
||||
if err != nil {
|
||||
return ctx, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,6 +56,12 @@ func main() {
|
|||
log.Fatalf("failed to create controller: %+v", err)
|
||||
}
|
||||
|
||||
// If there are many repos/pools, this may take a long time.
|
||||
// TODO: start pool managers in the background and log errors.
|
||||
if err := runner.Start(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := database.NewDatabase(ctx, cfg.Database)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
|
|
|||
|
|
@ -8,8 +8,10 @@ import (
|
|||
type Store interface {
|
||||
CreateRepository(ctx context.Context, owner, name, credentialsName, webhookSecret string) (params.Repository, error)
|
||||
GetRepository(ctx context.Context, owner, name string) (params.Repository, error)
|
||||
GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error)
|
||||
ListRepositories(ctx context.Context) ([]params.Repository, error)
|
||||
DeleteRepository(ctx context.Context, owner, name string) error
|
||||
DeleteRepository(ctx context.Context, repoID string, hardDelete bool) error
|
||||
UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error)
|
||||
|
||||
CreateOrganization(ctx context.Context, name, credentialsName, webhookSecret string) (params.Organization, error)
|
||||
GetOrganization(ctx context.Context, name string) (params.Organization, error)
|
||||
|
|
@ -22,6 +24,9 @@ type Store interface {
|
|||
GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error)
|
||||
GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error)
|
||||
|
||||
ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error)
|
||||
ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error)
|
||||
|
||||
DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error
|
||||
DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error
|
||||
|
||||
|
|
@ -42,8 +47,10 @@ type Store interface {
|
|||
// GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error)
|
||||
GetInstanceByName(ctx context.Context, poolID string, instanceName string) (params.Instance, error)
|
||||
|
||||
CreateUser(ctx context.Context, user params.NewUserParams) (params.User, error)
|
||||
GetUser(ctx context.Context, user string) (params.User, error)
|
||||
GetUserByID(ctx context.Context, userID string) (params.User, error)
|
||||
|
||||
CreateUser(ctx context.Context, user params.NewUserParams) (params.User, error)
|
||||
UpdateUser(ctx context.Context, user string, param params.UpdateUserParams) (params.User, error)
|
||||
HasAdminUser(ctx context.Context) bool
|
||||
|
||||
|
|
|
|||
|
|
@ -138,6 +138,38 @@ func (s *sqlDatabase) CreateRepository(ctx context.Context, owner, name, credent
|
|||
return param, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) {
|
||||
repo, err := s.getRepoByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
if param.CredentialsName != "" {
|
||||
repo.CredentialsName = param.CredentialsName
|
||||
}
|
||||
|
||||
if param.WebhookSecret != "" {
|
||||
secret, err := util.Aes256EncodeString(param.WebhookSecret, s.cfg.Passphrase)
|
||||
if err != nil {
|
||||
return params.Repository{}, fmt.Errorf("failed to encrypt string")
|
||||
}
|
||||
repo.WebhookSecret = secret
|
||||
}
|
||||
|
||||
q := s.conn.Save(&repo)
|
||||
if q.Error != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "saving repo")
|
||||
}
|
||||
|
||||
newParams := s.sqlToCommonRepository(repo)
|
||||
secret, err := util.Aes256DecodeString(repo.WebhookSecret, s.cfg.Passphrase)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "decrypting secret")
|
||||
}
|
||||
newParams.WebhookSecret = secret
|
||||
return newParams, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string, preloadAll bool) (Repository, error) {
|
||||
var repo Repository
|
||||
|
||||
|
|
@ -159,15 +191,20 @@ func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string, preloadAl
|
|||
return repo, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) getRepoByID(ctx context.Context, id string) (Repository, error) {
|
||||
func (s *sqlDatabase) getRepoByID(ctx context.Context, id string, preload ...string) (Repository, error) {
|
||||
u, err := uuid.FromString(id)
|
||||
if err != nil {
|
||||
return Repository{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
|
||||
}
|
||||
var repo Repository
|
||||
q := s.conn.
|
||||
Where("id = ?", u).
|
||||
First(&repo)
|
||||
|
||||
q := s.conn
|
||||
if len(preload) > 0 {
|
||||
for _, field := range preload {
|
||||
q = q.Preload(field)
|
||||
}
|
||||
}
|
||||
q = q.Where("id = ?", u).First(&repo)
|
||||
|
||||
if q.Error != nil {
|
||||
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
||||
|
|
@ -194,6 +231,22 @@ func (s *sqlDatabase) GetRepository(ctx context.Context, owner, name string) (pa
|
|||
return param, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) {
|
||||
repo, err := s.getRepoByID(ctx, repoID, "Pools")
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
param := s.sqlToCommonRepository(repo)
|
||||
secret, err := util.Aes256DecodeString(repo.WebhookSecret, s.cfg.Passphrase)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "decrypting secret")
|
||||
}
|
||||
param.WebhookSecret = secret
|
||||
|
||||
return param, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository, error) {
|
||||
var repos []Repository
|
||||
q := s.conn.Find(&repos)
|
||||
|
|
@ -204,13 +257,21 @@ func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository
|
|||
ret := make([]params.Repository, len(repos))
|
||||
for idx, val := range repos {
|
||||
ret[idx] = s.sqlToCommonRepository(val)
|
||||
if len(val.WebhookSecret) > 0 {
|
||||
secret, err := util.Aes256DecodeString(val.WebhookSecret, s.cfg.Passphrase)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "decrypting secret")
|
||||
}
|
||||
ret[idx].WebhookSecret = secret
|
||||
}
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string) error {
|
||||
repo, err := s.getRepo(ctx, owner, name, false)
|
||||
// func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string, hardDelete bool) error {
|
||||
func (s *sqlDatabase) DeleteRepository(ctx context.Context, repoID string, hardDelete bool) error {
|
||||
repo, err := s.getRepoByID(ctx, repoID)
|
||||
if err != nil {
|
||||
if err == runnerErrors.ErrNotFound {
|
||||
return nil
|
||||
|
|
@ -218,7 +279,11 @@ func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string)
|
|||
return errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
q := s.conn.Delete(&repo)
|
||||
q := s.conn
|
||||
if hardDelete {
|
||||
q = q.Unscoped()
|
||||
}
|
||||
q = q.Delete(&repo)
|
||||
if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) {
|
||||
return errors.Wrap(q.Error, "deleting repo")
|
||||
}
|
||||
|
|
@ -479,6 +544,34 @@ func (s *sqlDatabase) getOrgPools(ctx context.Context, orgID string, preloadAll
|
|||
return pools, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error) {
|
||||
pools, err := s.getRepoPools(ctx, repoID, false)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching pools")
|
||||
}
|
||||
|
||||
ret := make([]params.Pool, len(pools))
|
||||
for idx, pool := range pools {
|
||||
ret[idx] = s.sqlToCommonPool(pool)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) {
|
||||
pools, err := s.getOrgPools(ctx, orgID, false)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching pools")
|
||||
}
|
||||
|
||||
ret := make([]params.Pool, len(pools))
|
||||
for idx, pool := range pools {
|
||||
ret[idx] = s.sqlToCommonPool(pool)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) getRepoPool(ctx context.Context, repoID, poolID string) (Pool, error) {
|
||||
repo, err := s.getRepoByID(ctx, repoID)
|
||||
if err != nil {
|
||||
|
|
@ -990,7 +1083,15 @@ func (s *sqlDatabase) HasAdminUser(ctx context.Context) bool {
|
|||
}
|
||||
|
||||
func (s *sqlDatabase) GetUser(ctx context.Context, user string) (params.User, error) {
|
||||
dbUser, err := s.getUserByID(user)
|
||||
dbUser, err := s.getUserByUsernameOrEmail(user)
|
||||
if err != nil {
|
||||
return params.User{}, errors.Wrap(err, "fetching user")
|
||||
}
|
||||
return s.sqlToParamsUser(dbUser), nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) GetUserByID(ctx context.Context, userID string) (params.User, error) {
|
||||
dbUser, err := s.getUserByID(userID)
|
||||
if err != nil {
|
||||
return params.User{}, errors.Wrap(err, "fetching user")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,21 @@ type CreateRepoParams struct {
|
|||
WebhookSecret string `json:"webhook_secret"`
|
||||
}
|
||||
|
||||
func (c *CreateRepoParams) Validate() error {
|
||||
if c.Owner == "" {
|
||||
return errors.NewBadRequestError("missing owner")
|
||||
}
|
||||
|
||||
if c.Name == "" {
|
||||
return errors.NewBadRequestError("missing repo name")
|
||||
}
|
||||
|
||||
if c.CredentialsName == "" {
|
||||
return errors.NewBadRequestError("missing credentials name")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewUserParams holds the needed information to create
|
||||
// a new user
|
||||
type NewUserParams struct {
|
||||
|
|
@ -99,3 +114,8 @@ func (p PasswordLoginParams) Validate() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateRepositoryParams struct {
|
||||
CredentialsName string `json:"credentials_name"`
|
||||
WebhookSecret string `json:"webhook_secret"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ const (
|
|||
type PoolManager interface {
|
||||
WebhookSecret() string
|
||||
HandleWorkflowJob(job params.WorkflowJob) error
|
||||
RefreshState(cfg params.Repository) error
|
||||
|
||||
// PoolManager lifecycle functions. Start/stop pool.
|
||||
Start() error
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid
|
|||
ctx: ctx,
|
||||
cfg: cfg,
|
||||
ghcli: ghc,
|
||||
id: cfg.ID,
|
||||
store: store,
|
||||
providers: providers,
|
||||
pools: pools,
|
||||
|
|
@ -46,6 +47,11 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid
|
|||
quit: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := repo.loadPools(); err != nil {
|
||||
return nil, errors.Wrap(err, "loading pools")
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
|
|
@ -65,6 +71,33 @@ type Repository struct {
|
|||
mux sync.Mutex
|
||||
}
|
||||
|
||||
func (r *Repository) RefreshState(cfg params.Repository) error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
r.cfg = cfg
|
||||
ghc, err := util.GithubClient(r.ctx, r.cfg.Internal.OAuth2Token)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting github client")
|
||||
}
|
||||
r.ghcli = ghc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) loadPools() error {
|
||||
pools, err := r.store.ListRepoPools(r.ctx, r.id)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching pools")
|
||||
}
|
||||
|
||||
for _, pool := range pools {
|
||||
if err := r.AddPool(r.ctx, pool); err != nil {
|
||||
return errors.Wrap(err, "adding pool")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
|
@ -73,7 +106,6 @@ func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// start pool loop
|
||||
r.pools[pool.ID] = pool
|
||||
return nil
|
||||
}
|
||||
|
|
@ -140,6 +172,7 @@ func (r *Repository) consolidate() {
|
|||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
log.Printf("Webhook secret: %s", r.cfg.WebhookSecret)
|
||||
r.deletePendingInstances()
|
||||
r.addPendingInstances()
|
||||
r.ensureMinIdleRunners()
|
||||
|
|
@ -481,7 +514,11 @@ func (r *Repository) AddRunner(ctx context.Context, poolID string) error {
|
|||
}
|
||||
|
||||
func (r *Repository) loop() {
|
||||
defer close(r.done)
|
||||
defer func() {
|
||||
log.Printf("repository %s/%s loop exited", r.cfg.Owner, r.cfg.Name)
|
||||
close(r.done)
|
||||
}()
|
||||
log.Printf("starting loop for %s/%s", r.cfg.Owner, r.cfg.Name)
|
||||
// TODO: Consolidate runners on loop start. Provider runners must match runners
|
||||
// in github and DB. When a Workflow job is received, we will first create/update
|
||||
// an entity in the database, before sending the request to the provider to create/delete
|
||||
|
|
|
|||
225
runner/runner.go
225
runner/runner.go
|
|
@ -15,10 +15,11 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"runner-manager/auth"
|
||||
"runner-manager/config"
|
||||
"runner-manager/database"
|
||||
dbCommon "runner-manager/database/common"
|
||||
gErrors "runner-manager/errors"
|
||||
runnerErrors "runner-manager/errors"
|
||||
"runner-manager/params"
|
||||
"runner-manager/runner/common"
|
||||
"runner-manager/runner/pool"
|
||||
|
|
@ -44,6 +45,11 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
creds := map[string]config.Github{}
|
||||
|
||||
for _, ghcreds := range cfg.Github {
|
||||
creds[ghcreds.Name] = ghcreds
|
||||
}
|
||||
runner := &Runner{
|
||||
ctx: ctx,
|
||||
config: cfg,
|
||||
|
|
@ -52,6 +58,7 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
|
|||
repositories: map[string]common.PoolManager{},
|
||||
organizations: map[string]common.PoolManager{},
|
||||
providers: providers,
|
||||
credentials: creds,
|
||||
}
|
||||
|
||||
if err := runner.ensureSSHKeys(); err != nil {
|
||||
|
|
@ -78,26 +85,160 @@ type Runner struct {
|
|||
repositories map[string]common.PoolManager
|
||||
organizations map[string]common.PoolManager
|
||||
providers map[string]common.Provider
|
||||
credentials map[string]config.Github
|
||||
}
|
||||
|
||||
func (r *Runner) CreateRepository(ctx context.Context) error {
|
||||
func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoParams) (repo params.Repository, err error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return repo, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
if err := param.Validate(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "validating params")
|
||||
}
|
||||
|
||||
creds, ok := r.credentials[param.CredentialsName]
|
||||
if !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName)
|
||||
}
|
||||
|
||||
_, err = r.store.GetRepository(ctx, param.Owner, param.Name)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
} else {
|
||||
return params.Repository{}, runnerErrors.NewConflictError("repository %s/%s already exists", param.Owner, param.Name)
|
||||
}
|
||||
|
||||
repo, err = r.store.CreateRepository(ctx, param.Owner, param.Name, creds.Name, param.WebhookSecret)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "creating repository")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
r.store.DeleteRepository(ctx, repo.ID, true)
|
||||
}
|
||||
}()
|
||||
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err := poolMgr.Start(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "starting pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return nil, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repos, err := r.store.ListRepositories(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "listing repositories")
|
||||
}
|
||||
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repository")
|
||||
}
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
if err := poolMgr.Stop(); err != nil {
|
||||
log.Printf("failed to stop pool for repo %s", repo.ID)
|
||||
}
|
||||
delete(r.repositories, repoID)
|
||||
}
|
||||
|
||||
pools, err := r.store.ListRepoPools(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo pools")
|
||||
}
|
||||
|
||||
if len(pools) > 0 {
|
||||
poolIds := []string{}
|
||||
for _, pool := range pools {
|
||||
poolIds = append(poolIds, pool.ID)
|
||||
}
|
||||
|
||||
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
|
||||
}
|
||||
|
||||
if err := r.store.DeleteRepository(ctx, repoID, true); err != nil {
|
||||
return errors.Wrap(err, "removing repository")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepositories(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
func (r *Runner) GetRepository(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
func (r *Runner) DeleteRepository(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
func (r *Runner) UpdateRepository(ctx context.Context) error {
|
||||
return nil
|
||||
if param.CredentialsName != "" {
|
||||
// Check that credentials are set before saving to db
|
||||
if _, ok := r.credentials[param.CredentialsName]; !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", param.CredentialsName, repo.Owner, repo.Name)
|
||||
}
|
||||
}
|
||||
|
||||
repo, err = r.store.UpdateRepository(ctx, repoID, param)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating repo")
|
||||
}
|
||||
|
||||
log.Printf("post-update webhook secret: %s", repo.WebhookSecret)
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
internalCfg, err := r.getInternalConfig(repo)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = internalCfg
|
||||
// stop the pool mgr
|
||||
if err := poolMgr.RefreshState(repo); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating pool manager")
|
||||
}
|
||||
} else {
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "loading pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) CreateRepoPool(ctx context.Context) error {
|
||||
|
|
@ -141,6 +282,33 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *Runner) getInternalConfig(repo params.Repository) (params.Internal, error) {
|
||||
creds, ok := r.credentials[repo.CredentialsName]
|
||||
if !ok {
|
||||
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", repo.CredentialsName, repo.Owner, repo.Name)
|
||||
}
|
||||
|
||||
return params.Internal{
|
||||
OAuth2Token: creds.OAuth2Token,
|
||||
ControllerID: r.controllerID,
|
||||
InstanceCallbackURL: r.config.Default.CallbackURL,
|
||||
JWTSecret: r.config.JWTAuth.Secret,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
|
||||
cfg, err := r.getInternalConfig(repo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = cfg
|
||||
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating pool manager")
|
||||
}
|
||||
return poolManager, nil
|
||||
}
|
||||
|
||||
func (r *Runner) loadReposAndOrgs() error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
|
@ -152,9 +320,9 @@ func (r *Runner) loadReposAndOrgs() error {
|
|||
|
||||
for _, repo := range repos {
|
||||
log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name)
|
||||
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
|
||||
poolManager, err := r.loadRepoPoolManager(repo)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "creating pool manager")
|
||||
return errors.Wrap(err, "loading repo pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolManager
|
||||
}
|
||||
|
|
@ -163,6 +331,9 @@ func (r *Runner) loadReposAndOrgs() error {
|
|||
}
|
||||
|
||||
func (r *Runner) Start() error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
for _, repo := range r.repositories {
|
||||
if err := repo.Start(); err != nil {
|
||||
return errors.Wrap(err, "starting repo pool manager")
|
||||
|
|
@ -178,6 +349,9 @@ func (r *Runner) Start() error {
|
|||
}
|
||||
|
||||
func (r *Runner) Stop() error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
for _, repo := range r.repositories {
|
||||
if err := repo.Stop(); err != nil {
|
||||
return errors.Wrap(err, "starting repo pool manager")
|
||||
|
|
@ -193,6 +367,9 @@ func (r *Runner) Stop() error {
|
|||
}
|
||||
|
||||
func (r *Runner) Wait() error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for poolId, repo := range r.repositories {
|
||||
|
|
@ -230,7 +407,7 @@ func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, er
|
|||
if repo, ok := r.repositories[repo.ID]; ok {
|
||||
return repo, nil
|
||||
}
|
||||
return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
|
||||
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
|
||||
}
|
||||
|
||||
func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
|
||||
|
|
@ -245,7 +422,7 @@ func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
|
|||
if orgPoolMgr, ok := r.organizations[org.ID]; ok {
|
||||
return orgPoolMgr, nil
|
||||
}
|
||||
return nil, errors.Wrapf(gErrors.ErrNotFound, "organization %s not configured", name)
|
||||
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name)
|
||||
}
|
||||
|
||||
func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
||||
|
|
@ -257,7 +434,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
|||
if signature == "" {
|
||||
// A secret was set in our config, but a signature was not received
|
||||
// from Github. Authentication of the body cannot be done.
|
||||
return gErrors.NewUnauthorizedError("missing github signature")
|
||||
return runnerErrors.NewUnauthorizedError("missing github signature")
|
||||
}
|
||||
|
||||
sigParts := strings.SplitN(signature, "=", 2)
|
||||
|
|
@ -265,7 +442,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
|||
// We expect the signature from github to be of the format:
|
||||
// hashType=hashValue
|
||||
// ie: sha256=1fc917c7ad66487470e466c0ad40ddd45b9f7730a4b43e1b2542627f0596bbdc
|
||||
return gErrors.NewBadRequestError("invalid signature format")
|
||||
return runnerErrors.NewBadRequestError("invalid signature format")
|
||||
}
|
||||
|
||||
var hashFunc func() hash.Hash
|
||||
|
|
@ -275,7 +452,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
|||
case "sha1":
|
||||
hashFunc = sha1.New
|
||||
default:
|
||||
return gErrors.NewBadRequestError("unknown signature type")
|
||||
return runnerErrors.NewBadRequestError("unknown signature type")
|
||||
}
|
||||
|
||||
mac := hmac.New(hashFunc, []byte(secret))
|
||||
|
|
@ -286,7 +463,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
|||
expectedMAC := hex.EncodeToString(mac.Sum(nil))
|
||||
|
||||
if !hmac.Equal([]byte(sigParts[1]), []byte(expectedMAC)) {
|
||||
return gErrors.NewUnauthorizedError("signature missmatch")
|
||||
return runnerErrors.NewUnauthorizedError("signature missmatch")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -294,12 +471,12 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
|
|||
|
||||
func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error {
|
||||
if jobData == nil || len(jobData) == 0 {
|
||||
return gErrors.NewBadRequestError("missing job data")
|
||||
return runnerErrors.NewBadRequestError("missing job data")
|
||||
}
|
||||
|
||||
var job params.WorkflowJob
|
||||
if err := json.Unmarshal(jobData, &job); err != nil {
|
||||
return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err)
|
||||
return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err)
|
||||
}
|
||||
|
||||
var poolManager common.PoolManager
|
||||
|
|
@ -311,7 +488,7 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
|
|||
case OrganizationHook:
|
||||
poolManager, err = r.findOrgPoolManager(job.Organization.Login)
|
||||
default:
|
||||
return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType)
|
||||
return runnerErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue