Add organizations

This commit is contained in:
Gabriel Adrian Samfira 2022-05-04 16:27:24 +00:00
parent 3e416d8272
commit 095b43ffb4
19 changed files with 2395 additions and 1240 deletions

View file

@ -12,7 +12,6 @@ import (
runnerParams "garm/params"
"garm/runner"
"github.com/gorilla/mux"
"github.com/pkg/errors"
)
@ -194,350 +193,3 @@ func (a *APIController) ListProviders(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(providers)
}
func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var repoData runnerParams.CreateRepoParams
if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
repo, err := a.r.CreateRepository(ctx, repoData)
if err != nil {
log.Printf("error creating repository: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
repos, err := a.r.ListRepositories(ctx)
if err != nil {
log.Printf("listing repos: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repos)
}
func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
repo, err := a.r.GetRepositoryByID(ctx, repoID)
if err != nil {
log.Printf("fetching repo: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
if err := a.r.DeleteRepository(ctx, repoID); err != nil {
log.Printf("fetching repo: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
var updatePayload runnerParams.UpdateRepositoryParams
if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload)
if err != nil {
log.Printf("error updating repository: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) CreateRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
var poolData runnerParams.CreatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.CreateRepoPool(ctx, repoID, poolData)
if err != nil {
log.Printf("error creating repository pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListRepoPoolsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
pools, err := a.r.ListRepoPools(ctx, repoID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pools)
}
func (a *APIController) GetRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
pool, err := a.r.GetRepoPoolByID(ctx, repoID, poolID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) DeleteRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
if err := a.r.DeleteRepoPool(ctx, repoID, poolID); err != nil {
log.Printf("removing pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
var poolData runnerParams.UpdatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.UpdateRepoPool(ctx, repoID, poolID, poolData)
if err != nil {
log.Printf("error creating repository pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListRepoInstancesHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
instances, err := a.r.ListRepoInstances(ctx, repoID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instances)
}
func (a *APIController) ListPoolInstancesHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
poolID, ok := vars["poolID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
instances, err := a.r.ListPoolInstances(ctx, poolID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instances)
}
func (a *APIController) GetInstanceHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
instanceName, ok := vars["instanceName"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
instance, err := a.r.GetInstance(ctx, instanceName)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instance)
}
func (a *APIController) InstanceStatusMessageHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var updateMessage runnerParams.InstanceUpdateMessage
if err := json.NewDecoder(r.Body).Decode(&updateMessage); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
if err := a.r.AddInstanceStatusMessage(ctx, updateMessage); err != nil {
log.Printf("error saving status message: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,81 @@
package controllers
import (
"encoding/json"
"log"
"net/http"
"garm/apiserver/params"
gErrors "garm/errors"
runnerParams "garm/params"
"github.com/gorilla/mux"
)
func (a *APIController) ListPoolInstancesHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
poolID, ok := vars["poolID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No pool ID specified",
})
return
}
instances, err := a.r.ListPoolInstances(ctx, poolID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instances)
}
func (a *APIController) GetInstanceHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
instanceName, ok := vars["instanceName"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No pool ID specified",
})
return
}
instance, err := a.r.GetInstance(ctx, instanceName)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instance)
}
func (a *APIController) InstanceStatusMessageHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var updateMessage runnerParams.InstanceUpdateMessage
if err := json.NewDecoder(r.Body).Decode(&updateMessage); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
if err := a.r.AddInstanceStatusMessage(ctx, updateMessage); err != nil {
log.Printf("error saving status message: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,292 @@
package controllers
import (
"encoding/json"
"log"
"net/http"
"garm/apiserver/params"
gErrors "garm/errors"
runnerParams "garm/params"
"github.com/gorilla/mux"
)
func (a *APIController) CreateOrgHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var repoData runnerParams.CreateOrgParams
if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
repo, err := a.r.CreateOrganization(ctx, repoData)
if err != nil {
log.Printf("error creating repository: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) ListOrgsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
orgs, err := a.r.ListOrganizations(ctx)
if err != nil {
log.Printf("listing orgs: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(orgs)
}
func (a *APIController) GetOrgByIDHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
org, err := a.r.GetOrganizationByID(ctx, orgID)
if err != nil {
log.Printf("fetching org: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(org)
}
func (a *APIController) DeleteOrgHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
if err := a.r.DeleteOrganization(ctx, orgID); err != nil {
log.Printf("fetching org: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateOrgHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
var updatePayload runnerParams.UpdateRepositoryParams
if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
org, err := a.r.UpdateOrganization(ctx, orgID, updatePayload)
if err != nil {
log.Printf("error updating organization: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(org)
}
func (a *APIController) CreateOrgPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
var poolData runnerParams.CreatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.CreateOrgPool(ctx, orgID, poolData)
if err != nil {
log.Printf("error creating organization pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListOrgPoolsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
pools, err := a.r.ListOrgPools(ctx, orgID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pools)
}
func (a *APIController) GetOrgPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, repoOk := vars["orgID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org or pool ID specified",
})
return
}
pool, err := a.r.GetOrgPoolByID(ctx, orgID, poolID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) DeleteOrgPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, orgOk := vars["orgID"]
poolID, poolOk := vars["poolID"]
if !orgOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org or pool ID specified",
})
return
}
if err := a.r.DeleteOrgPool(ctx, orgID, poolID); err != nil {
log.Printf("removing pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateOrgPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, orgOk := vars["orgID"]
poolID, poolOk := vars["poolID"]
if !orgOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org or pool ID specified",
})
return
}
var poolData runnerParams.UpdatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.UpdateOrgPool(ctx, orgID, poolID, poolData)
if err != nil {
log.Printf("error creating organization pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListOrgInstancesHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
orgID, ok := vars["orgID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No org ID specified",
})
return
}
instances, err := a.r.ListOrgInstances(ctx, orgID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instances)
}

View file

@ -0,0 +1,292 @@
package controllers
import (
"encoding/json"
"log"
"net/http"
"garm/apiserver/params"
gErrors "garm/errors"
runnerParams "garm/params"
"github.com/gorilla/mux"
)
func (a *APIController) CreateRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var repoData runnerParams.CreateRepoParams
if err := json.NewDecoder(r.Body).Decode(&repoData); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
repo, err := a.r.CreateRepository(ctx, repoData)
if err != nil {
log.Printf("error creating repository: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) ListReposHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
repos, err := a.r.ListRepositories(ctx)
if err != nil {
log.Printf("listing repos: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repos)
}
func (a *APIController) GetRepoByIDHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
repo, err := a.r.GetRepositoryByID(ctx, repoID)
if err != nil {
log.Printf("fetching repo: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) DeleteRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
if err := a.r.DeleteRepository(ctx, repoID); err != nil {
log.Printf("fetching repo: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateRepoHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
var updatePayload runnerParams.UpdateRepositoryParams
if err := json.NewDecoder(r.Body).Decode(&updatePayload); err != nil {
handleError(w, gErrors.ErrBadRequest)
return
}
repo, err := a.r.UpdateRepository(ctx, repoID, updatePayload)
if err != nil {
log.Printf("error updating repository: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(repo)
}
func (a *APIController) CreateRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
var poolData runnerParams.CreatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.CreateRepoPool(ctx, repoID, poolData)
if err != nil {
log.Printf("error creating repository pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListRepoPoolsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
pools, err := a.r.ListRepoPools(ctx, repoID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pools)
}
func (a *APIController) GetRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
pool, err := a.r.GetRepoPoolByID(ctx, repoID, poolID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) DeleteRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
if err := a.r.DeleteRepoPool(ctx, repoID, poolID); err != nil {
log.Printf("removing pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}
func (a *APIController) UpdateRepoPoolHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, repoOk := vars["repoID"]
poolID, poolOk := vars["poolID"]
if !repoOk || !poolOk {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo or pool ID specified",
})
return
}
var poolData runnerParams.UpdatePoolParams
if err := json.NewDecoder(r.Body).Decode(&poolData); err != nil {
log.Printf("failed to decode: %s", err)
handleError(w, gErrors.ErrBadRequest)
return
}
pool, err := a.r.UpdateRepoPool(ctx, repoID, poolID, poolData)
if err != nil {
log.Printf("error creating repository pool: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(pool)
}
func (a *APIController) ListRepoInstancesHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
repoID, ok := vars["repoID"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(params.APIErrorResponse{
Error: "Bad Request",
Details: "No repo ID specified",
})
return
}
instances, err := a.r.ListRepoInstances(ctx, repoID)
if err != nil {
log.Printf("listing pools: %s", err)
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(instances)
}

View file

@ -96,40 +96,40 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl
// Organizations and pools //
/////////////////////////////
// Get pool
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS")
// Delete pool
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS")
// Update pool
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS")
// List pools
apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS")
// Create pool
apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS")
// Repo instances list
apiRouter.Handle("/organizations/{orgID}/instances/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/instances", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/instances/", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/instances", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS")
// Get org
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS")
// Update org
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS")
// Delete org
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS")
apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS")
// List orgs
apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS")
apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS")
// Create org
apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CatchAll))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS")
apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS")
// Credentials and providers
apiRouter.Handle("/credentials/", log(os.Stdout, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS")

View file

@ -120,172 +120,10 @@ func (c *Client) ListProviders() ([]params.Provider, error) {
return providers, nil
}
func (c *Client) ListRepositories() ([]params.Repository, error) {
var repos []params.Repository
url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL)
resp, err := c.client.R().
SetResult(&repos).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return nil, errors.Wrap(decErr, "sending request")
}
return nil, fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return repos, nil
}
func (c *Client) GetInstanceByName(instanceName string) (params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/instances/%s", c.Config.BaseURL, instanceName)
func (c *Client) CreateRepository(param params.CreateRepoParams) (params.Repository, error) {
var response params.Repository
url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL)
body, err := json.Marshal(param)
if err != nil {
return params.Repository{}, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetRepository(repoID string) (params.Repository, error) {
var response params.Repository
url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID)
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteRepository(repoID string) error {
url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return nil
}
func (c *Client) CreateRepoPool(repoID string, param params.CreatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListRepoPools(repoID string) ([]params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID)
var response []params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetRepoPool(repoID, poolID string) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
var response params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteRepoPool(repoID, poolID string) error {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error performing login: %s", apiErr.Details)
}
return nil
}
func (c *Client) UpdateRepoPool(repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Put(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListRepoInstances(repoID string) ([]params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/instances", c.Config.BaseURL, repoID)
var response []params.Instance
var response params.Instance
resp, err := c.client.R().
SetResult(&response).
Get(url)
@ -315,20 +153,3 @@ func (c *Client) ListPoolInstances(poolID string) ([]params.Instance, error) {
}
return response, nil
}
func (c *Client) GetInstanceByName(instanceName string) (params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/instances/%s", c.Config.BaseURL, instanceName)
var response params.Instance
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}

View file

@ -0,0 +1,189 @@
package client
import (
"encoding/json"
"fmt"
"garm/params"
"github.com/pkg/errors"
)
func (c *Client) ListOrganizations() ([]params.Organization, error) {
var orgs []params.Organization
url := fmt.Sprintf("%s/api/v1/organizations", c.Config.BaseURL)
resp, err := c.client.R().
SetResult(&orgs).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return nil, errors.Wrap(decErr, "sending request")
}
return nil, fmt.Errorf("error fetching orgs: %s", apiErr.Details)
}
return orgs, nil
}
func (c *Client) CreateOrganization(param params.CreateOrgParams) (params.Organization, error) {
var response params.Organization
url := fmt.Sprintf("%s/api/v1/organizations", c.Config.BaseURL)
body, err := json.Marshal(param)
if err != nil {
return params.Organization{}, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetOrganization(orgID string) (params.Organization, error) {
var response params.Organization
url := fmt.Sprintf("%s/api/v1/organizations/%s", c.Config.BaseURL, orgID)
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error fetching orgs: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteOrganization(orgID string) error {
url := fmt.Sprintf("%s/api/v1/organizations/%s", c.Config.BaseURL, orgID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error fetching orgs: %s", apiErr.Details)
}
return nil
}
func (c *Client) CreateOrgPool(orgID string, param params.CreatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/organizations/%s/pools", c.Config.BaseURL, orgID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error creating org pool: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListOrgPools(orgID string) ([]params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/organizations/%s/pools", c.Config.BaseURL, orgID)
var response []params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error listing org pools: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetOrgPool(orgID, poolID string) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID)
var response params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error fetching org pool: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteOrgPool(orgID, poolID string) error {
url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error deleting org pool: %s", apiErr.Details)
}
return nil
}
func (c *Client) UpdateOrgPool(orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/organizations/%s/pools/%s", c.Config.BaseURL, orgID, poolID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Put(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error updating org pool: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListOrgInstances(orgID string) ([]params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/organizations/%s/instances", c.Config.BaseURL, orgID)
var response []params.Instance
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error listing org instances: %s", apiErr.Details)
}
return response, nil
}

View file

@ -0,0 +1,189 @@
package client
import (
"encoding/json"
"fmt"
"garm/params"
"github.com/pkg/errors"
)
func (c *Client) ListRepositories() ([]params.Repository, error) {
var repos []params.Repository
url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL)
resp, err := c.client.R().
SetResult(&repos).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return nil, errors.Wrap(decErr, "sending request")
}
return nil, fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return repos, nil
}
func (c *Client) CreateRepository(param params.CreateRepoParams) (params.Repository, error) {
var response params.Repository
url := fmt.Sprintf("%s/api/v1/repositories", c.Config.BaseURL)
body, err := json.Marshal(param)
if err != nil {
return params.Repository{}, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetRepository(repoID string) (params.Repository, error) {
var response params.Repository
url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID)
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteRepository(repoID string) error {
url := fmt.Sprintf("%s/api/v1/repositories/%s", c.Config.BaseURL, repoID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error fetching repos: %s", apiErr.Details)
}
return nil
}
func (c *Client) CreateRepoPool(repoID string, param params.CreatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Post(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListRepoPools(repoID string) ([]params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools", c.Config.BaseURL, repoID)
var response []params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) GetRepoPool(repoID, poolID string) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
var response params.Pool
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) DeleteRepoPool(repoID, poolID string) error {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
resp, err := c.client.R().
Delete(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return errors.Wrap(decErr, "sending request")
}
return fmt.Errorf("error performing login: %s", apiErr.Details)
}
return nil
}
func (c *Client) UpdateRepoPool(repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/pools/%s", c.Config.BaseURL, repoID, poolID)
var response params.Pool
body, err := json.Marshal(param)
if err != nil {
return response, err
}
resp, err := c.client.R().
SetBody(body).
SetResult(&response).
Put(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}
func (c *Client) ListRepoInstances(repoID string) ([]params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/repositories/%s/instances", c.Config.BaseURL, repoID)
var response []params.Instance
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
apiErr, decErr := c.decodeAPIError(resp.Body())
if decErr != nil {
return response, errors.Wrap(decErr, "sending request")
}
return response, fmt.Errorf("error performing login: %s", apiErr.Details)
}
return response, nil
}

Binary file not shown.

View file

@ -166,3 +166,8 @@ type Provider struct {
ProviderType config.ProviderType `json:"type"`
Description string `json:"description"`
}
type UpdatePoolStateParams struct {
WebhookSecret string
Internal Internal
}

View file

@ -35,6 +35,23 @@ func (c *CreateRepoParams) Validate() error {
return nil
}
type CreateOrgParams struct {
Name string `json:"name"`
CredentialsName string `json:"credentials_name"`
WebhookSecret string `json:"webhook_secret"`
}
func (c *CreateOrgParams) Validate() error {
if c.Name == "" {
return errors.NewBadRequestError("missing repo name")
}
if c.CredentialsName == "" {
return errors.NewBadRequestError("missing credentials name")
}
return nil
}
// NewUserParams holds the needed information to create
// a new user
type NewUserParams struct {

View file

@ -14,7 +14,7 @@ const (
type PoolManager interface {
WebhookSecret() string
HandleWorkflowJob(job params.WorkflowJob) error
RefreshState(cfg params.Repository) error
RefreshState(param params.UpdatePoolStateParams) error
// AddPool(ctx context.Context, pool params.Pool) error
// PoolManager lifecycle functions. Start/stop pool.
@ -22,16 +22,3 @@ type PoolManager interface {
Stop() error
Wait() error
}
type Pool interface {
ListInstances() ([]params.Instance, error)
GetInstance() (params.Instance, error)
DeleteInstance() error
StopInstance() error
StartInstance() error
// Pool lifecycle functions. Start/stop pool.
Start() error
Stop() error
Wait() error
}

322
runner/organizations.go Normal file
View file

@ -0,0 +1,322 @@
package runner
import (
"context"
"garm/auth"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/pool"
"log"
"strings"
"github.com/pkg/errors"
)
func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgParams) (org params.Organization, err error) {
if !auth.IsAdmin(ctx) {
return org, runnerErrors.ErrUnauthorized
}
if err := param.Validate(); err != nil {
return params.Organization{}, errors.Wrap(err, "validating params")
}
creds, ok := r.credentials[param.CredentialsName]
if !ok {
return params.Organization{}, runnerErrors.NewBadRequestError("credentials %s not defined", param.CredentialsName)
}
_, err = r.store.GetOrganization(ctx, param.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return params.Organization{}, errors.Wrap(err, "fetching repo")
}
} else {
return params.Organization{}, runnerErrors.NewConflictError("organization %s already exists", param.Name)
}
org, err = r.store.CreateOrganization(ctx, param.Name, creds.Name, param.WebhookSecret)
if err != nil {
return params.Organization{}, errors.Wrap(err, "creating organization")
}
defer func() {
if err != nil {
r.store.DeleteOrganization(ctx, org.ID)
}
}()
poolMgr, err := r.loadOrgPoolManager(org)
if err := poolMgr.Start(); err != nil {
return params.Organization{}, errors.Wrap(err, "starting pool manager")
}
r.organizations[org.ID] = poolMgr
return org, nil
}
func (r *Runner) ListOrganizations(ctx context.Context) ([]params.Organization, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
orgs, err := r.store.ListOrganizations(ctx)
if err != nil {
return nil, errors.Wrap(err, "listing organizations")
}
return orgs, nil
}
func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params.Organization, error) {
if !auth.IsAdmin(ctx) {
return params.Organization{}, runnerErrors.ErrUnauthorized
}
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching repository")
}
return org, nil
}
func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error {
if !auth.IsAdmin(ctx) {
return runnerErrors.ErrUnauthorized
}
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return errors.Wrap(err, "fetching repo")
}
poolMgr, ok := r.organizations[org.ID]
if ok {
if err := poolMgr.Stop(); err != nil {
log.Printf("failed to stop pool for repo %s", org.ID)
}
delete(r.organizations, orgID)
}
pools, err := r.store.ListOrgPools(ctx, orgID)
if err != nil {
return errors.Wrap(err, "fetching repo pools")
}
if len(pools) > 0 {
poolIds := []string{}
for _, pool := range pools {
poolIds = append(poolIds, pool.ID)
}
return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", "))
}
if err := r.store.DeleteOrganization(ctx, orgID); err != nil {
return errors.Wrap(err, "removing repository")
}
return nil
}
func (r *Runner) UpdateOrganization(ctx context.Context, orgID string, param params.UpdateRepositoryParams) (params.Organization, error) {
if !auth.IsAdmin(ctx) {
return params.Organization{}, runnerErrors.ErrUnauthorized
}
r.mux.Lock()
defer r.mux.Unlock()
org, err := r.store.GetOrganizationByID(ctx, orgID)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching org")
}
if param.CredentialsName != "" {
// Check that credentials are set before saving to db
if _, ok := r.credentials[param.CredentialsName]; !ok {
return params.Organization{}, runnerErrors.NewBadRequestError("invalid credentials (%s) for org %s", param.CredentialsName, org.Name)
}
}
org, err = r.store.UpdateOrganization(ctx, orgID, param)
if err != nil {
return params.Organization{}, errors.Wrap(err, "updating org")
}
poolMgr, ok := r.organizations[org.ID]
if ok {
internalCfg, err := r.getInternalConfig(org.CredentialsName)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching internal config")
}
newState := params.UpdatePoolStateParams{
WebhookSecret: org.WebhookSecret,
Internal: internalCfg,
}
org.Internal = internalCfg
// stop the pool mgr
if err := poolMgr.RefreshState(newState); err != nil {
return params.Organization{}, errors.Wrap(err, "updating pool manager")
}
} else {
poolMgr, err := r.loadOrgPoolManager(org)
if err != nil {
return params.Organization{}, errors.Wrap(err, "loading pool manager")
}
r.organizations[org.ID] = poolMgr
}
return org, nil
}
func (r *Runner) CreateOrgPool(ctx context.Context, orgID string, param params.CreatePoolParams) (params.Pool, error) {
if !auth.IsAdmin(ctx) {
return params.Pool{}, runnerErrors.ErrUnauthorized
}
r.mux.Lock()
defer r.mux.Unlock()
_, ok := r.organizations[orgID]
if !ok {
return params.Pool{}, runnerErrors.ErrNotFound
}
createPoolParams, err := r.appendTagsToCreatePoolParams(param)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool params")
}
pool, err := r.store.CreateOrganizationPool(ctx, orgID, createPoolParams)
if err != nil {
return params.Pool{}, errors.Wrap(err, "creating pool")
}
return pool, nil
}
func (r *Runner) GetOrgPoolByID(ctx context.Context, orgID, poolID string) (params.Pool, error) {
if !auth.IsAdmin(ctx) {
return params.Pool{}, runnerErrors.ErrUnauthorized
}
pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
}
func (r *Runner) DeleteOrgPool(ctx context.Context, orgID, poolID string) error {
if !auth.IsAdmin(ctx) {
return runnerErrors.ErrUnauthorized
}
// TODO: dedup instance count verification
pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
instances, err := r.store.ListInstances(ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
// TODO: implement a count function
if len(instances) > 0 {
runnerIDs := []string{}
for _, run := range instances {
runnerIDs = append(runnerIDs, run.ID)
}
return runnerErrors.NewBadRequestError("pool has runners: %s", strings.Join(runnerIDs, ", "))
}
if err := r.store.DeleteOrganizationPool(ctx, orgID, poolID); err != nil {
return errors.Wrap(err, "deleting pool")
}
return nil
}
func (r *Runner) ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) {
if !auth.IsAdmin(ctx) {
return []params.Pool{}, runnerErrors.ErrUnauthorized
}
pools, err := r.store.ListOrgPools(ctx, orgID)
if err != nil {
return nil, errors.Wrap(err, "fetching pools")
}
return pools, nil
}
func (r *Runner) UpdateOrgPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
if !auth.IsAdmin(ctx) {
return params.Pool{}, runnerErrors.ErrUnauthorized
}
pool, err := r.store.GetOrganizationPool(ctx, orgID, poolID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
maxRunners := pool.MaxRunners
minIdleRunners := pool.MinIdleRunners
if param.MaxRunners != nil {
maxRunners = *param.MaxRunners
}
if param.MinIdleRunners != nil {
minIdleRunners = *param.MinIdleRunners
}
if minIdleRunners > maxRunners {
return params.Pool{}, runnerErrors.NewBadRequestError("min_idle_runners cannot be larger than max_runners")
}
newPool, err := r.store.UpdateOrganizationPool(ctx, orgID, poolID, param)
if err != nil {
return params.Pool{}, errors.Wrap(err, "updating pool")
}
return newPool, nil
}
func (r *Runner) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
instances, err := r.store.ListOrgInstances(ctx, orgID)
if err != nil {
return []params.Instance{}, errors.Wrap(err, "fetching instances")
}
return instances, nil
}
func (r *Runner) loadOrgPoolManager(org params.Organization) (common.PoolManager, error) {
cfg, err := r.getInternalConfig(org.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
org.Internal = cfg
poolManager, err := pool.NewOrganizationPoolManager(r.ctx, org, r.providers, r.store)
if err != nil {
return nil, errors.Wrap(err, "creating pool manager")
}
return poolManager, nil
}
func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
org, err := r.store.GetOrganization(r.ctx, name)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
if orgPoolMgr, ok := r.organizations[org.ID]; ok {
return orgPoolMgr, nil
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name)
}

View file

@ -1,6 +1,612 @@
package pool
import (
"context"
"fmt"
"garm/auth"
dbCommon "garm/database/common"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
providerCommon "garm/runner/providers/common"
"log"
"strings"
"sync"
"time"
"github.com/google/go-github/v43/github"
"github.com/google/uuid"
"github.com/pkg/errors"
)
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
)
type basePool struct {
ctx context.Context
controllerID string
store dbCommon.Store
providers map[string]common.Provider
tools []*github.RunnerApplicationDownload
quit chan struct{}
done chan struct{}
helper poolHelper
mux sync.Mutex
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if the
// garm was offline while a job was executed by a github action. When this
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePool) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
// runners, err := r.getGithubRunners()
// if err != nil {
// return errors.Wrap(err, "fetching github runners")
// }
dbInstances, err := r.helper.FetchDbInstances()
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
runnerNames[*run.Name] = true
}
for _, instance := range dbInstances {
if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete {
// this instance is in the process of being created or is awaiting deletion.
// Instances in pending_Create did not get a chance to register themselves in,
// github so we let them be for now.
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
updateParams := params.UpdateInstanceParams{
RunnerStatus: providerCommon.RunnerStatus(providerCommon.InstancePendingDelete),
}
_, err = r.store.UpdateInstance(r.ctx, instance.ID, updateParams)
if err != nil {
return errors.Wrap(err, "syncing local state with github")
}
}
}
return nil
}
func (r *basePool) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) {
runnerName := job.WorkflowJob.RunnerName
runner, err := r.store.GetInstanceByName(r.ctx, runnerName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
}
return runner, nil
}
func (r *basePool) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error {
runner, err := r.fetchInstanceFromJob(job)
if err != nil {
return errors.Wrap(err, "fetching instance")
}
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
log.Printf("setting runner status for %s to %s", runner.Name, status)
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
func (r *basePool) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error {
runner, err := r.fetchInstanceFromJob(job)
if err != nil {
return errors.Wrap(err, "fetching instance")
}
updateParams := params.UpdateInstanceParams{
Status: status,
}
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
func (r *basePool) acquireNewInstance(job params.WorkflowJob) error {
requestedLabels := job.WorkflowJob.Labels
if len(requestedLabels) == 0 {
// no labels were requested.
return nil
}
pool, err := r.helper.FindPoolByTags(requestedLabels)
if err != nil {
return errors.Wrap(err, "fetching suitable pool")
}
log.Printf("adding new runner with requested tags %s in pool %s", strings.Join(job.WorkflowJob.Labels, ", "), pool.ID)
if !pool.Enabled {
log.Printf("selected pool (%s) is disabled", pool.ID)
return nil
}
// TODO: implement count
poolInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
if len(poolInstances) >= int(pool.MaxRunners) {
log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
return nil
}
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add runner to pool %s", pool.ID)
return errors.Wrap(err, "adding runner")
}
return nil
}
func (r *basePool) AddRunner(ctx context.Context, poolID string) error {
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
name := fmt.Sprintf("garm-%s", uuid.New())
createParams := params.CreateInstanceParams{
Name: name,
Pool: poolID,
Status: providerCommon.InstancePendingCreate,
RunnerStatus: providerCommon.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.helper.GetCallbackURL(),
}
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateParams := params.UpdateInstanceParams{
OSName: instance.OSName,
OSVersion: instance.OSVersion,
Addresses: instance.Addresses,
Status: instance.Status,
ProviderID: instance.ProviderID,
}
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
func (r *basePool) loop() {
defer func() {
log.Printf("repository %s loop exited", r.helper.String())
close(r.done)
}()
log.Printf("starting loop for %s", r.helper.String())
// TODO: Consolidate runners on loop start. Provider runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
// a state of "pending_create". Once that instance is up and calls home, it is marked
// as "active". If a "completed" job is received from github, we mark the instance
// as "pending_delete". Once the provider deletes the instance, we mark it as "deleted"
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
select {
case <-time.After(5 * time.Second):
// consolidate.
r.consolidate()
case <-time.After(3 * time.Hour):
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
log.Printf("failed to update tools for repo %s: %s", r.helper.String(), err)
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
}
}
func (r *basePool) addInstanceToProvider(instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
}
labels = append(labels, r.controllerLabel())
labels = append(labels, r.poolLabel(pool.ID))
tk, err := r.helper.GetGithubRegistrationToken()
if err != nil {
return errors.Wrap(err, "fetching registration token")
}
entity := r.helper.String()
jwtToken, err := auth.NewInstanceJWTToken(instance, r.helper.JwtToken(), entity, common.RepositoryPool)
if err != nil {
return errors.Wrap(err, "fetching instance jwt token")
}
bootstrapArgs := params.BootstrapInstance{
Name: instance.Name,
Tools: r.tools,
RepoURL: r.helper.GithubURL(),
GithubRunnerAccessToken: tk,
CallbackURL: instance.CallbackURL,
InstanceToken: jwtToken,
OSArch: pool.OSArch,
Flavor: pool.Flavor,
Image: pool.Image,
Labels: labels,
}
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
}
return nil
}
func (r *basePool) poolIDFromStringLabels(labels []string) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(lbl, poolIDLabelprefix) {
return lbl[len(poolIDLabelprefix):], nil
}
}
return "", runnerErrors.ErrNotFound
}
func (r *basePool) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance")
}
case "completed":
// Set instance in database to pending delete.
if job.WorkflowJob.RunnerName == "" {
// Unassigned jobs will have an empty runner_name.
// There is nothing to to in this case.
log.Printf("no runner was assigned. Skipping.")
return nil
}
log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName)
if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// update instance workload state. Set job_id in instance state.
if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
}
return nil
}
func (r *basePool) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
func (r *basePool) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
}
func (r *basePool) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSVersion,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
RunnerStatus: providerInstance.RunnerStatus,
}
}
func (r *basePool) ensureMinIdleRunners() {
pools, err := r.helper.ListPools()
if err != nil {
log.Printf("error listing pools: %s", err)
return
}
for _, pool := range pools {
if !pool.Enabled {
log.Printf("pool %s is disabled, skipping", pool.ID)
continue
}
existingInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
// asJs, _ := json.MarshalIndent(existingInstances, "", " ")
// log.Printf(">>> %s", string(asJs))
if uint(len(existingInstances)) >= pool.MaxRunners {
log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
continue
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
delta := projectedInstanceCount - int(pool.MaxRunners)
required = required - delta
}
}
for i := 0; i < required; i++ {
log.Printf("addind new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add new instance for pool %s: %s", pool.ID, err)
}
}
}
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *basePool) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
for _, runner := range runners {
status := runner.GetStatus()
if status != "offline" {
// Runner is online. Ignore it.
continue
}
removeRunner := false
poolID, err := r.poolIDFromLabels(runner.Labels)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "finding pool")
}
// not a runner we manage
continue
}
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching pool")
}
// not pool we manage.
continue
}
dbInstance, err := r.store.GetPoolInstanceByName(r.ctx, poolID, *runner.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from DB")
}
// We no longer have a DB entry for this instance. Previous forceful
// removal may have failed?
removeRunner = true
} else {
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete {
// already marked for deleting. Let consolidate take care of it.
continue
}
// check if the provider still has the instance.
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario requires manual intervention.
// Perhaps it just came online and github did not yet change it's status?
log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name)
continue
}
//start the instance
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
// we started the instance. Give it a chance to come online
continue
}
if removeRunner {
if err := r.helper.RemoveGithubRunner(*runner.ID); err != nil {
return errors.Wrap(err, "removing runner")
}
}
}
return nil
}
func (r *basePool) deleteInstanceFromProvider(instance params.Instance) error {
pool, err := r.helper.GetPoolByID(instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil {
return errors.Wrap(err, "removing instance")
}
if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.Name); err != nil {
return errors.Wrap(err, "deleting instance from database")
}
return nil
}
func (r *basePool) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) {
labelName := *lbl.Name
return labelName[len(poolIDLabelprefix):], nil
}
}
return "", runnerErrors.ErrNotFound
}
func (r *basePool) deletePendingInstances() {
instances, err := r.helper.FetchDbInstances()
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance from provider: %+v", err)
}
}
}
func (r *basePool) addPendingInstances() {
// TODO: filter instances by status.
instances, err := r.helper.FetchDbInstances()
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
// asJs, _ := json.MarshalIndent(instance, "", " ")
// log.Printf(">>> %s", string(asJs))
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to create instance in provider: %s", err)
}
}
}
func (r *basePool) consolidate() {
r.mux.Lock()
defer r.mux.Unlock()
r.deletePendingInstances()
r.addPendingInstances()
r.ensureMinIdleRunners()
}
func (r *basePool) Wait() error {
select {
case <-r.done:
case <-time.After(20 * time.Second):
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
}
return nil
}
func (r *basePool) Start() error {
tools, err := r.helper.FetchTools()
if err != nil {
return errors.Wrap(err, "initializing tools")
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
runners, err := r.helper.GetGithubRunners()
if err != nil {
return errors.Wrap(err, "fetching github runners")
}
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned instances")
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned github runners")
}
go r.loop()
return nil
}
func (r *basePool) Stop() error {
close(r.quit)
return nil
}
func (r *basePool) RefreshState(param params.UpdatePoolStateParams) error {
return r.helper.UpdateState(param)
}
func (r *basePool) WebhookSecret() string {
return r.helper.WebhookSecret()
}

26
runner/pool/interfaces.go Normal file
View file

@ -0,0 +1,26 @@
package pool
import (
"garm/params"
"github.com/google/go-github/v43/github"
)
type poolHelper interface {
GetGithubToken() string
GetGithubRunners() ([]*github.Runner, error)
FetchTools() ([]*github.RunnerApplicationDownload, error)
FetchDbInstances() ([]params.Instance, error)
RemoveGithubRunner(runnerID int64) error
ListPools() ([]params.Pool, error)
GithubURL() string
JwtToken() string
GetGithubRegistrationToken() (string, error)
String() string
GetCallbackURL() string
FindPoolByTags(labels []string) (params.Pool, error)
GetPoolByID(poolID string) (params.Pool, error)
ValidateOwner(job params.WorkflowJob) error
UpdateState(param params.UpdatePoolStateParams) error
WebhookSecret() string
}

164
runner/pool/organization.go Normal file
View file

@ -0,0 +1,164 @@
package pool
import (
"context"
"fmt"
"sync"
"garm/config"
dbCommon "garm/database/common"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/util"
"github.com/google/go-github/v43/github"
"github.com/pkg/errors"
)
// test that we implement PoolManager
var _ poolHelper = &organization{}
func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
helper := &organization{
cfg: cfg,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
}
repo := &basePool{
ctx: ctx,
store: store,
providers: providers,
controllerID: cfg.Internal.ControllerID,
quit: make(chan struct{}),
done: make(chan struct{}),
helper: helper,
}
return repo, nil
}
type organization struct {
cfg params.Organization
ctx context.Context
ghcli *github.Client
id string
store dbCommon.Store
mux sync.Mutex
}
func (r *organization) UpdateState(param params.UpdatePoolStateParams) error {
r.mux.Lock()
defer r.mux.Unlock()
r.cfg.WebhookSecret = param.WebhookSecret
r.cfg.Internal = param.Internal
ghc, err := util.GithubClient(r.ctx, r.GetGithubToken())
if err != nil {
return errors.Wrap(err, "getting github client")
}
r.ghcli = ghc
return nil
}
func (r *organization) GetGithubToken() string {
return r.cfg.Internal.OAuth2Token
}
func (r *organization) GetGithubRunners() ([]*github.Runner, error) {
runners, _, err := r.ghcli.Actions.ListOrganizationRunners(r.ctx, r.cfg.Name, nil)
if err != nil {
return nil, errors.Wrap(err, "fetching runners")
}
return runners.Runners, nil
}
func (r *organization) FetchTools() ([]*github.RunnerApplicationDownload, error) {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcli.Actions.ListOrganizationRunnerApplicationDownloads(r.ctx, r.cfg.Name)
if err != nil {
return nil, errors.Wrap(err, "fetching runner tools")
}
return tools, nil
}
func (r *organization) FetchDbInstances() ([]params.Instance, error) {
return r.store.ListRepoInstances(r.ctx, r.id)
}
func (r *organization) RemoveGithubRunner(runnerID int64) error {
_, err := r.ghcli.Actions.RemoveOrganizationRunner(r.ctx, r.cfg.Name, runnerID)
return errors.Wrap(err, "removing runner")
}
func (r *organization) ListPools() ([]params.Pool, error) {
pools, err := r.store.ListRepoPools(r.ctx, r.id)
if err != nil {
return nil, errors.Wrap(err, "fetching pools")
}
return pools, nil
}
func (r *organization) GithubURL() string {
return fmt.Sprintf("%s/%s", config.GithubBaseURL, r.cfg.Name)
}
func (r *organization) JwtToken() string {
return r.cfg.Internal.JWTSecret
}
func (r *organization) GetGithubRegistrationToken() (string, error) {
tk, _, err := r.ghcli.Actions.CreateOrganizationRegistrationToken(r.ctx, r.cfg.Name)
if err != nil {
return "", errors.Wrap(err, "creating runner token")
}
return *tk.Token, nil
}
func (r *organization) String() string {
return fmt.Sprintf("%s", r.cfg.Name)
}
func (r *organization) WebhookSecret() string {
return r.cfg.WebhookSecret
}
func (r *organization) GetCallbackURL() string {
return r.cfg.Internal.InstanceCallbackURL
}
func (r *organization) FindPoolByTags(labels []string) (params.Pool, error) {
pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, labels)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching suitable pool")
}
return pool, nil
}
func (r *organization) GetPoolByID(poolID string) (params.Pool, error) {
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
}
func (r *organization) ValidateOwner(job params.WorkflowJob) error {
if job.Organization.Login != r.cfg.Name {
return runnerErrors.NewBadRequestError("job not meant for this pool manager")
}
return nil
}

View file

@ -3,27 +3,21 @@ package pool
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
"garm/auth"
"garm/config"
dbCommon "garm/database/common"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
providerCommon "garm/runner/providers/common"
"garm/util"
"github.com/google/go-github/v43/github"
"github.com/google/uuid"
"github.com/pkg/errors"
)
// test that we implement PoolManager
var _ common.PoolManager = &Repository{}
var _ poolHelper = &repository{}
func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token)
@ -31,42 +25,46 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid
return nil, errors.Wrap(err, "getting github client")
}
repo := &Repository{
helper := &repository{
cfg: cfg,
ctx: ctx,
ghcli: ghc,
id: cfg.ID,
store: store,
}
repo := &basePool{
ctx: ctx,
cfg: cfg,
ghcli: ghc,
id: cfg.ID,
store: store,
providers: providers,
controllerID: cfg.Internal.ControllerID,
quit: make(chan struct{}),
done: make(chan struct{}),
helper: helper,
}
return repo, nil
}
type Repository struct {
ctx context.Context
controllerID string
cfg params.Repository
store dbCommon.Store
ghcli *github.Client
providers map[string]common.Provider
tools []*github.RunnerApplicationDownload
quit chan struct{}
done chan struct{}
id string
var _ poolHelper = &repository{}
type repository struct {
cfg params.Repository
ctx context.Context
ghcli *github.Client
id string
store dbCommon.Store
mux sync.Mutex
}
func (r *Repository) RefreshState(cfg params.Repository) error {
func (r *repository) UpdateState(param params.UpdatePoolStateParams) error {
r.mux.Lock()
defer r.mux.Unlock()
r.cfg = cfg
ghc, err := util.GithubClient(r.ctx, r.cfg.Internal.OAuth2Token)
r.cfg.WebhookSecret = param.WebhookSecret
r.cfg.Internal = param.Internal
ghc, err := util.GithubClient(r.ctx, r.GetGithubToken())
if err != nil {
return errors.Wrap(err, "getting github client")
}
@ -74,7 +72,11 @@ func (r *Repository) RefreshState(cfg params.Repository) error {
return nil
}
func (r *Repository) getGithubRunners() ([]*github.Runner, error) {
func (r *repository) GetGithubToken() string {
return r.cfg.Internal.OAuth2Token
}
func (r *repository) GetGithubRunners() ([]*github.Runner, error) {
runners, _, err := r.ghcli.Actions.ListRunners(r.ctx, r.cfg.Owner, r.cfg.Name, nil)
if err != nil {
return nil, errors.Wrap(err, "fetching runners")
@ -83,580 +85,82 @@ func (r *Repository) getGithubRunners() ([]*github.Runner, error) {
return runners.Runners, nil
}
func (r *Repository) getProviderInstances() ([]params.Instance, error) {
return nil, nil
}
func (r *Repository) Start() error {
if err := r.fetchTools(); err != nil {
return errors.Wrap(err, "initializing tools")
}
runners, err := r.getGithubRunners()
if err != nil {
return errors.Wrap(err, "fetching github runners")
}
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned instances")
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned github runners")
}
go r.loop()
return nil
}
func (r *Repository) Stop() error {
close(r.quit)
return nil
}
func (r *Repository) fetchTools() error {
func (r *repository) FetchTools() ([]*github.RunnerApplicationDownload, error) {
r.mux.Lock()
defer r.mux.Unlock()
tools, _, err := r.ghcli.Actions.ListRunnerApplicationDownloads(r.ctx, r.cfg.Owner, r.cfg.Name)
if err != nil {
return errors.Wrap(err, "fetching runner tools")
return nil, errors.Wrap(err, "fetching runner tools")
}
r.tools = tools
return nil
return tools, nil
}
func (r *Repository) Wait() error {
select {
case <-r.done:
case <-time.After(20 * time.Second):
return errors.Wrap(runnerErrors.ErrTimeout, "waiting for pool to stop")
}
return nil
func (r *repository) FetchDbInstances() ([]params.Instance, error) {
return r.store.ListRepoInstances(r.ctx, r.id)
}
func (r *Repository) consolidate() {
r.mux.Lock()
defer r.mux.Unlock()
r.deletePendingInstances()
r.addPendingInstances()
r.ensureMinIdleRunners()
func (r *repository) RemoveGithubRunner(runnerID int64) error {
_, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, runnerID)
return errors.Wrap(err, "removing runner")
}
func (r *Repository) addPendingInstances() {
// TODO: filter instances by status.
instances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
// asJs, _ := json.MarshalIndent(instance, "", " ")
// log.Printf(">>> %s", string(asJs))
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to create instance in provider: %s", err)
}
}
}
func (r *Repository) deletePendingInstances() {
instances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance from provider: %+v", err)
}
}
}
func (r *Repository) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) {
labelName := *lbl.Name
return labelName[len(poolIDLabelprefix):], nil
}
}
return "", runnerErrors.ErrNotFound
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *Repository) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
for _, runner := range runners {
status := runner.GetStatus()
if status != "offline" {
// Runner is online. Ignore it.
continue
}
removeRunner := false
poolID, err := r.poolIDFromLabels(runner.Labels)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "finding pool")
}
// not a runner we manage
continue
}
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching pool")
}
// not pool we manage.
continue
}
dbInstance, err := r.store.GetPoolInstanceByName(r.ctx, poolID, *runner.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from DB")
}
// We no longer have a DB entry for this instance. Previous forceful
// removal may have failed?
removeRunner = true
} else {
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete {
// already marked for deleting. Let consolidate take care of it.
continue
}
// check if the provider still has the instance.
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario requires manual intervention.
// Perhaps it just came online and github did not yet change it's status?
log.Printf("instance %s is online but github reports runner as offline", dbInstance.Name)
continue
}
//start the instance
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
// we started the instance. Give it a chance to come online
continue
}
if removeRunner {
if _, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, *runner.ID); err != nil {
return errors.Wrap(err, "removing runner")
}
}
}
return nil
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if the
// garm was offline while a job was executed by a github action. When this
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *Repository) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
// runners, err := r.getGithubRunners()
// if err != nil {
// return errors.Wrap(err, "fetching github runners")
// }
dbInstances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
runnerNames[*run.Name] = true
}
for _, instance := range dbInstances {
if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete {
// this instance is in the process of being created or is awaiting deletion.
// Instances in pending_Create did not get a chance to register themselves in,
// github so we let them be for now.
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
_, err = r.store.UpdateInstance(r.ctx, instance.ID, params.UpdateInstanceParams{})
if err != nil {
return errors.Wrap(err, "syncing local state with github")
}
}
}
return nil
}
func (r *Repository) ensureMinIdleRunners() {
func (r *repository) ListPools() ([]params.Pool, error) {
pools, err := r.store.ListRepoPools(r.ctx, r.id)
if err != nil {
log.Printf("error listing pools: %s", err)
return
}
for _, pool := range pools {
if !pool.Enabled {
log.Printf("pool %s is disabled, skipping", pool.ID)
continue
}
existingInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", pool.ID, err)
return
}
// asJs, _ := json.MarshalIndent(existingInstances, "", " ")
// log.Printf(">>> %s", string(asJs))
if uint(len(existingInstances)) >= pool.MaxRunners {
log.Printf("max workers (%d) reached for pool %s, skipping idle worker creation", pool.MaxRunners, pool.ID)
continue
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
delta := projectedInstanceCount - int(pool.MaxRunners)
required = required - delta
}
}
for i := 0; i < required; i++ {
log.Printf("addind new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add new instance for pool %s: %s", pool.ID, err)
}
}
return nil, errors.Wrap(err, "fetching pools")
}
return pools, nil
}
func (r *Repository) githubURL() string {
func (r *repository) GithubURL() string {
return fmt.Sprintf("%s/%s/%s", config.GithubBaseURL, r.cfg.Owner, r.cfg.Name)
}
func (r *Repository) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
func (r *repository) JwtToken() string {
return r.cfg.Internal.JWTSecret
}
func (r *Repository) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
}
func (r *Repository) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSVersion,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
RunnerStatus: providerInstance.RunnerStatus,
}
}
func (r *Repository) deleteInstanceFromProvider(instance params.Instance) error {
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil {
return errors.Wrap(err, "removing instance")
}
if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.Name); err != nil {
return errors.Wrap(err, "deleting instance from database")
}
return nil
}
func (r *Repository) addInstanceToProvider(instance params.Instance) error {
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
}
labels = append(labels, r.controllerLabel())
labels = append(labels, r.poolLabel(pool.ID))
func (r *repository) GetGithubRegistrationToken() (string, error) {
tk, _, err := r.ghcli.Actions.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name)
if err != nil {
return errors.Wrap(err, "creating runner token")
return "", errors.Wrap(err, "creating runner token")
}
entity := fmt.Sprintf("%s/%s", r.cfg.Owner, r.cfg.Name)
jwtToken, err := auth.NewInstanceJWTToken(instance, r.cfg.Internal.JWTSecret, entity, common.RepositoryPool)
if err != nil {
return errors.Wrap(err, "fetching instance jwt token")
}
bootstrapArgs := params.BootstrapInstance{
Name: instance.Name,
Tools: r.tools,
RepoURL: r.githubURL(),
GithubRunnerAccessToken: *tk.Token,
CallbackURL: instance.CallbackURL,
InstanceToken: jwtToken,
OSArch: pool.OSArch,
Flavor: pool.Flavor,
Image: pool.Image,
Labels: labels,
}
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
}
return nil
return *tk.Token, nil
}
func (r *Repository) AddRunner(ctx context.Context, poolID string) error {
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
name := fmt.Sprintf("garm-%s", uuid.New())
createParams := params.CreateInstanceParams{
Name: name,
Pool: poolID,
Status: providerCommon.InstancePendingCreate,
RunnerStatus: providerCommon.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.cfg.Internal.InstanceCallbackURL,
}
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateParams := params.UpdateInstanceParams{
OSName: instance.OSName,
OSVersion: instance.OSVersion,
Addresses: instance.Addresses,
Status: instance.Status,
ProviderID: instance.ProviderID,
}
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
func (r *repository) String() string {
return fmt.Sprintf("%s/%s", r.cfg.Owner, r.cfg.Name)
}
func (r *Repository) loop() {
defer func() {
log.Printf("repository %s/%s loop exited", r.cfg.Owner, r.cfg.Name)
close(r.done)
}()
log.Printf("starting loop for %s/%s", r.cfg.Owner, r.cfg.Name)
// TODO: Consolidate runners on loop start. Provider runners must match runners
// in github and DB. When a Workflow job is received, we will first create/update
// an entity in the database, before sending the request to the provider to create/delete
// an instance. If a "queued" job is received, we create an entity in the db with
// a state of "pending_create". Once that instance is up and calls home, it is marked
// as "active". If a "completed" job is received from github, we mark the instance
// as "pending_delete". Once the provider deletes the instance, we mark it as "deleted"
// in the database.
// We also ensure we have runners created based on pool characteristics. This is where
// we spin up "MinWorkers" for each runner type.
for {
select {
case <-time.After(5 * time.Second):
// consolidate.
r.consolidate()
case <-time.After(3 * time.Hour):
// Update tools cache.
if err := r.fetchTools(); err != nil {
log.Printf("failed to update tools for repo %s/%s: %s", r.cfg.Owner, r.cfg.Name, err)
}
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
}
}
func (r *Repository) WebhookSecret() string {
func (r *repository) WebhookSecret() string {
return r.cfg.WebhookSecret
}
func (r *Repository) poolIDFromStringLabels(labels []string) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(lbl, poolIDLabelprefix) {
return lbl[len(poolIDLabelprefix):], nil
}
}
return "", runnerErrors.ErrNotFound
func (r *repository) GetCallbackURL() string {
return r.cfg.Internal.InstanceCallbackURL
}
func (r *Repository) fetchInstanceFromJob(job params.WorkflowJob) (params.Instance, error) {
// asJs, _ := json.MarshalIndent(job, "", " ")
// log.Printf(">>> Job data: %s", string(asJs))
runnerName := job.WorkflowJob.RunnerName
runner, err := r.store.GetInstanceByName(r.ctx, runnerName)
func (r *repository) FindPoolByTags(labels []string) (params.Pool, error) {
pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, labels)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
return params.Pool{}, errors.Wrap(err, "fetching suitable pool")
}
return runner, nil
return pool, nil
}
func (r *Repository) setInstanceRunnerStatus(job params.WorkflowJob, status providerCommon.RunnerStatus) error {
runner, err := r.fetchInstanceFromJob(job)
func (r *repository) GetPoolByID(poolID string) (params.Pool, error) {
pool, err := r.store.GetRepositoryPool(r.ctx, r.id, poolID)
if err != nil {
return errors.Wrap(err, "fetching instance")
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
log.Printf("setting runner status for %s to %s", runner.Name, status)
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
return pool, nil
}
func (r *Repository) setInstanceStatus(job params.WorkflowJob, status providerCommon.InstanceStatus) error {
runner, err := r.fetchInstanceFromJob(job)
if err != nil {
return errors.Wrap(err, "fetching instance")
}
updateParams := params.UpdateInstanceParams{
Status: status,
}
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}
func (r *Repository) acquireNewInstance(job params.WorkflowJob) error {
requestedLabels := job.WorkflowJob.Labels
if len(requestedLabels) == 0 {
// no labels were requested.
return nil
}
pool, err := r.store.FindRepositoryPoolByTags(r.ctx, r.id, requestedLabels)
if err != nil {
return errors.Wrap(err, "fetching suitable pool")
}
log.Printf("adding new runner with requested tags %s in pool %s", strings.Join(job.WorkflowJob.Labels, ", "), pool.ID)
if !pool.Enabled {
log.Printf("selected pool (%s) is disabled", pool.ID)
return nil
}
// TODO: implement count
poolInstances, err := r.store.ListInstances(r.ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
if len(poolInstances) >= int(pool.MaxRunners) {
log.Printf("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
return nil
}
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
log.Printf("failed to add runner to pool %s", pool.ID)
return errors.Wrap(err, "adding runner")
}
return nil
}
func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error {
func (r *repository) ValidateOwner(job params.WorkflowJob) error {
if job.Repository.Name != r.cfg.Name || job.Repository.Owner.Login != r.cfg.Owner {
return runnerErrors.NewBadRequestError("job not meant for this pool manager")
}
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
if err := r.acquireNewInstance(job); err != nil {
log.Printf("failed to add instance")
}
case "completed":
// Set instance in database to pending delete.
if job.WorkflowJob.RunnerName == "" {
// Unassigned jobs will have an empty runner_name.
// There is nothing to to in this case.
log.Printf("no runner was assigned. Skipping.")
return nil
}
log.Printf("marking instance %s as pending_delete", job.WorkflowJob.RunnerName)
if err := r.setInstanceStatus(job, providerCommon.InstancePendingDelete); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// update instance workload state. Set job_id in instance state.
if err := r.setInstanceRunnerStatus(job, providerCommon.RunnerActive); err != nil {
log.Printf("failed to update runner %s status", job.WorkflowJob.RunnerName)
return errors.Wrap(err, "updating runner")
}
}
return nil
}

View file

@ -2,13 +2,12 @@ package runner
import (
"context"
"log"
"garm/auth"
runnerErrors "garm/errors"
"garm/params"
"garm/runner/common"
"garm/runner/pool"
"garm/util"
"log"
"strings"
"github.com/pkg/errors"
@ -150,9 +149,13 @@ func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param para
if err != nil {
return params.Repository{}, errors.Wrap(err, "fetching internal config")
}
newState := params.UpdatePoolStateParams{
WebhookSecret: repo.WebhookSecret,
Internal: internalCfg,
}
repo.Internal = internalCfg
// stop the pool mgr
if err := poolMgr.RefreshState(repo); err != nil {
if err := poolMgr.RefreshState(newState); err != nil {
return params.Repository{}, errors.Wrap(err, "updating pool manager")
}
} else {
@ -179,49 +182,12 @@ func (r *Runner) CreateRepoPool(ctx context.Context, repoID string, param params
return params.Pool{}, runnerErrors.ErrNotFound
}
if err := param.Validate(); err != nil {
return params.Pool{}, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", err)
}
if !IsSupportedOSType(param.OSType) {
return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType)
}
if !IsSupportedArch(param.OSArch) {
return params.Pool{}, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch)
}
_, ok = r.providers[param.ProviderName]
if !ok {
return params.Pool{}, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName)
}
// github automatically adds the "self-hosted" tag as well as the OS type (linux, windows, etc)
// and architecture (arm, x64, etc) to all self hosted runners. When a workflow job comes in, we try
// to find a pool based on the labels that are set in the workflow. If we don't explicitly define these
// default tags for each pool, and the user targets these labels, we won't be able to match any pools.
// The downside is that all pools with the same OS and arch will have these default labels. Users should
// set distinct and unique labels on each pool, and explicitly target those labels, or risk assigning
// the job to the wrong worker type.
ghArch, err := util.ResolveToGithubArch(string(param.OSArch))
createPoolParams, err := r.appendTagsToCreatePoolParams(param)
if err != nil {
return params.Pool{}, errors.Wrap(err, "invalid arch")
return params.Pool{}, errors.Wrap(err, "fetching pool params")
}
osType, err := util.ResolveToGithubOSType(string(param.OSType))
if err != nil {
return params.Pool{}, errors.Wrap(err, "invalid os type")
}
extraLabels := []string{
"self-hosted",
ghArch,
osType,
}
param.Tags = append(param.Tags, extraLabels...)
pool, err := r.store.CreateRepositoryPool(ctx, repoID, param)
pool, err := r.store.CreateRepositoryPool(ctx, repoID, createPoolParams)
if err != nil {
return params.Pool{}, errors.Wrap(err, "creating pool")
}
@ -295,34 +261,6 @@ func (r *Runner) ListPoolInstances(ctx context.Context, poolID string) ([]params
return instances, nil
}
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
cfg, err := r.getInternalConfig(repo.CredentialsName)
if err != nil {
return nil, errors.Wrap(err, "fetching internal config")
}
repo.Internal = cfg
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
if err != nil {
return nil, errors.Wrap(err, "creating pool manager")
}
return poolManager, nil
}
func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
repo, err := r.store.GetRepository(r.ctx, owner, name)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
if repo, ok := r.repositories[repo.ID]; ok {
return repo, nil
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
}
func (r *Runner) UpdateRepoPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
if !auth.IsAdmin(ctx) {
return params.Pool{}, runnerErrors.ErrUnauthorized
@ -366,40 +304,30 @@ func (r *Runner) ListRepoInstances(ctx context.Context, repoID string) ([]params
return instances, nil
}
// TODO: move these in another file
func (r *Runner) GetInstance(ctx context.Context, instanceName string) (params.Instance, error) {
if !auth.IsAdmin(ctx) {
return params.Instance{}, runnerErrors.ErrUnauthorized
}
instance, err := r.store.GetInstanceByName(ctx, instanceName)
func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) {
cfg, err := r.getInternalConfig(repo.CredentialsName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
return nil, errors.Wrap(err, "fetching internal config")
}
return instance, nil
repo.Internal = cfg
poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store)
if err != nil {
return nil, errors.Wrap(err, "creating pool manager")
}
return poolManager, nil
}
func (r *Runner) AddInstanceStatusMessage(ctx context.Context, param params.InstanceUpdateMessage) error {
instanceID := auth.InstanceID(ctx)
if instanceID == "" {
return runnerErrors.ErrUnauthorized
func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
repo, err := r.store.GetRepository(r.ctx, owner, name)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
if err := r.store.AddInstanceStatusMessage(ctx, instanceID, param.Message); err != nil {
return errors.Wrap(err, "adding status update")
if repo, ok := r.repositories[repo.ID]; ok {
return repo, nil
}
// if param.Status == providerCommon.RunnerIdle {
// }
updateParams := params.UpdateInstanceParams{
RunnerStatus: param.Status,
}
if _, err := r.store.UpdateInstance(r.ctx, instanceID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name)
}

View file

@ -15,6 +15,7 @@ import (
"strings"
"sync"
"garm/auth"
"garm/config"
"garm/database"
dbCommon "garm/database/common"
@ -101,8 +102,6 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
ret := []params.Provider{}
for _, val := range r.providers {
params := val.AsParams()
log.Printf(">>>>> %s", params.Name)
ret = append(ret, val.AsParams())
}
return ret, nil
@ -140,6 +139,20 @@ func (r *Runner) loadReposAndOrgs() error {
r.repositories[repo.ID] = poolManager
}
orgs, err := r.store.ListOrganizations(r.ctx)
if err != nil {
return errors.Wrap(err, "fetching repositories")
}
for _, org := range orgs {
log.Printf("creating pool manager for organization %s", org.Name)
poolManager, err := r.loadOrgPoolManager(org)
if err != nil {
return errors.Wrap(err, "loading repo pool manager")
}
r.organizations[org.ID] = poolManager
}
return nil
}
@ -208,21 +221,6 @@ func (r *Runner) Wait() error {
return nil
}
func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
org, err := r.store.GetOrganization(r.ctx, name)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
if orgPoolMgr, ok := r.organizations[org.ID]; ok {
return orgPoolMgr, nil
}
return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name)
}
func (r *Runner) validateHookBody(signature, secret string, body []byte) error {
if secret == "" {
// A secret was not set. Skip validation of body.
@ -382,3 +380,85 @@ func (r *Runner) ensureSSHKeys() error {
return nil
}
func (r *Runner) appendTagsToCreatePoolParams(param params.CreatePoolParams) (params.CreatePoolParams, error) {
if err := param.Validate(); err != nil {
return params.CreatePoolParams{}, errors.Wrapf(runnerErrors.ErrBadRequest, "validating params: %s", err)
}
if !IsSupportedOSType(param.OSType) {
return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("invalid OS type %s", param.OSType)
}
if !IsSupportedArch(param.OSArch) {
return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("invalid OS architecture %s", param.OSArch)
}
_, ok := r.providers[param.ProviderName]
if !ok {
return params.CreatePoolParams{}, runnerErrors.NewBadRequestError("no such provider %s", param.ProviderName)
}
// github automatically adds the "self-hosted" tag as well as the OS type (linux, windows, etc)
// and architecture (arm, x64, etc) to all self hosted runners. When a workflow job comes in, we try
// to find a pool based on the labels that are set in the workflow. If we don't explicitly define these
// default tags for each pool, and the user targets these labels, we won't be able to match any pools.
// The downside is that all pools with the same OS and arch will have these default labels. Users should
// set distinct and unique labels on each pool, and explicitly target those labels, or risk assigning
// the job to the wrong worker type.
ghArch, err := util.ResolveToGithubArch(string(param.OSArch))
if err != nil {
return params.CreatePoolParams{}, errors.Wrap(err, "invalid arch")
}
osType, err := util.ResolveToGithubOSType(string(param.OSType))
if err != nil {
return params.CreatePoolParams{}, errors.Wrap(err, "invalid os type")
}
extraLabels := []string{
"self-hosted",
ghArch,
osType,
}
param.Tags = append(param.Tags, extraLabels...)
return param, nil
}
func (r *Runner) GetInstance(ctx context.Context, instanceName string) (params.Instance, error) {
if !auth.IsAdmin(ctx) {
return params.Instance{}, runnerErrors.ErrUnauthorized
}
instance, err := r.store.GetInstanceByName(ctx, instanceName)
if err != nil {
return params.Instance{}, errors.Wrap(err, "fetching instance")
}
return instance, nil
}
func (r *Runner) AddInstanceStatusMessage(ctx context.Context, param params.InstanceUpdateMessage) error {
instanceID := auth.InstanceID(ctx)
if instanceID == "" {
return runnerErrors.ErrUnauthorized
}
if err := r.store.AddInstanceStatusMessage(ctx, instanceID, param.Message); err != nil {
return errors.Wrap(err, "adding status update")
}
// if param.Status == providerCommon.RunnerIdle {
// }
updateParams := params.UpdateInstanceParams{
RunnerStatus: param.Status,
}
if _, err := r.store.UpdateInstance(r.ctx, instanceID, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
}
return nil
}