diff --git a/apiserver/controllers/controllers.go b/apiserver/controllers/controllers.go index 797a42ae..b8959d37 100644 --- a/apiserver/controllers/controllers.go +++ b/apiserver/controllers/controllers.go @@ -12,7 +12,6 @@ import ( runnerParams "garm/params" "garm/runner" - "github.com/gorilla/mux" "github.com/pkg/errors" ) @@ -194,350 +193,3 @@ func (a *APIController) ListProviders(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(providers) } - -func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - var repoData runnerParams.CreateRepoParams - if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil { - handleError(w, gErrors.ErrBadRequest) - return - } - - repo, err := a.r.CreateRepository(ctx, repoData) - if err != nil { - log.Printf("error creating repository: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(repo) -} - -func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - repos, err := a.r.ListRepositories(ctx) - if err != nil { - log.Printf("listing repos: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(repos) -} - -func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - repo, err := a.r.GetRepositoryByID(ctx, repoID) - if err != nil { - log.Printf("fetching repo: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(repo) -} - -func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - if err := a.r.DeleteRepository(ctx, repoID); err != nil { - log.Printf("fetching repo: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - -} - -func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - var updatePayload runnerParams.UpdateRepositoryParams - if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil { - handleError(w, gErrors.ErrBadRequest) - return - } - - repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload) - if err != nil { - log.Printf("error updating repository: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(repo) -} - -func (a *APIController) CreateRepoPoolHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - var poolData runnerParams.CreatePoolParams - if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { - log.Printf("failed to decode: %s", err) - handleError(w, gErrors.ErrBadRequest) - return - } - - pool, err := a.r.CreateRepoPool(ctx, repoID, poolData) - if err != nil { - log.Printf("error creating repository pool: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(pool) -} - -func (a *APIController) ListRepoPoolsHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - pools, err := a.r.ListRepoPools(ctx, repoID) - if err != nil { - log.Printf("listing pools: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(pools) -} - -func (a *APIController) GetRepoPoolHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - vars := mux.Vars(r) - repoID, repoOk := vars["repoID"] - poolID, poolOk := vars["poolID"] - if !repoOk || !poolOk { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo or pool ID specified", - }) - return - } - - pool, err := a.r.GetRepoPoolByID(ctx, repoID, poolID) - if err != nil { - log.Printf("listing pools: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(pool) -} - -func (a *APIController) DeleteRepoPoolHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, repoOk := vars["repoID"] - poolID, poolOk := vars["poolID"] - if !repoOk || !poolOk { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo or pool ID specified", - }) - return - } - - if err := a.r.DeleteRepoPool(ctx, repoID, poolID); err != nil { - log.Printf("removing pool: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - -} - -func (a *APIController) UpdateRepoPoolHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - vars := mux.Vars(r) - repoID, repoOk := vars["repoID"] - poolID, poolOk := vars["poolID"] - if !repoOk || !poolOk { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo or pool ID specified", - }) - return - } - - var poolData runnerParams.UpdatePoolParams - if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { - log.Printf("failed to decode: %s", err) - handleError(w, gErrors.ErrBadRequest) - return - } - - pool, err := a.r.UpdateRepoPool(ctx, repoID, poolID, poolData) - if err != nil { - log.Printf("error creating repository pool: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(pool) -} - -func (a *APIController) ListRepoInstancesHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - vars := mux.Vars(r) - repoID, ok := vars["repoID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - instances, err := a.r.ListRepoInstances(ctx, repoID) - if err != nil { - log.Printf("listing pools: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(instances) -} - -func (a *APIController) ListPoolInstancesHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - vars := mux.Vars(r) - poolID, ok := vars["poolID"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - instances, err := a.r.ListPoolInstances(ctx, poolID) - if err != nil { - log.Printf("listing pools: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(instances) -} - -func (a *APIController) GetInstanceHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - vars := mux.Vars(r) - instanceName, ok := vars["instanceName"] - if !ok { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(params.APIErrorResponse{ - Error: "Bad Request", - Details: "No repo ID specified", - }) - return - } - - instance, err := a.r.GetInstance(ctx, instanceName) - if err != nil { - log.Printf("listing pools: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(instance) -} - -func (a *APIController) InstanceStatusMessageHandler(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - var updateMessage runnerParams.InstanceUpdateMessage - if err := json.NewDecoder(r.Body).Decode(&updateMessage); err != nil { - log.Printf("failed to decode: %s", err) - handleError(w, gErrors.ErrBadRequest) - return - } - - if err := a.r.AddInstanceStatusMessage(ctx, updateMessage); err != nil { - log.Printf("error saving status message: %s", err) - handleError(w, err) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) -} diff --git a/apiserver/controllers/instances.go b/apiserver/controllers/instances.go new file mode 100644 index 00000000..970ea88a --- /dev/null +++ b/apiserver/controllers/instances.go @@ -0,0 +1,81 @@ +package controllers + +import ( + "encoding/json" + "log" + "net/http" + + "garm/apiserver/params" + gErrors "garm/errors" + runnerParams "garm/params" + + "github.com/gorilla/mux" +) + +func (a *APIController) ListPoolInstancesHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + poolID, ok := vars["poolID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No pool ID specified", + }) + return + } + + instances, err := a.r.ListPoolInstances(ctx, poolID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(instances) +} + +func (a *APIController) GetInstanceHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + instanceName, ok := vars["instanceName"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No pool ID specified", + }) + return + } + + instance, err := a.r.GetInstance(ctx, instanceName) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(instance) +} + +func (a *APIController) InstanceStatusMessageHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var updateMessage runnerParams.InstanceUpdateMessage + if err := json.NewDecoder(r.Body).Decode(&updateMessage); err != nil { + log.Printf("failed to decode: %s", err) + handleError(w, gErrors.ErrBadRequest) + return + } + + if err := a.r.AddInstanceStatusMessage(ctx, updateMessage); err != nil { + log.Printf("error saving status message: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} diff --git a/apiserver/controllers/organizations.go b/apiserver/controllers/organizations.go new file mode 100644 index 00000000..f850dc71 --- /dev/null +++ b/apiserver/controllers/organizations.go @@ -0,0 +1,292 @@ +package controllers + +import ( + "encoding/json" + "log" + "net/http" + + "garm/apiserver/params" + gErrors "garm/errors" + runnerParams "garm/params" + + "github.com/gorilla/mux" +) + +func (a *APIController) CreateOrgHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var repoData runnerParams.CreateOrgParams + if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } + + repo, err := a.r.CreateOrganization(ctx, repoData) + if err != nil { + log.Printf("error creating repository: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) ListOrgsHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + orgs, err := a.r.ListOrganizations(ctx) + if err != nil { + log.Printf("listing orgs: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(orgs) +} + +func (a *APIController) GetOrgByIDHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + org, err := a.r.GetOrganizationByID(ctx, orgID) + if err != nil { + log.Printf("fetching org: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(org) +} + +func (a *APIController) DeleteOrgHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + if err := a.r.DeleteOrganization(ctx, orgID); err != nil { + log.Printf("fetching org: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + +} + +func (a *APIController) UpdateOrgHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + var updatePayload runnerParams.UpdateRepositoryParams + if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } + + org, err := a.r.UpdateOrganization(ctx, orgID, updatePayload) + if err != nil { + log.Printf("error updating organization: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(org) +} + +func (a *APIController) CreateOrgPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + var poolData runnerParams.CreatePoolParams + if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { + log.Printf("failed to decode: %s", err) + handleError(w, gErrors.ErrBadRequest) + return + } + + pool, err := a.r.CreateOrgPool(ctx, orgID, poolData) + if err != nil { + log.Printf("error creating organization pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) ListOrgPoolsHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + pools, err := a.r.ListOrgPools(ctx, orgID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pools) +} + +func (a *APIController) GetOrgPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + orgID, repoOk := vars["orgID"] + poolID, poolOk := vars["poolID"] + if !repoOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org or pool ID specified", + }) + return + } + + pool, err := a.r.GetOrgPoolByID(ctx, orgID, poolID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) DeleteOrgPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, orgOk := vars["orgID"] + poolID, poolOk := vars["poolID"] + if !orgOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org or pool ID specified", + }) + return + } + + if err := a.r.DeleteOrgPool(ctx, orgID, poolID); err != nil { + log.Printf("removing pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + +} + +func (a *APIController) UpdateOrgPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + orgID, orgOk := vars["orgID"] + poolID, poolOk := vars["poolID"] + if !orgOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org or pool ID specified", + }) + return + } + + var poolData runnerParams.UpdatePoolParams + if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { + log.Printf("failed to decode: %s", err) + handleError(w, gErrors.ErrBadRequest) + return + } + + pool, err := a.r.UpdateOrgPool(ctx, orgID, poolID, poolData) + if err != nil { + log.Printf("error creating organization pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) ListOrgInstancesHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + orgID, ok := vars["orgID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No org ID specified", + }) + return + } + + instances, err := a.r.ListOrgInstances(ctx, orgID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(instances) +} diff --git a/apiserver/controllers/repositories.go b/apiserver/controllers/repositories.go new file mode 100644 index 00000000..c46c9dd2 --- /dev/null +++ b/apiserver/controllers/repositories.go @@ -0,0 +1,292 @@ +package controllers + +import ( + "encoding/json" + "log" + "net/http" + + "garm/apiserver/params" + gErrors "garm/errors" + runnerParams "garm/params" + + "github.com/gorilla/mux" +) + +func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var repoData runnerParams.CreateRepoParams + if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } + + repo, err := a.r.CreateRepository(ctx, repoData) + if err != nil { + log.Printf("error creating repository: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + repos, err := a.r.ListRepositories(ctx) + if err != nil { + log.Printf("listing repos: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repos) +} + +func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + repo, err := a.r.GetRepositoryByID(ctx, repoID) + if err != nil { + log.Printf("fetching repo: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + if err := a.r.DeleteRepository(ctx, repoID); err != nil { + log.Printf("fetching repo: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + +} + +func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + var updatePayload runnerParams.UpdateRepositoryParams + if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil { + handleError(w, gErrors.ErrBadRequest) + return + } + + repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload) + if err != nil { + log.Printf("error updating repository: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (a *APIController) CreateRepoPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + var poolData runnerParams.CreatePoolParams + if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { + log.Printf("failed to decode: %s", err) + handleError(w, gErrors.ErrBadRequest) + return + } + + pool, err := a.r.CreateRepoPool(ctx, repoID, poolData) + if err != nil { + log.Printf("error creating repository pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) ListRepoPoolsHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + pools, err := a.r.ListRepoPools(ctx, repoID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pools) +} + +func (a *APIController) GetRepoPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + repoID, repoOk := vars["repoID"] + poolID, poolOk := vars["poolID"] + if !repoOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo or pool ID specified", + }) + return + } + + pool, err := a.r.GetRepoPoolByID(ctx, repoID, poolID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) DeleteRepoPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, repoOk := vars["repoID"] + poolID, poolOk := vars["poolID"] + if !repoOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo or pool ID specified", + }) + return + } + + if err := a.r.DeleteRepoPool(ctx, repoID, poolID); err != nil { + log.Printf("removing pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + +} + +func (a *APIController) UpdateRepoPoolHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + repoID, repoOk := vars["repoID"] + poolID, poolOk := vars["poolID"] + if !repoOk || !poolOk { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo or pool ID specified", + }) + return + } + + var poolData runnerParams.UpdatePoolParams + if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil { + log.Printf("failed to decode: %s", err) + handleError(w, gErrors.ErrBadRequest) + return + } + + pool, err := a.r.UpdateRepoPool(ctx, repoID, poolID, poolData) + if err != nil { + log.Printf("error creating repository pool: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pool) +} + +func (a *APIController) ListRepoInstancesHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + repoID, ok := vars["repoID"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(params.APIErrorResponse{ + Error: "Bad Request", + Details: "No repo ID specified", + }) + return + } + + instances, err := a.r.ListRepoInstances(ctx, repoID) + if err != nil { + log.Printf("listing pools: %s", err) + handleError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(instances) +} diff --git a/apiserver/routers/routers.go b/apiserver/routers/routers.go index 8ff1f074..42fbc28c 100644 --- a/apiserver/routers/routers.go +++ b/apiserver/routers/routers.go @@ -96,40 +96,40 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl // Organizations and pools // ///////////////////////////// // Get pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") // Delete pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") // Update pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") // List pools - apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") // Create pool - apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") // Repo instances list - apiRouter.Handle("/organizations/{orgID}/instances/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/instances", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/instances/", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/instances", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") // Get org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") // Update org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") // Delete org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") // List orgs - apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") // Create org - apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") - apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") // Credentials and providers apiRouter.Handle("/credentials/", log(os.Stdout, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS") diff --git a/cmd/garm-cli/client/client.go b/cmd/garm-cli/client/client.go index a812da61..220cc20b 100644 --- a/cmd/garm-cli/client/client.go +++ b/cmd/garm-cli/client/client.go @@ -120,172 +120,10 @@ func (c *Client) ListProviders() ([]params.Provider, error) { return providers, nil } -func (c *Client) ListRepositories() ([]params.Repository, error) { - var repos []params.Repository - url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL) - resp, err := c.client.R(). - SetResult(&repos). - Get(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return nil, errors.Wrap(decErr, "sending request") - } - return nil, fmt.Errorf("error fetching repos: %s", apiErr.Details) - } - return repos, nil -} +func (c *Client) GetInstanceByName(instanceName string) (params.Instance, error) { + url := fmt.Sprintf("%s/api/v1/instances/%s", c.Config.BaseURL, instanceName) -func (c *Client) CreateRepository(param params.CreateRepoParams) (params.Repository, error) { - var response params.Repository - url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL) - - body, err := json.Marshal(param) - if err != nil { - return params.Repository{}, err - } - resp, err := c.client.R(). - SetBody(body). - SetResult(&response). - Post(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) GetRepository(repoID string) (params.Repository, error) { - var response params.Repository - url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID) - resp, err := c.client.R(). - SetResult(&response). - Get(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error fetching repos: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) DeleteRepository(repoID string) error { - url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID) - resp, err := c.client.R(). - Delete(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return errors.Wrap(decErr, "sending request") - } - return fmt.Errorf("error fetching repos: %s", apiErr.Details) - } - return nil -} - -func (c *Client) CreateRepoPool(repoID string, param params.CreatePoolParams) (params.Pool, error) { - url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID) - - var response params.Pool - body, err := json.Marshal(param) - if err != nil { - return response, err - } - resp, err := c.client.R(). - SetBody(body). - SetResult(&response). - Post(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) ListRepoPools(repoID string) ([]params.Pool, error) { - url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID) - - var response []params.Pool - resp, err := c.client.R(). - SetResult(&response). - Get(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) GetRepoPool(repoID, poolID string) (params.Pool, error) { - url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) - - var response params.Pool - resp, err := c.client.R(). - SetResult(&response). - Get(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) DeleteRepoPool(repoID, poolID string) error { - url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) - - resp, err := c.client.R(). - Delete(url) - - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return errors.Wrap(decErr, "sending request") - } - return fmt.Errorf("error performing login: %s", apiErr.Details) - } - return nil -} - -func (c *Client) UpdateRepoPool(repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { - url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) - - var response params.Pool - body, err := json.Marshal(param) - if err != nil { - return response, err - } - resp, err := c.client.R(). - SetBody(body). - SetResult(&response). - Put(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} - -func (c *Client) ListRepoInstances(repoID string) ([]params.Instance, error) { - url := fmt.Sprintf("%s/api/v1/repositories/%s/instances", c.Config.BaseURL, repoID) - - var response []params.Instance + var response params.Instance resp, err := c.client.R(). SetResult(&response). Get(url) @@ -315,20 +153,3 @@ func (c *Client) ListPoolInstances(poolID string) ([]params.Instance, error) { } return response, nil } - -func (c *Client) GetInstanceByName(instanceName string) (params.Instance, error) { - url := fmt.Sprintf("%s/api/v1/instances/%s", c.Config.BaseURL, instanceName) - - var response params.Instance - resp, err := c.client.R(). - SetResult(&response). - Get(url) - if err != nil || resp.IsError() { - apiErr, decErr := c.decodeAPIError(resp.Body()) - if decErr != nil { - return response, errors.Wrap(decErr, "sending request") - } - return response, fmt.Errorf("error performing login: %s", apiErr.Details) - } - return response, nil -} diff --git a/cmd/garm-cli/client/organizations.go b/cmd/garm-cli/client/organizations.go new file mode 100644 index 00000000..8ef4a1ae --- /dev/null +++ b/cmd/garm-cli/client/organizations.go @@ -0,0 +1,189 @@ +package client + +import ( + "encoding/json" + "fmt" + + "garm/params" + + "github.com/pkg/errors" +) + +func (c *Client) ListOrganizations() ([]params.Organization, error) { + var orgs []params.Organization + url := fmt.Sprintf("%s/api/v1/organizations", c.Config.BaseURL) + resp, err := c.client.R(). + SetResult(&orgs). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return nil, errors.Wrap(decErr, "sending request") + } + return nil, fmt.Errorf("error fetching orgs: %s", apiErr.Details) + } + return orgs, nil +} + +func (c *Client) CreateOrganization(param params.CreateOrgParams) (params.Organization, error) { + var response params.Organization + url := fmt.Sprintf("%s/api/v1/organizations", c.Config.BaseURL) + + body, err := json.Marshal(param) + if err != nil { + return params.Organization{}, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Post(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) GetOrganization(orgID string) (params.Organization, error) { + var response params.Organization + url := fmt.Sprintf("%s/api/v1/organizations/%s", c.Config.BaseURL, orgID) + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error fetching orgs: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) DeleteOrganization(orgID string) error { + url := fmt.Sprintf("%s/api/v1/organizations/%s", c.Config.BaseURL, orgID) + resp, err := c.client.R(). + Delete(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return errors.Wrap(decErr, "sending request") + } + return fmt.Errorf("error fetching orgs: %s", apiErr.Details) + } + return nil +} + +func (c *Client) CreateOrgPool(orgID string, param params.CreatePoolParams) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/organizations/%s/pools", c.Config.BaseURL, orgID) + + var response params.Pool + body, err := json.Marshal(param) + if err != nil { + return response, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Post(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error creating org pool: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) ListOrgPools(orgID string) ([]params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/organizations/%s/pools", c.Config.BaseURL, orgID) + + var response []params.Pool + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error listing org pools: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) GetOrgPool(orgID, poolID string) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID) + + var response params.Pool + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error fetching org pool: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) DeleteOrgPool(orgID, poolID string) error { + url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID) + + resp, err := c.client.R(). + Delete(url) + + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return errors.Wrap(decErr, "sending request") + } + return fmt.Errorf("error deleting org pool: %s", apiErr.Details) + } + return nil +} + +func (c *Client) UpdateOrgPool(orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID) + + var response params.Pool + body, err := json.Marshal(param) + if err != nil { + return response, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Put(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error updating org pool: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) ListOrgInstances(orgID string) ([]params.Instance, error) { + url := fmt.Sprintf("%s/api/v1/organizations/%s/instances", c.Config.BaseURL, orgID) + + var response []params.Instance + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error listing org instances: %s", apiErr.Details) + } + return response, nil +} diff --git a/cmd/garm-cli/client/repositories.go b/cmd/garm-cli/client/repositories.go new file mode 100644 index 00000000..0f0f614e --- /dev/null +++ b/cmd/garm-cli/client/repositories.go @@ -0,0 +1,189 @@ +package client + +import ( + "encoding/json" + "fmt" + + "garm/params" + + "github.com/pkg/errors" +) + +func (c *Client) ListRepositories() ([]params.Repository, error) { + var repos []params.Repository + url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL) + resp, err := c.client.R(). + SetResult(&repos). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return nil, errors.Wrap(decErr, "sending request") + } + return nil, fmt.Errorf("error fetching repos: %s", apiErr.Details) + } + return repos, nil +} + +func (c *Client) CreateRepository(param params.CreateRepoParams) (params.Repository, error) { + var response params.Repository + url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL) + + body, err := json.Marshal(param) + if err != nil { + return params.Repository{}, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Post(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) GetRepository(repoID string) (params.Repository, error) { + var response params.Repository + url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID) + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error fetching repos: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) DeleteRepository(repoID string) error { + url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID) + resp, err := c.client.R(). + Delete(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return errors.Wrap(decErr, "sending request") + } + return fmt.Errorf("error fetching repos: %s", apiErr.Details) + } + return nil +} + +func (c *Client) CreateRepoPool(repoID string, param params.CreatePoolParams) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID) + + var response params.Pool + body, err := json.Marshal(param) + if err != nil { + return response, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Post(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) ListRepoPools(repoID string) ([]params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID) + + var response []params.Pool + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) GetRepoPool(repoID, poolID string) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) + + var response params.Pool + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) DeleteRepoPool(repoID, poolID string) error { + url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) + + resp, err := c.client.R(). + Delete(url) + + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return errors.Wrap(decErr, "sending request") + } + return fmt.Errorf("error performing login: %s", apiErr.Details) + } + return nil +} + +func (c *Client) UpdateRepoPool(repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { + url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID) + + var response params.Pool + body, err := json.Marshal(param) + if err != nil { + return response, err + } + resp, err := c.client.R(). + SetBody(body). + SetResult(&response). + Put(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} + +func (c *Client) ListRepoInstances(repoID string) ([]params.Instance, error) { + url := fmt.Sprintf("%s/api/v1/repositories/%s/instances", c.Config.BaseURL, repoID) + + var response []params.Instance + resp, err := c.client.R(). + SetResult(&response). + Get(url) + if err != nil || resp.IsError() { + apiErr, decErr := c.decodeAPIError(resp.Body()) + if decErr != nil { + return response, errors.Wrap(decErr, "sending request") + } + return response, fmt.Errorf("error performing login: %s", apiErr.Details) + } + return response, nil +} diff --git a/cmd/garm-cli/garm-cli b/cmd/garm-cli/garm-cli deleted file mode 100755 index bc61e7e8..00000000 Binary files a/cmd/garm-cli/garm-cli and /dev/null differ diff --git a/params/params.go b/params/params.go index cc6244f0..8461200e 100644 --- a/params/params.go +++ b/params/params.go @@ -166,3 +166,8 @@ type Provider struct { ProviderType config.ProviderType `json:"type"` Description string `json:"description"` } + +type UpdatePoolStateParams struct { + WebhookSecret string + Internal Internal +} diff --git a/params/requests.go b/params/requests.go index e48ded2c..c1940aa0 100644 --- a/params/requests.go +++ b/params/requests.go @@ -35,6 +35,23 @@ func (c *CreateRepoParams) Validate() error { return nil } +type CreateOrgParams struct { + Name string `json:"name"` + CredentialsName string `json:"credentials_name"` + WebhookSecret string `json:"webhook_secret"` +} + +func (c *CreateOrgParams) Validate() error { + if c.Name == "" { + return errors.NewBadRequestError("missing repo name") + } + + if c.CredentialsName == "" { + return errors.NewBadRequestError("missing credentials name") + } + return nil +} + // NewUserParams holds the needed information to create // a new user type NewUserParams struct { diff --git a/runner/common/pool.go b/runner/common/pool.go index 6a6d67d5..a523dcd7 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -14,7 +14,7 @@ const ( type PoolManager interface { WebhookSecret() string HandleWorkflowJob(job params.WorkflowJob) error - RefreshState(cfg params.Repository) error + RefreshState(param params.UpdatePoolStateParams) error // AddPool(ctx context.Context, pool params.Pool) error // PoolManager lifecycle functions. Start/stop pool. @@ -22,16 +22,3 @@ type PoolManager interface { Stop() error Wait() error } - -type Pool interface { - ListInstances() ([]params.Instance, error) - GetInstance() (params.Instance, error) - DeleteInstance() error - StopInstance() error - StartInstance() error - - // Pool lifecycle functions. Start/stop pool. - Start() error - Stop() error - Wait() error -} diff --git a/runner/organizations.go b/runner/organizations.go new file mode 100644 index 00000000..c3ef9da1 --- /dev/null +++ b/runner/organizations.go @@ -0,0 +1,322 @@ +package runner + +import ( + "context" + "garm/auth" + runnerErrors "garm/errors" + "garm/params" + "garm/runner/common" + "garm/runner/pool" + "log" + "strings" + + "github.com/pkg/errors" +) + +func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgParams) (org params.Organization, err error) { + if !auth.IsAdmin(ctx) { + return org, runnerErrors.ErrUnauthorized + } + + if err := param.Validate(); err != nil { + return params.Organization{}, errors.Wrap(err, "validating params") + } + + creds, ok := r.credentials[param.CredentialsName] + if !ok { + return params.Organization{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName) + } + + _, err = r.store.GetOrganization(ctx, param.Name) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return params.Organization{}, errors.Wrap(err, "fetching repo") + } + } else { + return params.Organization{}, runnerErrors.NewConflictError("organization %s already exists", param.Name) + } + + org, err = r.store.CreateOrganization(ctx, param.Name, creds.Name, param.WebhookSecret) + if err != nil { + return params.Organization{}, errors.Wrap(err, "creating organization") + } + + defer func() { + if err != nil { + r.store.DeleteOrganization(ctx, org.ID) + } + }() + + poolMgr, err := r.loadOrgPoolManager(org) + if err := poolMgr.Start(); err != nil { + return params.Organization{}, errors.Wrap(err, "starting pool manager") + } + r.organizations[org.ID] = poolMgr + return org, nil +} + +func (r *Runner) ListOrganizations(ctx context.Context) ([]params.Organization, error) { + if !auth.IsAdmin(ctx) { + return nil, runnerErrors.ErrUnauthorized + } + + orgs, err := r.store.ListOrganizations(ctx) + if err != nil { + return nil, errors.Wrap(err, "listing organizations") + } + + return orgs, nil +} + +func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.Organization, error) { + if !auth.IsAdmin(ctx) { + return params.Organization{}, runnerErrors.ErrUnauthorized + } + + org, err := r.store.GetOrganizationByID(ctx, orgID) + if err != nil { + return params.Organization{}, errors.Wrap(err, "fetching repository") + } + return org, nil +} + +func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error { + if !auth.IsAdmin(ctx) { + return runnerErrors.ErrUnauthorized + } + + org, err := r.store.GetOrganizationByID(ctx, orgID) + if err != nil { + return errors.Wrap(err, "fetching repo") + } + + poolMgr, ok := r.organizations[org.ID] + if ok { + if err := poolMgr.Stop(); err != nil { + log.Printf("failed to stop pool for repo %s", org.ID) + } + delete(r.organizations, orgID) + } + + pools, err := r.store.ListOrgPools(ctx, orgID) + if err != nil { + return errors.Wrap(err, "fetching repo pools") + } + + if len(pools) > 0 { + poolIds := []string{} + for _, pool := range pools { + poolIds = append(poolIds, pool.ID) + } + + return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", ")) + } + + if err := r.store.DeleteOrganization(ctx, orgID); err != nil { + return errors.Wrap(err, "removing repository") + } + return nil +} + +func (r *Runner) UpdateOrganization(ctx context.Context, orgID string, param params.UpdateRepositoryParams) (params.Organization, error) { + if !auth.IsAdmin(ctx) { + return params.Organization{}, runnerErrors.ErrUnauthorized + } + + r.mux.Lock() + defer r.mux.Unlock() + + org, err := r.store.GetOrganizationByID(ctx, orgID) + if err != nil { + return params.Organization{}, errors.Wrap(err, "fetching org") + } + + if param.CredentialsName != "" { + // Check that credentials are set before saving to db + if _, ok := r.credentials[param.CredentialsName]; !ok { + return params.Organization{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for org %s", param.CredentialsName, org.Name) + } + } + + org, err = r.store.UpdateOrganization(ctx, orgID, param) + if err != nil { + return params.Organization{}, errors.Wrap(err, "updating org") + } + + poolMgr, ok := r.organizations[org.ID] + if ok { + internalCfg, err := r.getInternalConfig(org.CredentialsName) + if err != nil { + return params.Organization{}, errors.Wrap(err, "fetching internal config") + } + newState := params.UpdatePoolStateParams{ + WebhookSecret: org.WebhookSecret, + Internal: internalCfg, + } + org.Internal = internalCfg + // stop the pool mgr + if err := poolMgr.RefreshState(newState); err != nil { + return params.Organization{}, errors.Wrap(err, "updating pool manager") + } + } else { + poolMgr, err := r.loadOrgPoolManager(org) + if err != nil { + return params.Organization{}, errors.Wrap(err, "loading pool manager") + } + r.organizations[org.ID] = poolMgr + } + + return org, nil +} + +func (r *Runner) CreateOrgPool(ctx context.Context, orgID string, param params.CreatePoolParams) (params.Pool, error) { + if !auth.IsAdmin(ctx) { + return params.Pool{}, runnerErrors.ErrUnauthorized + } + + r.mux.Lock() + defer r.mux.Unlock() + + _, ok := r.organizations[orgID] + if !ok { + return params.Pool{}, runnerErrors.ErrNotFound + } + + createPoolParams, err := r.appendTagsToCreatePoolParams(param) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool params") + } + + pool, err := r.store.CreateOrganizationPool(ctx, orgID, createPoolParams) + if err != nil { + return params.Pool{}, errors.Wrap(err, "creating pool") + } + + return pool, nil +} + +func (r *Runner) GetOrgPoolByID(ctx context.Context, orgID, poolID string) (params.Pool, error) { + if !auth.IsAdmin(ctx) { + return params.Pool{}, runnerErrors.ErrUnauthorized + } + + pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return pool, nil +} + +func (r *Runner) DeleteOrgPool(ctx context.Context, orgID, poolID string) error { + if !auth.IsAdmin(ctx) { + return runnerErrors.ErrUnauthorized + } + + // TODO: dedup instance count verification + pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID) + if err != nil { + return errors.Wrap(err, "fetching pool") + } + + instances, err := r.store.ListInstances(ctx, pool.ID) + if err != nil { + return errors.Wrap(err, "fetching instances") + } + + // TODO: implement a count function + if len(instances) > 0 { + runnerIDs := []string{} + for _, run := range instances { + runnerIDs = append(runnerIDs, run.ID) + } + return runnerErrors.NewBadRequestError("pool has runners: %s", strings.Join(runnerIDs, ", ")) + } + + if err := r.store.DeleteOrganizationPool(ctx, orgID, poolID); err != nil { + return errors.Wrap(err, "deleting pool") + } + return nil +} + +func (r *Runner) ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) { + if !auth.IsAdmin(ctx) { + return []params.Pool{}, runnerErrors.ErrUnauthorized + } + + pools, err := r.store.ListOrgPools(ctx, orgID) + if err != nil { + return nil, errors.Wrap(err, "fetching pools") + } + return pools, nil +} + +func (r *Runner) UpdateOrgPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { + if !auth.IsAdmin(ctx) { + return params.Pool{}, runnerErrors.ErrUnauthorized + } + + pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + + maxRunners := pool.MaxRunners + minIdleRunners := pool.MinIdleRunners + + if param.MaxRunners != nil { + maxRunners = *param.MaxRunners + } + if param.MinIdleRunners != nil { + minIdleRunners = *param.MinIdleRunners + } + + if minIdleRunners > maxRunners { + return params.Pool{}, runnerErrors.NewBadRequestError("min_idle_runners cannot be larger than max_runners") + } + + newPool, err := r.store.UpdateOrganizationPool(ctx, orgID, poolID, param) + if err != nil { + return params.Pool{}, errors.Wrap(err, "updating pool") + } + return newPool, nil +} + +func (r *Runner) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) { + if !auth.IsAdmin(ctx) { + return nil, runnerErrors.ErrUnauthorized + } + + instances, err := r.store.ListOrgInstances(ctx, orgID) + if err != nil { + return []params.Instance{}, errors.Wrap(err, "fetching instances") + } + return instances, nil +} + +func (r *Runner) loadOrgPoolManager(org params.Organization) (common.PoolManager, error) { + cfg, err := r.getInternalConfig(org.CredentialsName) + if err != nil { + return nil, errors.Wrap(err, "fetching internal config") + } + org.Internal = cfg + poolManager, err := pool.NewOrganizationPoolManager(r.ctx, org, r.providers, r.store) + if err != nil { + return nil, errors.Wrap(err, "creating pool manager") + } + return poolManager, nil +} + +func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) { + r.mux.Lock() + defer r.mux.Unlock() + + org, err := r.store.GetOrganization(r.ctx, name) + if err != nil { + return nil, errors.Wrap(err, "fetching repo") + } + + if orgPoolMgr, ok := r.organizations[org.ID]; ok { + return orgPoolMgr, nil + } + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name) +} diff --git a/runner/pool/common.go b/runner/pool/common.go index b81dc50c..e8b2cccf 100644 --- a/runner/pool/common.go +++ b/runner/pool/common.go @@ -1,6 +1,612 @@ package pool +import ( + "context" + "fmt" + "garm/auth" + dbCommon "garm/database/common" + runnerErrors "garm/errors" + "garm/params" + "garm/runner/common" + providerCommon "garm/runner/providers/common" + "log" + "strings" + "sync" + "time" + + "github.com/google/go-github/v43/github" + "github.com/google/uuid" + "github.com/pkg/errors" +) + var ( poolIDLabelprefix = "runner-pool-id:" controllerLabelPrefix = "runner-controller-id:" ) + +type basePool struct { + ctx context.Context + controllerID string + + store dbCommon.Store + + providers map[string]common.Provider + tools []*github.RunnerApplicationDownload + quit chan struct{} + done chan struct{} + + helper poolHelper + + mux sync.Mutex +} + +// cleanupOrphanedProviderRunners compares runners in github with local runners and removes +// any local runners that are not present in Github. Runners that are "idle" in our +// provider, but do not exist in github, will be removed. This can happen if the +// garm was offline while a job was executed by a github action. When this +// happens, github will remove the ephemeral worker and send a webhook our way. +// If we were offline and did not process the webhook, the instance will linger. +// We need to remove it from the provider and database. +func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) error { + // runners, err := r.getGithubRunners() + // if err != nil { + // return errors.Wrap(err, "fetching github runners") + // } + + dbInstances, err := r.helper.FetchDbInstances() + if err != nil { + return errors.Wrap(err, "fetching instances from db") + } + + runnerNames := map[string]bool{} + for _, run := range runners { + runnerNames[*run.Name] = true + } + + for _, instance := range dbInstances { + if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete { + // this instance is in the process of being created or is awaiting deletion. + // Instances in pending_Create did not get a chance to register themselves in, + // github so we let them be for now. + continue + } + if ok := runnerNames[instance.Name]; !ok { + // Set pending_delete on DB field. Allow consolidate() to remove it. + updateParams := params.UpdateInstanceParams{ + RunnerStatus: providerCommon.RunnerStatus(providerCommon.InstancePendingDelete), + } + _, err = r.store.UpdateInstance(r.ctx, instance.ID, updateParams) + if err != nil { + return errors.Wrap(err, "syncing local state with github") + } + } + } + return nil +} + +func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) { + runnerName := job.WorkflowJob.RunnerName + runner, err := r.store.GetInstanceByName(r.ctx, runnerName) + if err != nil { + return params.Instance{}, errors.Wrap(err, "fetching instance") + } + + return runner, nil +} + +func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error { + runner, err := r.fetchInstanceFromJob(job) + if err != nil { + return errors.Wrap(err, "fetching instance") + } + + updateParams := params.UpdateInstanceParams{ + RunnerStatus: status, + } + + log.Printf("setting runner status for %s to %s", runner.Name, status) + if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil { + return errors.Wrap(err, "updating runner state") + } + return nil +} + +func (r *basePool) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error { + runner, err := r.fetchInstanceFromJob(job) + if err != nil { + return errors.Wrap(err, "fetching instance") + } + + updateParams := params.UpdateInstanceParams{ + Status: status, + } + + if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil { + return errors.Wrap(err, "updating runner state") + } + return nil +} + +func (r *basePool) acquireNewInstance(job params.WorkflowJob) error { + requestedLabels := job.WorkflowJob.Labels + if len(requestedLabels) == 0 { + // no labels were requested. + return nil + } + + pool, err := r.helper.FindPoolByTags(requestedLabels) + if err != nil { + return errors.Wrap(err, "fetching suitable pool") + } + log.Printf("adding new runner with requested tags %s in pool %s", strings.Join(job.WorkflowJob.Labels, ", "), pool.ID) + + if !pool.Enabled { + log.Printf("selected pool (%s) is disabled", pool.ID) + return nil + } + + // TODO: implement count + poolInstances, err := r.store.ListInstances(r.ctx, pool.ID) + if err != nil { + return errors.Wrap(err, "fetching instances") + } + + if len(poolInstances) >= int(pool.MaxRunners) { + log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID) + return nil + } + + if err := r.AddRunner(r.ctx, pool.ID); err != nil { + log.Printf("failed to add runner to pool %s", pool.ID) + return errors.Wrap(err, "adding runner") + } + return nil +} + +func (r *basePool) AddRunner(ctx context.Context, poolID string) error { + pool, err := r.helper.GetPoolByID(poolID) + if err != nil { + return errors.Wrap(err, "fetching pool") + } + + name := fmt.Sprintf("garm-%s", uuid.New()) + + createParams := params.CreateInstanceParams{ + Name: name, + Pool: poolID, + Status: providerCommon.InstancePendingCreate, + RunnerStatus: providerCommon.RunnerPending, + OSArch: pool.OSArch, + OSType: pool.OSType, + CallbackURL: r.helper.GetCallbackURL(), + } + + instance, err := r.store.CreateInstance(r.ctx, poolID, createParams) + if err != nil { + return errors.Wrap(err, "creating instance") + } + + updateParams := params.UpdateInstanceParams{ + OSName: instance.OSName, + OSVersion: instance.OSVersion, + Addresses: instance.Addresses, + Status: instance.Status, + ProviderID: instance.ProviderID, + } + + if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil { + return errors.Wrap(err, "updating runner state") + } + + return nil +} + +func (r *basePool) loop() { + defer func() { + log.Printf("repository %s loop exited", r.helper.String()) + close(r.done) + }() + log.Printf("starting loop for %s", r.helper.String()) + // TODO: Consolidate runners on loop start. Provider runners must match runners + // in github and DB. When a Workflow job is received, we will first create/update + // an entity in the database, before sending the request to the provider to create/delete + // an instance. If a "queued" job is received, we create an entity in the db with + // a state of "pending_create". Once that instance is up and calls home, it is marked + // as "active". If a "completed" job is received from github, we mark the instance + // as "pending_delete". Once the provider deletes the instance, we mark it as "deleted" + // in the database. + // We also ensure we have runners created based on pool characteristics. This is where + // we spin up "MinWorkers" for each runner type. + + for { + select { + case <-time.After(5 * time.Second): + // consolidate. + r.consolidate() + case <-time.After(3 * time.Hour): + // Update tools cache. + tools, err := r.helper.FetchTools() + if err != nil { + log.Printf("failed to update tools for repo %s: %s", r.helper.String(), err) + } + r.mux.Lock() + r.tools = tools + r.mux.Unlock() + case <-r.ctx.Done(): + // daemon is shutting down. + return + case <-r.quit: + // this worker was stopped. + return + } + } +} + +func (r *basePool) addInstanceToProvider(instance params.Instance) error { + pool, err := r.helper.GetPoolByID(instance.PoolID) + if err != nil { + return errors.Wrap(err, "fetching pool") + } + + provider, ok := r.providers[pool.ProviderName] + if !ok { + return runnerErrors.NewNotFoundError("invalid provider ID") + } + + labels := []string{} + for _, tag := range pool.Tags { + labels = append(labels, tag.Name) + } + labels = append(labels, r.controllerLabel()) + labels = append(labels, r.poolLabel(pool.ID)) + + tk, err := r.helper.GetGithubRegistrationToken() + if err != nil { + return errors.Wrap(err, "fetching registration token") + } + + entity := r.helper.String() + jwtToken, err := auth.NewInstanceJWTToken(instance, r.helper.JwtToken(), entity, common.RepositoryPool) + if err != nil { + return errors.Wrap(err, "fetching instance jwt token") + } + + bootstrapArgs := params.BootstrapInstance{ + Name: instance.Name, + Tools: r.tools, + RepoURL: r.helper.GithubURL(), + GithubRunnerAccessToken: tk, + CallbackURL: instance.CallbackURL, + InstanceToken: jwtToken, + OSArch: pool.OSArch, + Flavor: pool.Flavor, + Image: pool.Image, + Labels: labels, + } + + providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs) + if err != nil { + return errors.Wrap(err, "creating instance") + } + + updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance) + if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil { + return errors.Wrap(err, "updating instance") + } + return nil +} + +func (r *basePool) poolIDFromStringLabels(labels []string) (string, error) { + for _, lbl := range labels { + if strings.HasPrefix(lbl, poolIDLabelprefix) { + return lbl[len(poolIDLabelprefix):], nil + } + } + return "", runnerErrors.ErrNotFound +} + +func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error { + if err := r.helper.ValidateOwner(job); err != nil { + return errors.Wrap(err, "validating owner") + } + + switch job.Action { + case "queued": + // Create instance in database and set it to pending create. + if err := r.acquireNewInstance(job); err != nil { + log.Printf("failed to add instance") + } + case "completed": + // Set instance in database to pending delete. + if job.WorkflowJob.RunnerName == "" { + // Unassigned jobs will have an empty runner_name. + // There is nothing to to in this case. + log.Printf("no runner was assigned. Skipping.") + return nil + } + log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName) + if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil { + log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) + return errors.Wrap(err, "updating runner") + } + case "in_progress": + // update instance workload state. Set job_id in instance state. + if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil { + log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) + return errors.Wrap(err, "updating runner") + } + } + return nil +} + +func (r *basePool) poolLabel(poolID string) string { + return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID) +} + +func (r *basePool) controllerLabel() string { + return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID) +} + +func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { + return params.UpdateInstanceParams{ + ProviderID: providerInstance.ProviderID, + OSName: providerInstance.OSName, + OSVersion: providerInstance.OSVersion, + Addresses: providerInstance.Addresses, + Status: providerInstance.Status, + RunnerStatus: providerInstance.RunnerStatus, + } +} + +func (r *basePool) ensureMinIdleRunners() { + pools, err := r.helper.ListPools() + if err != nil { + log.Printf("error listing pools: %s", err) + return + } + for _, pool := range pools { + if !pool.Enabled { + log.Printf("pool %s is disabled, skipping", pool.ID) + continue + } + existingInstances, err := r.store.ListInstances(r.ctx, pool.ID) + if err != nil { + log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err) + return + } + + // asJs, _ := json.MarshalIndent(existingInstances, "", " ") + // log.Printf(">>> %s", string(asJs)) + if uint(len(existingInstances)) >= pool.MaxRunners { + log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) + continue + } + + idleOrPendingWorkers := []params.Instance{} + for _, inst := range existingInstances { + if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive { + idleOrPendingWorkers = append(idleOrPendingWorkers, inst) + } + } + + var required int + if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) { + // get the needed delta. + required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers) + + projectedInstanceCount := len(existingInstances) + required + if uint(projectedInstanceCount) > pool.MaxRunners { + // ensure we don't go above max workers + delta := projectedInstanceCount - int(pool.MaxRunners) + required = required - delta + } + } + + for i := 0; i < required; i++ { + log.Printf("addind new idle worker to pool %s", pool.ID) + if err := r.AddRunner(r.ctx, pool.ID); err != nil { + log.Printf("failed to add new instance for pool %s: %s", pool.ID, err) + } + } + } +} + +// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear +// as offline and for which we no longer have a local instance. +// This may happen if someone manually deletes the instance in the provider. We need to +// first remove the instance from github, and then from our database. +func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error { + for _, runner := range runners { + status := runner.GetStatus() + if status != "offline" { + // Runner is online. Ignore it. + continue + } + + removeRunner := false + poolID, err := r.poolIDFromLabels(runner.Labels) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "finding pool") + } + // not a runner we manage + continue + } + + pool, err := r.helper.GetPoolByID(poolID) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "fetching pool") + } + // not pool we manage. + continue + } + + dbInstance, err := r.store.GetPoolInstanceByName(r.ctx, poolID, *runner.Name) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "fetching instance from DB") + } + // We no longer have a DB entry for this instance. Previous forceful + // removal may have failed? + removeRunner = true + } else { + if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete { + // already marked for deleting. Let consolidate take care of it. + continue + } + // check if the provider still has the instance. + provider, ok := r.providers[pool.ProviderName] + if !ok { + return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) + } + + if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstanceRunning { + // instance is running, but github reports runner as offline. Log the event. + // This scenario requires manual intervention. + // Perhaps it just came online and github did not yet change it's status? + log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) + continue + } + //start the instance + if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil { + return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID) + } + // we started the instance. Give it a chance to come online + continue + } + + if removeRunner { + if err := r.helper.RemoveGithubRunner(*runner.ID); err != nil { + return errors.Wrap(err, "removing runner") + } + } + } + return nil +} + +func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error { + pool, err := r.helper.GetPoolByID(instance.PoolID) + if err != nil { + return errors.Wrap(err, "fetching pool") + } + + provider, ok := r.providers[pool.ProviderName] + if !ok { + return runnerErrors.NewNotFoundError("invalid provider ID") + } + + if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil { + return errors.Wrap(err, "removing instance") + } + + if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.Name); err != nil { + return errors.Wrap(err, "deleting instance from database") + } + return nil +} + +func (r *basePool) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) { + for _, lbl := range labels { + if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) { + labelName := *lbl.Name + return labelName[len(poolIDLabelprefix):], nil + } + } + return "", runnerErrors.ErrNotFound +} + +func (r *basePool) deletePendingInstances() { + instances, err := r.helper.FetchDbInstances() + if err != nil { + log.Printf("failed to fetch instances from store: %s", err) + return + } + + for _, instance := range instances { + if instance.Status != providerCommon.InstancePendingDelete { + // not in pending_delete status. Skip. + continue + } + + if err := r.deleteInstanceFromProvider(instance); err != nil { + log.Printf("failed to delete instance from provider: %+v", err) + } + } +} + +func (r *basePool) addPendingInstances() { + // TODO: filter instances by status. + instances, err := r.helper.FetchDbInstances() + if err != nil { + log.Printf("failed to fetch instances from store: %s", err) + return + } + + for _, instance := range instances { + if instance.Status != providerCommon.InstancePendingCreate { + // not in pending_create status. Skip. + continue + } + // asJs, _ := json.MarshalIndent(instance, "", " ") + // log.Printf(">>> %s", string(asJs)) + if err := r.addInstanceToProvider(instance); err != nil { + log.Printf("failed to create instance in provider: %s", err) + } + } +} + +func (r *basePool) consolidate() { + r.mux.Lock() + defer r.mux.Unlock() + + r.deletePendingInstances() + r.addPendingInstances() + r.ensureMinIdleRunners() +} + +func (r *basePool) Wait() error { + select { + case <-r.done: + case <-time.After(20 * time.Second): + return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop") + } + return nil +} + +func (r *basePool) Start() error { + tools, err := r.helper.FetchTools() + if err != nil { + return errors.Wrap(err, "initializing tools") + } + r.mux.Lock() + r.tools = tools + r.mux.Unlock() + + runners, err := r.helper.GetGithubRunners() + if err != nil { + return errors.Wrap(err, "fetching github runners") + } + if err := r.cleanupOrphanedProviderRunners(runners); err != nil { + return errors.Wrap(err, "cleaning orphaned instances") + } + + if err := r.cleanupOrphanedGithubRunners(runners); err != nil { + return errors.Wrap(err, "cleaning orphaned github runners") + } + go r.loop() + return nil +} + +func (r *basePool) Stop() error { + close(r.quit) + return nil +} + +func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error { + return r.helper.UpdateState(param) +} + +func (r *basePool) WebhookSecret() string { + return r.helper.WebhookSecret() +} diff --git a/runner/pool/interfaces.go b/runner/pool/interfaces.go new file mode 100644 index 00000000..1304e8f8 --- /dev/null +++ b/runner/pool/interfaces.go @@ -0,0 +1,26 @@ +package pool + +import ( + "garm/params" + + "github.com/google/go-github/v43/github" +) + +type poolHelper interface { + GetGithubToken() string + GetGithubRunners() ([]*github.Runner, error) + FetchTools() ([]*github.RunnerApplicationDownload, error) + FetchDbInstances() ([]params.Instance, error) + RemoveGithubRunner(runnerID int64) error + ListPools() ([]params.Pool, error) + GithubURL() string + JwtToken() string + GetGithubRegistrationToken() (string, error) + String() string + GetCallbackURL() string + FindPoolByTags(labels []string) (params.Pool, error) + GetPoolByID(poolID string) (params.Pool, error) + ValidateOwner(job params.WorkflowJob) error + UpdateState(param params.UpdatePoolStateParams) error + WebhookSecret() string +} diff --git a/runner/pool/organization.go b/runner/pool/organization.go new file mode 100644 index 00000000..c7b1a460 --- /dev/null +++ b/runner/pool/organization.go @@ -0,0 +1,164 @@ +package pool + +import ( + "context" + "fmt" + "sync" + + "garm/config" + dbCommon "garm/database/common" + runnerErrors "garm/errors" + "garm/params" + "garm/runner/common" + "garm/util" + + "github.com/google/go-github/v43/github" + "github.com/pkg/errors" +) + +// test that we implement PoolManager +var _ poolHelper = &organization{} + +func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token) + if err != nil { + return nil, errors.Wrap(err, "getting github client") + } + + helper := &organization{ + cfg: cfg, + ctx: ctx, + ghcli: ghc, + id: cfg.ID, + store: store, + } + + repo := &basePool{ + ctx: ctx, + store: store, + providers: providers, + controllerID: cfg.Internal.ControllerID, + quit: make(chan struct{}), + done: make(chan struct{}), + helper: helper, + } + return repo, nil +} + +type organization struct { + cfg params.Organization + ctx context.Context + ghcli *github.Client + id string + store dbCommon.Store + + mux sync.Mutex +} + +func (r *organization) UpdateState(param params.UpdatePoolStateParams) error { + r.mux.Lock() + defer r.mux.Unlock() + + r.cfg.WebhookSecret = param.WebhookSecret + r.cfg.Internal = param.Internal + + ghc, err := util.GithubClient(r.ctx, r.GetGithubToken()) + if err != nil { + return errors.Wrap(err, "getting github client") + } + r.ghcli = ghc + return nil +} + +func (r *organization) GetGithubToken() string { + return r.cfg.Internal.OAuth2Token +} + +func (r *organization) GetGithubRunners() ([]*github.Runner, error) { + runners, _, err := r.ghcli.Actions.ListOrganizationRunners(r.ctx, r.cfg.Name, nil) + if err != nil { + return nil, errors.Wrap(err, "fetching runners") + } + + return runners.Runners, nil +} + +func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error) { + r.mux.Lock() + defer r.mux.Unlock() + tools, _, err := r.ghcli.Actions.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name) + if err != nil { + return nil, errors.Wrap(err, "fetching runner tools") + } + + return tools, nil +} + +func (r *organization) FetchDbInstances() ([]params.Instance, error) { + return r.store.ListRepoInstances(r.ctx, r.id) +} + +func (r *organization) RemoveGithubRunner(runnerID int64) error { + _, err := r.ghcli.Actions.RemoveOrganizationRunner(r.ctx, r.cfg.Name, runnerID) + return errors.Wrap(err, "removing runner") +} + +func (r *organization) ListPools() ([]params.Pool, error) { + pools, err := r.store.ListRepoPools(r.ctx, r.id) + if err != nil { + return nil, errors.Wrap(err, "fetching pools") + } + return pools, nil +} + +func (r *organization) GithubURL() string { + return fmt.Sprintf("%s/%s", config.GithubBaseURL, r.cfg.Name) +} + +func (r *organization) JwtToken() string { + return r.cfg.Internal.JWTSecret +} + +func (r *organization) GetGithubRegistrationToken() (string, error) { + tk, _, err := r.ghcli.Actions.CreateOrganizationRegistrationToken(r.ctx, r.cfg.Name) + + if err != nil { + return "", errors.Wrap(err, "creating runner token") + } + return *tk.Token, nil +} + +func (r *organization) String() string { + return fmt.Sprintf("%s", r.cfg.Name) +} + +func (r *organization) WebhookSecret() string { + return r.cfg.WebhookSecret +} + +func (r *organization) GetCallbackURL() string { + return r.cfg.Internal.InstanceCallbackURL +} + +func (r *organization) FindPoolByTags(labels []string) (params.Pool, error) { + pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, labels) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching suitable pool") + } + return pool, nil +} + +func (r *organization) GetPoolByID(poolID string) (params.Pool, error) { + pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return pool, nil +} + +func (r *organization) ValidateOwner(job params.WorkflowJob) error { + if job.Organization.Login != r.cfg.Name { + return runnerErrors.NewBadRequestError("job not meant for this pool manager") + } + return nil +} diff --git a/runner/pool/repository.go b/runner/pool/repository.go index fa894156..ad4f4ec3 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -3,27 +3,21 @@ package pool import ( "context" "fmt" - "log" - "strings" "sync" - "time" - "garm/auth" "garm/config" dbCommon "garm/database/common" runnerErrors "garm/errors" "garm/params" "garm/runner/common" - providerCommon "garm/runner/providers/common" "garm/util" "github.com/google/go-github/v43/github" - "github.com/google/uuid" "github.com/pkg/errors" ) // test that we implement PoolManager -var _ common.PoolManager = &Repository{} +var _ poolHelper = &repository{} func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token) @@ -31,42 +25,46 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid return nil, errors.Wrap(err, "getting github client") } - repo := &Repository{ + helper := &repository{ + cfg: cfg, + ctx: ctx, + ghcli: ghc, + id: cfg.ID, + store: store, + } + + repo := &basePool{ ctx: ctx, - cfg: cfg, - ghcli: ghc, - id: cfg.ID, store: store, providers: providers, controllerID: cfg.Internal.ControllerID, quit: make(chan struct{}), done: make(chan struct{}), + helper: helper, } - return repo, nil } -type Repository struct { - ctx context.Context - controllerID string - cfg params.Repository - store dbCommon.Store - ghcli *github.Client - providers map[string]common.Provider - tools []*github.RunnerApplicationDownload - quit chan struct{} - done chan struct{} - id string +var _ poolHelper = &repository{} + +type repository struct { + cfg params.Repository + ctx context.Context + ghcli *github.Client + id string + store dbCommon.Store mux sync.Mutex } -func (r *Repository) RefreshState(cfg params.Repository) error { +func (r *repository) UpdateState(param params.UpdatePoolStateParams) error { r.mux.Lock() defer r.mux.Unlock() - r.cfg = cfg - ghc, err := util.GithubClient(r.ctx, r.cfg.Internal.OAuth2Token) + r.cfg.WebhookSecret = param.WebhookSecret + r.cfg.Internal = param.Internal + + ghc, err := util.GithubClient(r.ctx, r.GetGithubToken()) if err != nil { return errors.Wrap(err, "getting github client") } @@ -74,7 +72,11 @@ func (r *Repository) RefreshState(cfg params.Repository) error { return nil } -func (r *Repository) getGithubRunners() ([]*github.Runner, error) { +func (r *repository) GetGithubToken() string { + return r.cfg.Internal.OAuth2Token +} + +func (r *repository) GetGithubRunners() ([]*github.Runner, error) { runners, _, err := r.ghcli.Actions.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, nil) if err != nil { return nil, errors.Wrap(err, "fetching runners") @@ -83,580 +85,82 @@ func (r *Repository) getGithubRunners() ([]*github.Runner, error) { return runners.Runners, nil } -func (r *Repository) getProviderInstances() ([]params.Instance, error) { - return nil, nil -} - -func (r *Repository) Start() error { - if err := r.fetchTools(); err != nil { - return errors.Wrap(err, "initializing tools") - } - - runners, err := r.getGithubRunners() - if err != nil { - return errors.Wrap(err, "fetching github runners") - } - if err := r.cleanupOrphanedProviderRunners(runners); err != nil { - return errors.Wrap(err, "cleaning orphaned instances") - } - - if err := r.cleanupOrphanedGithubRunners(runners); err != nil { - return errors.Wrap(err, "cleaning orphaned github runners") - } - go r.loop() - return nil -} - -func (r *Repository) Stop() error { - close(r.quit) - return nil -} - -func (r *Repository) fetchTools() error { +func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) { r.mux.Lock() defer r.mux.Unlock() tools, _, err := r.ghcli.Actions.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name) if err != nil { - return errors.Wrap(err, "fetching runner tools") + return nil, errors.Wrap(err, "fetching runner tools") } - r.tools = tools - return nil + + return tools, nil } -func (r *Repository) Wait() error { - select { - case <-r.done: - case <-time.After(20 * time.Second): - return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop") - } - return nil +func (r *repository) FetchDbInstances() ([]params.Instance, error) { + return r.store.ListRepoInstances(r.ctx, r.id) } -func (r *Repository) consolidate() { - r.mux.Lock() - defer r.mux.Unlock() - - r.deletePendingInstances() - r.addPendingInstances() - r.ensureMinIdleRunners() +func (r *repository) RemoveGithubRunner(runnerID int64) error { + _, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, runnerID) + return errors.Wrap(err, "removing runner") } -func (r *Repository) addPendingInstances() { - // TODO: filter instances by status. - instances, err := r.store.ListRepoInstances(r.ctx, r.id) - if err != nil { - log.Printf("failed to fetch instances from store: %s", err) - return - } - - for _, instance := range instances { - if instance.Status != providerCommon.InstancePendingCreate { - // not in pending_create status. Skip. - continue - } - // asJs, _ := json.MarshalIndent(instance, "", " ") - // log.Printf(">>> %s", string(asJs)) - if err := r.addInstanceToProvider(instance); err != nil { - log.Printf("failed to create instance in provider: %s", err) - } - } -} - -func (r *Repository) deletePendingInstances() { - instances, err := r.store.ListRepoInstances(r.ctx, r.id) - if err != nil { - log.Printf("failed to fetch instances from store: %s", err) - return - } - - for _, instance := range instances { - if instance.Status != providerCommon.InstancePendingDelete { - // not in pending_delete status. Skip. - continue - } - - if err := r.deleteInstanceFromProvider(instance); err != nil { - log.Printf("failed to delete instance from provider: %+v", err) - } - } -} - -func (r *Repository) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) { - for _, lbl := range labels { - if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) { - labelName := *lbl.Name - return labelName[len(poolIDLabelprefix):], nil - } - } - return "", runnerErrors.ErrNotFound -} - -// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear -// as offline and for which we no longer have a local instance. -// This may happen if someone manually deletes the instance in the provider. We need to -// first remove the instance from github, and then from our database. -func (r *Repository) cleanupOrphanedGithubRunners(runners []*github.Runner) error { - for _, runner := range runners { - status := runner.GetStatus() - if status != "offline" { - // Runner is online. Ignore it. - continue - } - - removeRunner := false - poolID, err := r.poolIDFromLabels(runner.Labels) - if err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - return errors.Wrap(err, "finding pool") - } - // not a runner we manage - continue - } - - pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID) - if err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - return errors.Wrap(err, "fetching pool") - } - // not pool we manage. - continue - } - - dbInstance, err := r.store.GetPoolInstanceByName(r.ctx, poolID, *runner.Name) - if err != nil { - if !errors.Is(err, runnerErrors.ErrNotFound) { - return errors.Wrap(err, "fetching instance from DB") - } - // We no longer have a DB entry for this instance. Previous forceful - // removal may have failed? - removeRunner = true - } else { - if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete { - // already marked for deleting. Let consolidate take care of it. - continue - } - // check if the provider still has the instance. - provider, ok := r.providers[pool.ProviderName] - if !ok { - return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) - } - - if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstanceRunning { - // instance is running, but github reports runner as offline. Log the event. - // This scenario requires manual intervention. - // Perhaps it just came online and github did not yet change it's status? - log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name) - continue - } - //start the instance - if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil { - return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID) - } - // we started the instance. Give it a chance to come online - continue - } - - if removeRunner { - if _, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, *runner.ID); err != nil { - return errors.Wrap(err, "removing runner") - } - } - } - return nil -} - -// cleanupOrphanedProviderRunners compares runners in github with local runners and removes -// any local runners that are not present in Github. Runners that are "idle" in our -// provider, but do not exist in github, will be removed. This can happen if the -// garm was offline while a job was executed by a github action. When this -// happens, github will remove the ephemeral worker and send a webhook our way. -// If we were offline and did not process the webhook, the instance will linger. -// We need to remove it from the provider and database. -func (r *Repository) cleanupOrphanedProviderRunners(runners []*github.Runner) error { - // runners, err := r.getGithubRunners() - // if err != nil { - // return errors.Wrap(err, "fetching github runners") - // } - - dbInstances, err := r.store.ListRepoInstances(r.ctx, r.id) - if err != nil { - return errors.Wrap(err, "fetching instances from db") - } - - runnerNames := map[string]bool{} - for _, run := range runners { - runnerNames[*run.Name] = true - } - - for _, instance := range dbInstances { - if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete { - // this instance is in the process of being created or is awaiting deletion. - // Instances in pending_Create did not get a chance to register themselves in, - // github so we let them be for now. - continue - } - if ok := runnerNames[instance.Name]; !ok { - // Set pending_delete on DB field. Allow consolidate() to remove it. - _, err = r.store.UpdateInstance(r.ctx, instance.ID, params.UpdateInstanceParams{}) - if err != nil { - return errors.Wrap(err, "syncing local state with github") - } - } - } - return nil -} - -func (r *Repository) ensureMinIdleRunners() { +func (r *repository) ListPools() ([]params.Pool, error) { pools, err := r.store.ListRepoPools(r.ctx, r.id) if err != nil { - log.Printf("error listing pools: %s", err) - return - } - for _, pool := range pools { - if !pool.Enabled { - log.Printf("pool %s is disabled, skipping", pool.ID) - continue - } - existingInstances, err := r.store.ListInstances(r.ctx, pool.ID) - if err != nil { - log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err) - return - } - - // asJs, _ := json.MarshalIndent(existingInstances, "", " ") - // log.Printf(">>> %s", string(asJs)) - if uint(len(existingInstances)) >= pool.MaxRunners { - log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID) - continue - } - - idleOrPendingWorkers := []params.Instance{} - for _, inst := range existingInstances { - if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive { - idleOrPendingWorkers = append(idleOrPendingWorkers, inst) - } - } - - var required int - if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) { - // get the needed delta. - required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers) - - projectedInstanceCount := len(existingInstances) + required - if uint(projectedInstanceCount) > pool.MaxRunners { - // ensure we don't go above max workers - delta := projectedInstanceCount - int(pool.MaxRunners) - required = required - delta - } - } - - for i := 0; i < required; i++ { - log.Printf("addind new idle worker to pool %s", pool.ID) - if err := r.AddRunner(r.ctx, pool.ID); err != nil { - log.Printf("failed to add new instance for pool %s: %s", pool.ID, err) - } - } + return nil, errors.Wrap(err, "fetching pools") } + return pools, nil } -func (r *Repository) githubURL() string { +func (r *repository) GithubURL() string { return fmt.Sprintf("%s/%s/%s", config.GithubBaseURL, r.cfg.Owner, r.cfg.Name) } -func (r *Repository) poolLabel(poolID string) string { - return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID) +func (r *repository) JwtToken() string { + return r.cfg.Internal.JWTSecret } -func (r *Repository) controllerLabel() string { - return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID) -} - -func (r *Repository) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { - return params.UpdateInstanceParams{ - ProviderID: providerInstance.ProviderID, - OSName: providerInstance.OSName, - OSVersion: providerInstance.OSVersion, - Addresses: providerInstance.Addresses, - Status: providerInstance.Status, - RunnerStatus: providerInstance.RunnerStatus, - } -} - -func (r *Repository) deleteInstanceFromProvider(instance params.Instance) error { - pool, err := r.store.GetRepositoryPool(r.ctx, r.id, instance.PoolID) - if err != nil { - return errors.Wrap(err, "fetching pool") - } - - provider, ok := r.providers[pool.ProviderName] - if !ok { - return runnerErrors.NewNotFoundError("invalid provider ID") - } - - if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil { - return errors.Wrap(err, "removing instance") - } - - if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.Name); err != nil { - return errors.Wrap(err, "deleting instance from database") - } - return nil -} - -func (r *Repository) addInstanceToProvider(instance params.Instance) error { - pool, err := r.store.GetRepositoryPool(r.ctx, r.id, instance.PoolID) - if err != nil { - return errors.Wrap(err, "fetching pool") - } - - provider, ok := r.providers[pool.ProviderName] - if !ok { - return runnerErrors.NewNotFoundError("invalid provider ID") - } - - labels := []string{} - for _, tag := range pool.Tags { - labels = append(labels, tag.Name) - } - labels = append(labels, r.controllerLabel()) - labels = append(labels, r.poolLabel(pool.ID)) - +func (r *repository) GetGithubRegistrationToken() (string, error) { tk, _, err := r.ghcli.Actions.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name) if err != nil { - return errors.Wrap(err, "creating runner token") + return "", errors.Wrap(err, "creating runner token") } - - entity := fmt.Sprintf("%s/%s", r.cfg.Owner, r.cfg.Name) - jwtToken, err := auth.NewInstanceJWTToken(instance, r.cfg.Internal.JWTSecret, entity, common.RepositoryPool) - if err != nil { - return errors.Wrap(err, "fetching instance jwt token") - } - - bootstrapArgs := params.BootstrapInstance{ - Name: instance.Name, - Tools: r.tools, - RepoURL: r.githubURL(), - GithubRunnerAccessToken: *tk.Token, - CallbackURL: instance.CallbackURL, - InstanceToken: jwtToken, - OSArch: pool.OSArch, - Flavor: pool.Flavor, - Image: pool.Image, - Labels: labels, - } - - providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs) - if err != nil { - return errors.Wrap(err, "creating instance") - } - - updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance) - if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil { - return errors.Wrap(err, "updating instance") - } - return nil + return *tk.Token, nil } -func (r *Repository) AddRunner(ctx context.Context, poolID string) error { - pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID) - if err != nil { - return errors.Wrap(err, "fetching pool") - } - - name := fmt.Sprintf("garm-%s", uuid.New()) - - createParams := params.CreateInstanceParams{ - Name: name, - Pool: poolID, - Status: providerCommon.InstancePendingCreate, - RunnerStatus: providerCommon.RunnerPending, - OSArch: pool.OSArch, - OSType: pool.OSType, - CallbackURL: r.cfg.Internal.InstanceCallbackURL, - } - - instance, err := r.store.CreateInstance(r.ctx, poolID, createParams) - if err != nil { - return errors.Wrap(err, "creating instance") - } - - updateParams := params.UpdateInstanceParams{ - OSName: instance.OSName, - OSVersion: instance.OSVersion, - Addresses: instance.Addresses, - Status: instance.Status, - ProviderID: instance.ProviderID, - } - - if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil { - return errors.Wrap(err, "updating runner state") - } - - return nil +func (r *repository) String() string { + return fmt.Sprintf("%s/%s", r.cfg.Owner, r.cfg.Name) } -func (r *Repository) loop() { - defer func() { - log.Printf("repository %s/%s loop exited", r.cfg.Owner, r.cfg.Name) - close(r.done) - }() - log.Printf("starting loop for %s/%s", r.cfg.Owner, r.cfg.Name) - // TODO: Consolidate runners on loop start. Provider runners must match runners - // in github and DB. When a Workflow job is received, we will first create/update - // an entity in the database, before sending the request to the provider to create/delete - // an instance. If a "queued" job is received, we create an entity in the db with - // a state of "pending_create". Once that instance is up and calls home, it is marked - // as "active". If a "completed" job is received from github, we mark the instance - // as "pending_delete". Once the provider deletes the instance, we mark it as "deleted" - // in the database. - // We also ensure we have runners created based on pool characteristics. This is where - // we spin up "MinWorkers" for each runner type. - - for { - select { - case <-time.After(5 * time.Second): - // consolidate. - r.consolidate() - case <-time.After(3 * time.Hour): - // Update tools cache. - if err := r.fetchTools(); err != nil { - log.Printf("failed to update tools for repo %s/%s: %s", r.cfg.Owner, r.cfg.Name, err) - } - case <-r.ctx.Done(): - // daemon is shutting down. - return - case <-r.quit: - // this worker was stopped. - return - } - } -} - -func (r *Repository) WebhookSecret() string { +func (r *repository) WebhookSecret() string { return r.cfg.WebhookSecret } -func (r *Repository) poolIDFromStringLabels(labels []string) (string, error) { - for _, lbl := range labels { - if strings.HasPrefix(lbl, poolIDLabelprefix) { - return lbl[len(poolIDLabelprefix):], nil - } - } - return "", runnerErrors.ErrNotFound +func (r *repository) GetCallbackURL() string { + return r.cfg.Internal.InstanceCallbackURL } -func (r *Repository) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) { - // asJs, _ := json.MarshalIndent(job, "", " ") - // log.Printf(">>> Job data: %s", string(asJs)) - runnerName := job.WorkflowJob.RunnerName - runner, err := r.store.GetInstanceByName(r.ctx, runnerName) +func (r *repository) FindPoolByTags(labels []string) (params.Pool, error) { + pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, labels) if err != nil { - return params.Instance{}, errors.Wrap(err, "fetching instance") + return params.Pool{}, errors.Wrap(err, "fetching suitable pool") } - - return runner, nil + return pool, nil } -func (r *Repository) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error { - runner, err := r.fetchInstanceFromJob(job) +func (r *repository) GetPoolByID(poolID string) (params.Pool, error) { + pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID) if err != nil { - return errors.Wrap(err, "fetching instance") + return params.Pool{}, errors.Wrap(err, "fetching pool") } - - updateParams := params.UpdateInstanceParams{ - RunnerStatus: status, - } - - log.Printf("setting runner status for %s to %s", runner.Name, status) - if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil { - return errors.Wrap(err, "updating runner state") - } - return nil + return pool, nil } -func (r *Repository) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error { - runner, err := r.fetchInstanceFromJob(job) - if err != nil { - return errors.Wrap(err, "fetching instance") - } - - updateParams := params.UpdateInstanceParams{ - Status: status, - } - - if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil { - return errors.Wrap(err, "updating runner state") - } - return nil -} - -func (r *Repository) acquireNewInstance(job params.WorkflowJob) error { - requestedLabels := job.WorkflowJob.Labels - if len(requestedLabels) == 0 { - // no labels were requested. - return nil - } - - pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, requestedLabels) - if err != nil { - return errors.Wrap(err, "fetching suitable pool") - } - log.Printf("adding new runner with requested tags %s in pool %s", strings.Join(job.WorkflowJob.Labels, ", "), pool.ID) - - if !pool.Enabled { - log.Printf("selected pool (%s) is disabled", pool.ID) - return nil - } - - // TODO: implement count - poolInstances, err := r.store.ListInstances(r.ctx, pool.ID) - if err != nil { - return errors.Wrap(err, "fetching instances") - } - - if len(poolInstances) >= int(pool.MaxRunners) { - log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID) - return nil - } - - if err := r.AddRunner(r.ctx, pool.ID); err != nil { - log.Printf("failed to add runner to pool %s", pool.ID) - return errors.Wrap(err, "adding runner") - } - return nil -} - -func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error { +func (r *repository) ValidateOwner(job params.WorkflowJob) error { if job.Repository.Name != r.cfg.Name || job.Repository.Owner.Login != r.cfg.Owner { return runnerErrors.NewBadRequestError("job not meant for this pool manager") } - - switch job.Action { - case "queued": - // Create instance in database and set it to pending create. - if err := r.acquireNewInstance(job); err != nil { - log.Printf("failed to add instance") - } - case "completed": - // Set instance in database to pending delete. - if job.WorkflowJob.RunnerName == "" { - // Unassigned jobs will have an empty runner_name. - // There is nothing to to in this case. - log.Printf("no runner was assigned. Skipping.") - return nil - } - log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName) - if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil { - log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) - return errors.Wrap(err, "updating runner") - } - case "in_progress": - // update instance workload state. Set job_id in instance state. - if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil { - log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName) - return errors.Wrap(err, "updating runner") - } - } return nil } diff --git a/runner/repositories.go b/runner/repositories.go index 463a2339..0007f296 100644 --- a/runner/repositories.go +++ b/runner/repositories.go @@ -2,13 +2,12 @@ package runner import ( "context" - "log" "garm/auth" runnerErrors "garm/errors" "garm/params" "garm/runner/common" "garm/runner/pool" - "garm/util" + "log" "strings" "github.com/pkg/errors" @@ -150,9 +149,13 @@ func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param para if err != nil { return params.Repository{}, errors.Wrap(err, "fetching internal config") } + newState := params.UpdatePoolStateParams{ + WebhookSecret: repo.WebhookSecret, + Internal: internalCfg, + } repo.Internal = internalCfg // stop the pool mgr - if err := poolMgr.RefreshState(repo); err != nil { + if err := poolMgr.RefreshState(newState); err != nil { return params.Repository{}, errors.Wrap(err, "updating pool manager") } } else { @@ -179,49 +182,12 @@ func (r *Runner) CreateRepoPool(ctx context.Context, repoID string, param params return params.Pool{}, runnerErrors.ErrNotFound } - if err := param.Validate(); err != nil { - return params.Pool{}, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", err) - } - - if !IsSupportedOSType(param.OSType) { - return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType) - } - - if !IsSupportedArch(param.OSArch) { - return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch) - } - - _, ok = r.providers[param.ProviderName] - if !ok { - return params.Pool{}, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName) - } - - // github automatically adds the "self-hosted" tag as well as the OS type (linux, windows, etc) - // and architecture (arm, x64, etc) to all self hosted runners. When a workflow job comes in, we try - // to find a pool based on the labels that are set in the workflow. If we don't explicitly define these - // default tags for each pool, and the user targets these labels, we won't be able to match any pools. - // The downside is that all pools with the same OS and arch will have these default labels. Users should - // set distinct and unique labels on each pool, and explicitly target those labels, or risk assigning - // the job to the wrong worker type. - ghArch, err := util.ResolveToGithubArch(string(param.OSArch)) + createPoolParams, err := r.appendTagsToCreatePoolParams(param) if err != nil { - return params.Pool{}, errors.Wrap(err, "invalid arch") + return params.Pool{}, errors.Wrap(err, "fetching pool params") } - osType, err := util.ResolveToGithubOSType(string(param.OSType)) - if err != nil { - return params.Pool{}, errors.Wrap(err, "invalid os type") - } - - extraLabels := []string{ - "self-hosted", - ghArch, - osType, - } - - param.Tags = append(param.Tags, extraLabels...) - - pool, err := r.store.CreateRepositoryPool(ctx, repoID, param) + pool, err := r.store.CreateRepositoryPool(ctx, repoID, createPoolParams) if err != nil { return params.Pool{}, errors.Wrap(err, "creating pool") } @@ -295,34 +261,6 @@ func (r *Runner) ListPoolInstances(ctx context.Context, poolID string) ([]params return instances, nil } -func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) { - cfg, err := r.getInternalConfig(repo.CredentialsName) - if err != nil { - return nil, errors.Wrap(err, "fetching internal config") - } - repo.Internal = cfg - poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store) - if err != nil { - return nil, errors.Wrap(err, "creating pool manager") - } - return poolManager, nil -} - -func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) { - r.mux.Lock() - defer r.mux.Unlock() - - repo, err := r.store.GetRepository(r.ctx, owner, name) - if err != nil { - return nil, errors.Wrap(err, "fetching repo") - } - - if repo, ok := r.repositories[repo.ID]; ok { - return repo, nil - } - return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name) -} - func (r *Runner) UpdateRepoPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { if !auth.IsAdmin(ctx) { return params.Pool{}, runnerErrors.ErrUnauthorized @@ -366,40 +304,30 @@ func (r *Runner) ListRepoInstances(ctx context.Context, repoID string) ([]params return instances, nil } -// TODO: move these in another file - -func (r *Runner) GetInstance(ctx context.Context, instanceName string) (params.Instance, error) { - if !auth.IsAdmin(ctx) { - return params.Instance{}, runnerErrors.ErrUnauthorized - } - - instance, err := r.store.GetInstanceByName(ctx, instanceName) +func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) { + cfg, err := r.getInternalConfig(repo.CredentialsName) if err != nil { - return params.Instance{}, errors.Wrap(err, "fetching instance") + return nil, errors.Wrap(err, "fetching internal config") } - return instance, nil + repo.Internal = cfg + poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store) + if err != nil { + return nil, errors.Wrap(err, "creating pool manager") + } + return poolManager, nil } -func (r *Runner) AddInstanceStatusMessage(ctx context.Context, param params.InstanceUpdateMessage) error { - instanceID := auth.InstanceID(ctx) - if instanceID == "" { - return runnerErrors.ErrUnauthorized +func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) { + r.mux.Lock() + defer r.mux.Unlock() + + repo, err := r.store.GetRepository(r.ctx, owner, name) + if err != nil { + return nil, errors.Wrap(err, "fetching repo") } - if err := r.store.AddInstanceStatusMessage(ctx, instanceID, param.Message); err != nil { - return errors.Wrap(err, "adding status update") + if repo, ok := r.repositories[repo.ID]; ok { + return repo, nil } - - // if param.Status == providerCommon.RunnerIdle { - // } - - updateParams := params.UpdateInstanceParams{ - RunnerStatus: param.Status, - } - - if _, err := r.store.UpdateInstance(r.ctx, instanceID, updateParams); err != nil { - return errors.Wrap(err, "updating runner state") - } - - return nil + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name) } diff --git a/runner/runner.go b/runner/runner.go index 295d4a19..87152aa6 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -15,6 +15,7 @@ import ( "strings" "sync" + "garm/auth" "garm/config" "garm/database" dbCommon "garm/database/common" @@ -101,8 +102,6 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) { ret := []params.Provider{} for _, val := range r.providers { - params := val.AsParams() - log.Printf(">>>>> %s", params.Name) ret = append(ret, val.AsParams()) } return ret, nil @@ -140,6 +139,20 @@ func (r *Runner) loadReposAndOrgs() error { r.repositories[repo.ID] = poolManager } + orgs, err := r.store.ListOrganizations(r.ctx) + if err != nil { + return errors.Wrap(err, "fetching repositories") + } + + for _, org := range orgs { + log.Printf("creating pool manager for organization %s", org.Name) + poolManager, err := r.loadOrgPoolManager(org) + if err != nil { + return errors.Wrap(err, "loading repo pool manager") + } + r.organizations[org.ID] = poolManager + } + return nil } @@ -208,21 +221,6 @@ func (r *Runner) Wait() error { return nil } -func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) { - r.mux.Lock() - defer r.mux.Unlock() - - org, err := r.store.GetOrganization(r.ctx, name) - if err != nil { - return nil, errors.Wrap(err, "fetching repo") - } - - if orgPoolMgr, ok := r.organizations[org.ID]; ok { - return orgPoolMgr, nil - } - return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name) -} - func (r *Runner) validateHookBody(signature, secret string, body []byte) error { if secret == "" { // A secret was not set. Skip validation of body. @@ -382,3 +380,85 @@ func (r *Runner) ensureSSHKeys() error { return nil } + +func (r *Runner) appendTagsToCreatePoolParams(param params.CreatePoolParams) (params.CreatePoolParams, error) { + if err := param.Validate(); err != nil { + return params.CreatePoolParams{}, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", err) + } + + if !IsSupportedOSType(param.OSType) { + return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType) + } + + if !IsSupportedArch(param.OSArch) { + return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch) + } + + _, ok := r.providers[param.ProviderName] + if !ok { + return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName) + } + + // github automatically adds the "self-hosted" tag as well as the OS type (linux, windows, etc) + // and architecture (arm, x64, etc) to all self hosted runners. When a workflow job comes in, we try + // to find a pool based on the labels that are set in the workflow. If we don't explicitly define these + // default tags for each pool, and the user targets these labels, we won't be able to match any pools. + // The downside is that all pools with the same OS and arch will have these default labels. Users should + // set distinct and unique labels on each pool, and explicitly target those labels, or risk assigning + // the job to the wrong worker type. + ghArch, err := util.ResolveToGithubArch(string(param.OSArch)) + if err != nil { + return params.CreatePoolParams{}, errors.Wrap(err, "invalid arch") + } + + osType, err := util.ResolveToGithubOSType(string(param.OSType)) + if err != nil { + return params.CreatePoolParams{}, errors.Wrap(err, "invalid os type") + } + + extraLabels := []string{ + "self-hosted", + ghArch, + osType, + } + + param.Tags = append(param.Tags, extraLabels...) + + return param, nil +} + +func (r *Runner) GetInstance(ctx context.Context, instanceName string) (params.Instance, error) { + if !auth.IsAdmin(ctx) { + return params.Instance{}, runnerErrors.ErrUnauthorized + } + + instance, err := r.store.GetInstanceByName(ctx, instanceName) + if err != nil { + return params.Instance{}, errors.Wrap(err, "fetching instance") + } + return instance, nil +} + +func (r *Runner) AddInstanceStatusMessage(ctx context.Context, param params.InstanceUpdateMessage) error { + instanceID := auth.InstanceID(ctx) + if instanceID == "" { + return runnerErrors.ErrUnauthorized + } + + if err := r.store.AddInstanceStatusMessage(ctx, instanceID, param.Message); err != nil { + return errors.Wrap(err, "adding status update") + } + + // if param.Status == providerCommon.RunnerIdle { + // } + + updateParams := params.UpdateInstanceParams{ + RunnerStatus: param.Status, + } + + if _, err := r.store.UpdateInstance(r.ctx, instanceID, updateParams); err != nil { + return errors.Wrap(err, "updating runner state") + } + + return nil +}