Auto create runners for pools
This commit is contained in:
parent
0314fd3b67
commit
a78ad539fe
10 changed files with 371 additions and 239 deletions
|
|
@ -308,3 +308,35 @@ func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request
|
|||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(repo)
|
||||
}
|
||||
|
||||
func (a *APIController) CreateRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
vars := mux.Vars(r)
|
||||
repoID, ok := vars["repoID"]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(params.APIErrorResponse{
|
||||
Error: "Bad Request",
|
||||
Details: "No repo ID specified",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
var poolData runnerParams.CreatePoolParams
|
||||
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
|
||||
log.Printf("failed to decode: %+v", err)
|
||||
handleError(w, gErrors.ErrBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
pool, err := a.r.CreateRepoPool(ctx, repoID, poolData)
|
||||
if err != nil {
|
||||
log.Printf("error creating repository pool: %+v", err)
|
||||
handleError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(pool)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl
|
|||
// Handles github webhooks
|
||||
webhookRouter := router.PathPrefix("/webhooks").Subrouter()
|
||||
webhookRouter.PathPrefix("/").Handler(log(logWriter, http.HandlerFunc(han.CatchAll)))
|
||||
webhookRouter.PathPrefix("").Handler(log(logWriter, http.HandlerFunc(han.CatchAll)))
|
||||
|
||||
// Handles API calls
|
||||
apiSubRouter := router.PathPrefix("/api/v1").Subrouter()
|
||||
|
|
@ -47,8 +48,8 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl
|
|||
apiRouter.Handle("/repositories/{repoID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
|
||||
// Create pool
|
||||
apiRouter.Handle("/repositories/{repoID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/pools/", log(os.Stdout, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS")
|
||||
apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS")
|
||||
|
||||
// Get repo
|
||||
apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS")
|
||||
|
|
|
|||
|
|
@ -506,7 +506,7 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
|
|||
return s.sqlToCommonPool(newPool), nil
|
||||
}
|
||||
|
||||
func (s *sqlDatabase) getRepoPools(ctx context.Context, repoID string, preloadAll bool) ([]Pool, error) {
|
||||
func (s *sqlDatabase) getRepoPools(ctx context.Context, repoID string, preload ...string) ([]Pool, error) {
|
||||
repo, err := s.getRepoByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching repo")
|
||||
|
|
@ -514,8 +514,10 @@ func (s *sqlDatabase) getRepoPools(ctx context.Context, repoID string, preloadAl
|
|||
|
||||
var pools []Pool
|
||||
q := s.conn.Model(&repo)
|
||||
if preloadAll {
|
||||
q = q.Preload(clause.Associations)
|
||||
if len(preload) > 0 {
|
||||
for _, item := range preload {
|
||||
q = q.Preload(item)
|
||||
}
|
||||
}
|
||||
err = q.Association("Pools").Find(&pools)
|
||||
if err != nil {
|
||||
|
|
@ -545,7 +547,7 @@ func (s *sqlDatabase) getOrgPools(ctx context.Context, orgID string, preloadAll
|
|||
}
|
||||
|
||||
func (s *sqlDatabase) ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error) {
|
||||
pools, err := s.getRepoPools(ctx, repoID, false)
|
||||
pools, err := s.getRepoPools(ctx, repoID, "Tags")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching pools")
|
||||
}
|
||||
|
|
@ -750,7 +752,7 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance {
|
|||
OSArch: instance.OSArch,
|
||||
Status: instance.Status,
|
||||
RunnerStatus: instance.RunnerStatus,
|
||||
PoolID: instance.Pool.ID.String(),
|
||||
PoolID: instance.PoolID.String(),
|
||||
CallbackURL: instance.CallbackURL,
|
||||
}
|
||||
|
||||
|
|
@ -907,7 +909,7 @@ func (s *sqlDatabase) ListInstances(ctx context.Context, poolID string) ([]param
|
|||
}
|
||||
|
||||
func (s *sqlDatabase) ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) {
|
||||
pools, err := s.getRepoPools(ctx, repoID, true)
|
||||
pools, err := s.getRepoPools(ctx, repoID, "Instances")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,10 +41,10 @@ type UnauthorizedError struct {
|
|||
}
|
||||
|
||||
// NewNotFoundError returns a new NotFoundError
|
||||
func NewNotFoundError(msg string) error {
|
||||
func NewNotFoundError(msg string, a ...interface{}) error {
|
||||
return &NotFoundError{
|
||||
baseError{
|
||||
msg: msg,
|
||||
msg: fmt.Sprintf(msg, a...),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package params
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runner-manager/config"
|
||||
"runner-manager/errors"
|
||||
"runner-manager/runner/providers/common"
|
||||
|
|
@ -79,6 +80,34 @@ type CreatePoolParams struct {
|
|||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
func (p *CreatePoolParams) Validate() error {
|
||||
if p.ProviderName == "" {
|
||||
return fmt.Errorf("missing provider")
|
||||
}
|
||||
|
||||
if p.MinIdleRunners > p.MaxRunners {
|
||||
return fmt.Errorf("min_idle_runners cannot be larger than max_runners")
|
||||
}
|
||||
|
||||
if p.MaxRunners == 0 {
|
||||
return fmt.Errorf("max_runners cannot be 0")
|
||||
}
|
||||
|
||||
if len(p.Tags) == 0 {
|
||||
return fmt.Errorf("missing tags")
|
||||
}
|
||||
|
||||
if p.Flavor == "" {
|
||||
return fmt.Errorf("missing flavor")
|
||||
}
|
||||
|
||||
if p.Image == "" {
|
||||
return fmt.Errorf("missing image")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateInstanceParams struct {
|
||||
ProviderID string `json:"provider_id,omitempty"`
|
||||
// OSName is the name of the OS. Eg: ubuntu, centos, etc.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package pool
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
|
@ -191,7 +192,8 @@ func (r *Repository) addPendingInstances() {
|
|||
// not in pending_create status. Skip.
|
||||
continue
|
||||
}
|
||||
|
||||
asJs, _ := json.MarshalIndent(instance, "", " ")
|
||||
log.Printf(">>> %s", string(asJs))
|
||||
if err := r.addInstanceToProvider(instance); err != nil {
|
||||
log.Printf("failed to create instance in provider: %s", err)
|
||||
}
|
||||
|
|
@ -435,7 +437,7 @@ func (r *Repository) deleteInstanceFromProvider(instance params.Instance) error
|
|||
func (r *Repository) addInstanceToProvider(instance params.Instance) error {
|
||||
pool, ok := r.pools[instance.PoolID]
|
||||
if !ok {
|
||||
return runnerErrors.NewNotFoundError("invalid pool ID")
|
||||
return runnerErrors.NewNotFoundError("invalid pool ID: %s", instance.PoolID)
|
||||
}
|
||||
|
||||
provider, ok := r.providers[pool.ProviderName]
|
||||
|
|
@ -462,6 +464,7 @@ func (r *Repository) addInstanceToProvider(instance params.Instance) error {
|
|||
}
|
||||
|
||||
bootstrapArgs := params.BootstrapInstance{
|
||||
Name: instance.Name,
|
||||
Tools: r.tools,
|
||||
RepoURL: r.githubURL(),
|
||||
GithubRunnerAccessToken: *tk.Token,
|
||||
|
|
|
|||
|
|
@ -182,8 +182,9 @@ func (l *LXD) secureBootEnabled() string {
|
|||
}
|
||||
|
||||
func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (api.InstancesPost, error) {
|
||||
// name := fmt.Sprintf("runner-manager-%s", uuid.New())
|
||||
|
||||
if bootstrapParams.Name == "" {
|
||||
return api.InstancesPost{}, runnerErrors.NewBadRequestError("missing name")
|
||||
}
|
||||
profiles, err := l.getProfiles(bootstrapParams.Flavor)
|
||||
if err != nil {
|
||||
return api.InstancesPost{}, errors.Wrap(err, "fetching profiles")
|
||||
|
|
|
|||
243
runner/repositories.go
Normal file
243
runner/repositories.go
Normal file
|
|
@ -0,0 +1,243 @@
|
|||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"runner-manager/auth"
|
||||
runnerErrors "runner-manager/errors"
|
||||
"runner-manager/params"
|
||||
"runner-manager/runner/common"
|
||||
"runner-manager/runner/pool"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoParams) (repo params.Repository, err error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return repo, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
if err := param.Validate(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "validating params")
|
||||
}
|
||||
|
||||
creds, ok := r.credentials[param.CredentialsName]
|
||||
if !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName)
|
||||
}
|
||||
|
||||
_, err = r.store.GetRepository(ctx, param.Owner, param.Name)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
} else {
|
||||
return params.Repository{}, runnerErrors.NewConflictError("repository %s/%s already exists", param.Owner, param.Name)
|
||||
}
|
||||
|
||||
repo, err = r.store.CreateRepository(ctx, param.Owner, param.Name, creds.Name, param.WebhookSecret)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "creating repository")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
r.store.DeleteRepository(ctx, repo.ID, true)
|
||||
}
|
||||
}()
|
||||
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err := poolMgr.Start(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "starting pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return nil, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repos, err := r.store.ListRepositories(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "listing repositories")
|
||||
}
|
||||
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repository")
|
||||
}
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
if err := poolMgr.Stop(); err != nil {
|
||||
log.Printf("failed to stop pool for repo %s", repo.ID)
|
||||
}
|
||||
delete(r.repositories, repoID)
|
||||
}
|
||||
|
||||
pools, err := r.store.ListRepoPools(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo pools")
|
||||
}
|
||||
|
||||
if len(pools) > 0 {
|
||||
poolIds := []string{}
|
||||
for _, pool := range pools {
|
||||
poolIds = append(poolIds, pool.ID)
|
||||
}
|
||||
|
||||
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
|
||||
}
|
||||
|
||||
if err := r.store.DeleteRepository(ctx, repoID, true); err != nil {
|
||||
return errors.Wrap(err, "removing repository")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
if param.CredentialsName != "" {
|
||||
// Check that credentials are set before saving to db
|
||||
if _, ok := r.credentials[param.CredentialsName]; !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", param.CredentialsName, repo.Owner, repo.Name)
|
||||
}
|
||||
}
|
||||
|
||||
repo, err = r.store.UpdateRepository(ctx, repoID, param)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating repo")
|
||||
}
|
||||
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
internalCfg, err := r.getInternalConfig(repo.CredentialsName)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = internalCfg
|
||||
// stop the pool mgr
|
||||
if err := poolMgr.RefreshState(repo); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating pool manager")
|
||||
}
|
||||
} else {
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "loading pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) CreateRepoPool(ctx context.Context, repoID string, param params.CreatePoolParams) (params.Pool, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Pool{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
if err := param.Validate(); err != nil {
|
||||
return params.Pool{}, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", err)
|
||||
}
|
||||
|
||||
if !IsSupportedOSType(param.OSType) {
|
||||
return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType)
|
||||
}
|
||||
|
||||
if !IsSupportedArch(param.OSArch) {
|
||||
return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch)
|
||||
}
|
||||
|
||||
_, ok := r.providers[param.ProviderName]
|
||||
if !ok {
|
||||
return params.Pool{}, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName)
|
||||
}
|
||||
|
||||
pool, err := r.store.CreateRepositoryPool(ctx, repoID, param)
|
||||
if err != nil {
|
||||
return params.Pool{}, errors.Wrap(err, "creating pool")
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
func (r *Runner) DeleteRepoPool(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepoPools(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) UpdateRepoPool(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListPoolInstances(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
|
||||
cfg, err := r.getInternalConfig(repo.CredentialsName)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = cfg
|
||||
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating pool manager")
|
||||
}
|
||||
return poolManager, nil
|
||||
}
|
||||
|
||||
func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
repo, err := r.store.GetRepository(r.ctx, owner, name)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
if repo, ok := r.repositories[repo.ID]; ok {
|
||||
return repo, nil
|
||||
}
|
||||
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
|
||||
}
|
||||
245
runner/runner.go
245
runner/runner.go
|
|
@ -15,14 +15,12 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"runner-manager/auth"
|
||||
"runner-manager/config"
|
||||
"runner-manager/database"
|
||||
dbCommon "runner-manager/database/common"
|
||||
runnerErrors "runner-manager/errors"
|
||||
"runner-manager/params"
|
||||
"runner-manager/runner/common"
|
||||
"runner-manager/runner/pool"
|
||||
"runner-manager/runner/providers"
|
||||
"runner-manager/util"
|
||||
|
||||
|
|
@ -31,18 +29,19 @@ import (
|
|||
)
|
||||
|
||||
func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
|
||||
// ghc, err := util.GithubClient(ctx, cfg.Github.OAuth2Token)
|
||||
// if err != nil {
|
||||
// return nil, errors.Wrap(err, "getting github client")
|
||||
// }
|
||||
|
||||
providers, err := providers.LoadProvidersFromConfig(ctx, cfg, "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "loading providers")
|
||||
}
|
||||
db, err := database.NewDatabase(ctx, cfg.Database)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return nil, errors.Wrap(err, "creating db connection")
|
||||
}
|
||||
|
||||
ctrlId, err := db.ControllerInfo()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching controller info")
|
||||
}
|
||||
|
||||
providers, err := providers.LoadProvidersFromConfig(ctx, cfg, ctrlId.ControllerID.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "loading providers")
|
||||
}
|
||||
|
||||
creds := map[string]config.Github{}
|
||||
|
|
@ -51,10 +50,9 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
|
|||
creds[ghcreds.Name] = ghcreds
|
||||
}
|
||||
runner := &Runner{
|
||||
ctx: ctx,
|
||||
config: cfg,
|
||||
store: db,
|
||||
// ghc: ghc,
|
||||
ctx: ctx,
|
||||
config: cfg,
|
||||
store: db,
|
||||
repositories: map[string]common.PoolManager{},
|
||||
organizations: map[string]common.PoolManager{},
|
||||
providers: providers,
|
||||
|
|
@ -75,192 +73,17 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
|
|||
type Runner struct {
|
||||
mux sync.Mutex
|
||||
|
||||
ctx context.Context
|
||||
// ghc *github.Client
|
||||
store dbCommon.Store
|
||||
|
||||
config config.Config
|
||||
controllerID string
|
||||
ctx context.Context
|
||||
store dbCommon.Store
|
||||
|
||||
config config.Config
|
||||
repositories map[string]common.PoolManager
|
||||
organizations map[string]common.PoolManager
|
||||
providers map[string]common.Provider
|
||||
credentials map[string]config.Github
|
||||
}
|
||||
|
||||
func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoParams) (repo params.Repository, err error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return repo, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
if err := param.Validate(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "validating params")
|
||||
}
|
||||
|
||||
creds, ok := r.credentials[param.CredentialsName]
|
||||
if !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName)
|
||||
}
|
||||
|
||||
_, err = r.store.GetRepository(ctx, param.Owner, param.Name)
|
||||
if err != nil {
|
||||
if !errors.Is(err, runnerErrors.ErrNotFound) {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
} else {
|
||||
return params.Repository{}, runnerErrors.NewConflictError("repository %s/%s already exists", param.Owner, param.Name)
|
||||
}
|
||||
|
||||
repo, err = r.store.CreateRepository(ctx, param.Owner, param.Name, creds.Name, param.WebhookSecret)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "creating repository")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
r.store.DeleteRepository(ctx, repo.ID, true)
|
||||
}
|
||||
}()
|
||||
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err := poolMgr.Start(); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "starting pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepositories(ctx context.Context) ([]params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return nil, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repos, err := r.store.ListRepositories(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "listing repositories")
|
||||
}
|
||||
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
func (r *Runner) GetRepositoryByID(ctx context.Context, repoID string) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repository")
|
||||
}
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
if err := poolMgr.Stop(); err != nil {
|
||||
log.Printf("failed to stop pool for repo %s", repo.ID)
|
||||
}
|
||||
delete(r.repositories, repoID)
|
||||
}
|
||||
|
||||
pools, err := r.store.ListRepoPools(ctx, repoID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fetching repo pools")
|
||||
}
|
||||
|
||||
if len(pools) > 0 {
|
||||
poolIds := []string{}
|
||||
for _, pool := range pools {
|
||||
poolIds = append(poolIds, pool.ID)
|
||||
}
|
||||
|
||||
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
|
||||
}
|
||||
|
||||
if err := r.store.DeleteRepository(ctx, repoID, true); err != nil {
|
||||
return errors.Wrap(err, "removing repository")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param params.UpdateRepositoryParams) (params.Repository, error) {
|
||||
if !auth.IsAdmin(ctx) {
|
||||
return params.Repository{}, runnerErrors.ErrUnauthorized
|
||||
}
|
||||
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
repo, err := r.store.GetRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
if param.CredentialsName != "" {
|
||||
// Check that credentials are set before saving to db
|
||||
if _, ok := r.credentials[param.CredentialsName]; !ok {
|
||||
return params.Repository{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", param.CredentialsName, repo.Owner, repo.Name)
|
||||
}
|
||||
}
|
||||
|
||||
repo, err = r.store.UpdateRepository(ctx, repoID, param)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating repo")
|
||||
}
|
||||
|
||||
log.Printf("post-update webhook secret: %s", repo.WebhookSecret)
|
||||
poolMgr, ok := r.repositories[repo.ID]
|
||||
if ok {
|
||||
internalCfg, err := r.getInternalConfig(repo)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = internalCfg
|
||||
// stop the pool mgr
|
||||
if err := poolMgr.RefreshState(repo); err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "updating pool manager")
|
||||
}
|
||||
} else {
|
||||
poolMgr, err := r.loadRepoPoolManager(repo)
|
||||
if err != nil {
|
||||
return params.Repository{}, errors.Wrap(err, "loading pool manager")
|
||||
}
|
||||
r.repositories[repo.ID] = poolMgr
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
func (r *Runner) CreateRepoPool(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) DeleteRepoPool(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListRepoPools(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) UpdateRepoPool(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListPoolInstances(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {
|
||||
ret := []params.GithubCredentials{}
|
||||
|
||||
|
|
@ -282,10 +105,10 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *Runner) getInternalConfig(repo params.Repository) (params.Internal, error) {
|
||||
creds, ok := r.credentials[repo.CredentialsName]
|
||||
func (r *Runner) getInternalConfig(credsName string) (params.Internal, error) {
|
||||
creds, ok := r.credentials[credsName]
|
||||
if !ok {
|
||||
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for repo %s/%s", repo.CredentialsName, repo.Owner, repo.Name)
|
||||
return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName)
|
||||
}
|
||||
|
||||
return params.Internal{
|
||||
|
|
@ -296,19 +119,6 @@ func (r *Runner) getInternalConfig(repo params.Repository) (params.Internal, err
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
|
||||
cfg, err := r.getInternalConfig(repo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching internal config")
|
||||
}
|
||||
repo.Internal = cfg
|
||||
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating pool manager")
|
||||
}
|
||||
return poolManager, nil
|
||||
}
|
||||
|
||||
func (r *Runner) loadReposAndOrgs() error {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
|
@ -395,21 +205,6 @@ func (r *Runner) Wait() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
||||
repo, err := r.store.GetRepository(r.ctx, owner, name)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching repo")
|
||||
}
|
||||
|
||||
if repo, ok := r.repositories[repo.ID]; ok {
|
||||
return repo, nil
|
||||
}
|
||||
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
|
||||
}
|
||||
|
||||
func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
|
||||
r.mux.Lock()
|
||||
defer r.mux.Unlock()
|
||||
|
|
|
|||
|
|
@ -1,8 +1,34 @@
|
|||
package runner
|
||||
|
||||
import "runner-manager/config"
|
||||
|
||||
type HookTargetType string
|
||||
|
||||
const (
|
||||
RepoHook HookTargetType = "repository"
|
||||
OrganizationHook HookTargetType = "organization"
|
||||
)
|
||||
|
||||
var (
|
||||
// Linux only for now. Will add Windows soon. (famous last words?)
|
||||
supportedOSType map[config.OSType]struct{} = map[config.OSType]struct{}{
|
||||
config.Linux: {},
|
||||
}
|
||||
|
||||
// These are the architectures that Github supports.
|
||||
supportedOSArch map[config.OSArch]struct{} = map[config.OSArch]struct{}{
|
||||
config.Amd64: {},
|
||||
config.Arm: {},
|
||||
config.Arm64: {},
|
||||
}
|
||||
)
|
||||
|
||||
func IsSupportedOSType(osType config.OSType) bool {
|
||||
_, ok := supportedOSType[osType]
|
||||
return ok
|
||||
}
|
||||
|
||||
func IsSupportedArch(arch config.OSArch) bool {
|
||||
_, ok := supportedOSArch[arch]
|
||||
return ok
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue