Merge pull request #86 from gabriel-samfira/add-job-tracking

Add job tracking
This commit is contained in:
Gabriel 2023-07-04 10:23:42 +03:00 committed by GitHub
commit 0889f6c999
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 1396 additions and 1138 deletions

View file

@ -306,3 +306,17 @@ func (a *APIController) ListProviders(w http.ResponseWriter, r *http.Request) {
log.Printf("failed to encode response: %q", err)
}
}
func (a *APIController) ListAllJobs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
jobs, err := a.r.ListAllJobs(ctx)
if err != nil {
handleError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(jobs); err != nil {
log.Printf("failed to encode response: %q", err)
}
}

View file

@ -29,8 +29,10 @@ package routers
//go:generate swagger generate client --target=../../ --spec=../swagger.yaml
import (
_ "expvar" // Register the expvar handlers
"io"
"net/http"
_ "net/http/pprof" // Register the pprof handlers
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -54,6 +56,15 @@ func WithMetricsRouter(parentRouter *mux.Router, disableAuth bool, metricsMiddle
return parentRouter
}
func WithDebugServer(parentRouter *mux.Router) *mux.Router {
if parentRouter == nil {
return nil
}
parentRouter.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
return parentRouter
}
func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddleware, initMiddleware, instanceMiddleware auth.Middleware) *mux.Router {
router := mux.NewRouter()
logMiddleware := util.NewLoggingMiddleware(logWriter)
@ -94,6 +105,13 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl
apiRouter.Handle("/metrics-token/", http.HandlerFunc(han.MetricsTokenHandler)).Methods("GET", "OPTIONS")
apiRouter.Handle("/metrics-token", http.HandlerFunc(han.MetricsTokenHandler)).Methods("GET", "OPTIONS")
//////////
// Jobs //
//////////
// List all jobs
apiRouter.Handle("/jobs/", http.HandlerFunc(han.ListAllJobs)).Methods("GET", "OPTIONS")
apiRouter.Handle("/jobs", http.HandlerFunc(han.ListAllJobs)).Methods("GET", "OPTIONS")
///////////
// Pools //
///////////

View file

@ -183,6 +183,19 @@ func (c *Client) DeleteRunner(instanceName string) error {
return nil
}
func (c *Client) ListAllJobs() ([]params.Job, error) {
url := fmt.Sprintf("%s/api/v1/jobs", c.Config.BaseURL)
var response []params.Job
resp, err := c.client.R().
SetResult(&response).
Get(url)
if err != nil || resp.IsError() {
return response, c.handleError(err, resp)
}
return response, nil
}
func (c *Client) ListPoolInstances(poolID string) ([]params.Instance, error) {
url := fmt.Sprintf("%s/api/v1/pools/%s/instances", c.Config.BaseURL, poolID)

79
cmd/garm-cli/cmd/jobs.go Normal file
View file

@ -0,0 +1,79 @@
// Copyright 2023 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package cmd
import (
"fmt"
"strings"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
)
// runnerCmd represents the runner command
var jobsCmd = &cobra.Command{
Use: "job",
SilenceUsage: true,
Short: "Information about jobs",
Long: `Query information about jobs.`,
Run: nil,
}
var jobsListCmd = &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List jobs",
Long: `List all jobs currently recorded in the system.`,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if needsInit {
return errNeedsInitError
}
jobs, err := cli.ListAllJobs()
if err != nil {
return err
}
formatJobs(jobs)
return nil
},
}
func formatJobs(jobs []params.Job) {
t := table.NewWriter()
header := table.Row{"ID", "Name", "Status", "Conclusion", "Runner Name", "Repository", "Requested Labels", "Locked by"}
t.AppendHeader(header)
for _, job := range jobs {
lockedBy := ""
repo := fmt.Sprintf("%s/%s", job.RepositoryOwner, job.RepositoryName)
if job.LockedBy != uuid.Nil {
lockedBy = job.LockedBy.String()
}
t.AppendRow(table.Row{job.ID, job.Name, job.Status, job.Conclusion, job.RunnerName, repo, strings.Join(job.Labels, " "), lockedBy})
t.AppendSeparator()
}
fmt.Println(t.Render())
}
func init() {
jobsCmd.AddCommand(
jobsListCmd,
)
rootCmd.AddCommand(jobsCmd)
}

View file

@ -198,11 +198,11 @@ func init() {
func formatInstances(param []params.Instance) {
t := table.NewWriter()
header := table.Row{"Name", "Status", "Runner Status", "Pool ID"}
header := table.Row{"Nr", "Name", "Status", "Runner Status", "Pool ID"}
t.AppendHeader(header)
for _, inst := range param {
t.AppendRow(table.Row{inst.Name, inst.Status, inst.RunnerStatus, inst.PoolID})
for idx, inst := range param {
t.AppendRow(table.Row{idx + 1, inst.Name, inst.Status, inst.RunnerStatus, inst.PoolID})
t.AppendSeparator()
}
fmt.Println(t.Render())

View file

@ -129,7 +129,7 @@ func main() {
log.Fatal(err)
}
runner, err := runner.NewRunner(ctx, *cfg)
runner, err := runner.NewRunner(ctx, *cfg, db)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
}
@ -176,6 +176,11 @@ func main() {
router = routers.WithMetricsRouter(router, cfg.Metrics.DisableAuth, metricsMiddleware)
}
if cfg.Default.DebugServer {
log.Printf("setting up debug routes")
router = routers.WithDebugServer(router)
}
corsMw := mux.CORSMethodMiddleware(router)
router.Use(corsMw)

View file

@ -120,6 +120,7 @@ type Default struct {
// LogFile is the location of the log file.
LogFile string `toml:"log_file,omitempty" json:"log-file"`
EnableLogStreamer bool `toml:"enable_log_streamer"`
DebugServer bool `toml:"debug_server" json:"debug-server"`
}
func (d *Default) Validate() error {
@ -337,7 +338,7 @@ func (s *SQLite) Validate() error {
}
func (s *SQLite) ConnectionString() (string, error) {
return s.DBFile, nil
return fmt.Sprintf("%s?_journal_mode=WAL&_foreign_keys=ON", s.DBFile), nil
}
// MySQL is the config entry for the mysql section

View file

@ -412,7 +412,7 @@ func TestGormParams(t *testing.T) {
dbType, uri, err := cfg.GormParams()
require.Nil(t, err)
require.Equal(t, SQLiteBackend, dbType)
require.Equal(t, filepath.Join(dir, "garm.db"), uri)
require.Equal(t, filepath.Join(dir, "garm.db?_journal_mode=WAL&_foreign_keys=ON"), uri)
cfg.DbBackend = MySQLBackend
cfg.MySQL = getMySQLDefaultConfig()

View file

@ -86,6 +86,7 @@ type PoolStore interface {
PoolInstanceCount(ctx context.Context, poolID string) (int64, error)
GetPoolInstanceByName(ctx context.Context, poolID string, instanceName string) (params.Instance, error)
FindPoolsMatchingAllTags(ctx context.Context, entityType params.PoolType, entityID string, tags []string) ([]params.Pool, error)
}
type UserStore interface {
@ -111,6 +112,21 @@ type InstanceStore interface {
ListInstanceEvents(ctx context.Context, instanceID string, eventType params.EventType, eventLevel params.EventLevel) ([]params.StatusMessage, error)
}
type JobsStore interface {
CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error)
ListEntityJobsByStatus(ctx context.Context, entityType params.PoolType, entityID string, status params.JobStatus) ([]params.Job, error)
ListJobsByStatus(ctx context.Context, status params.JobStatus) ([]params.Job, error)
ListAllJobs(ctx context.Context) ([]params.Job, error)
GetJobByID(ctx context.Context, jobID int64) (params.Job, error)
DeleteJob(ctx context.Context, jobID int64) error
UnlockJob(ctx context.Context, jobID int64, entityID string) error
LockJob(ctx context.Context, jobID int64, entityID string) error
BreakLockJobIsQueued(ctx context.Context, jobID int64) error
DeleteCompletedJobs(ctx context.Context) error
}
//go:generate mockery --name=Store
type Store interface {
RepoStore
@ -119,6 +135,7 @@ type Store interface {
PoolStore
UserStore
InstanceStore
JobsStore
ControllerInfo() (params.ControllerInfo, error)
InitController() (params.ControllerInfo, error)

View file

@ -18,8 +18,8 @@ import (
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/gorm"
)
@ -42,7 +42,7 @@ func (s *sqlDatabase) InitController() (params.ControllerInfo, error) {
return params.ControllerInfo{}, runnerErrors.NewConflictError("controller already initialized")
}
newID, err := uuid.NewV4()
newID, err := uuid.NewUUID()
if err != nil {
return params.ControllerInfo{}, errors.Wrap(err, "generating UUID")
}

View file

@ -69,5 +69,6 @@ func (s *CtrlTestSuite) TestInitControllerAlreadyInitialized() {
}
func TestCtrlTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(CtrlTestSuite))
}

View file

@ -7,8 +7,8 @@ import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -148,7 +148,7 @@ func (s *sqlDatabase) CreateEnterprisePool(ctx context.Context, enterpriseID str
Flavor: param.Flavor,
OSType: param.OSType,
OSArch: param.OSArch,
EnterpriseID: enterprise.ID,
EnterpriseID: &enterprise.ID,
Enabled: param.Enabled,
RunnerBootstrapTimeout: param.RunnerBootstrapTimeout,
}
@ -224,15 +224,15 @@ func (s *sqlDatabase) UpdateEnterprisePool(ctx context.Context, enterpriseID, po
}
func (s *sqlDatabase) FindEnterprisePoolByTags(ctx context.Context, enterpriseID string, tags []string) (params.Pool, error) {
pool, err := s.findPoolByTags(enterpriseID, "enterprise_id", tags)
pool, err := s.findPoolByTags(enterpriseID, params.EnterprisePool, tags)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
return pool[0], nil
}
func (s *sqlDatabase) ListEnterprisePools(ctx context.Context, enterpriseID string) ([]params.Pool, error) {
pools, err := s.getEnterprisePools(ctx, enterpriseID, "Tags", "Enterprise")
pools, err := s.listEntityPools(ctx, params.EnterprisePool, enterpriseID, "Tags", "Instances")
if err != nil {
return nil, errors.Wrap(err, "fetching pools")
}
@ -246,7 +246,7 @@ func (s *sqlDatabase) ListEnterprisePools(ctx context.Context, enterpriseID stri
}
func (s *sqlDatabase) ListEnterpriseInstances(ctx context.Context, enterpriseID string) ([]params.Instance, error) {
pools, err := s.getEnterprisePools(ctx, enterpriseID, "Instances")
pools, err := s.listEntityPools(ctx, params.EnterprisePool, enterpriseID, "Instances", "Tags")
if err != nil {
return nil, errors.Wrap(err, "fetching enterprise")
}
@ -274,7 +274,7 @@ func (s *sqlDatabase) getEnterprise(ctx context.Context, name string) (Enterpris
}
func (s *sqlDatabase) getEnterpriseByID(ctx context.Context, id string, preload ...string) (Enterprise, error) {
u, err := uuid.FromString(id)
u, err := uuid.Parse(id)
if err != nil {
return Enterprise{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
@ -315,28 +315,3 @@ func (s *sqlDatabase) getEnterprisePoolByUniqueFields(ctx context.Context, enter
return pool[0], nil
}
func (s *sqlDatabase) getEnterprisePools(ctx context.Context, enterpriseID string, preload ...string) ([]Pool, error) {
_, err := s.getEnterpriseByID(ctx, enterpriseID)
if err != nil {
return nil, errors.Wrap(err, "fetching enterprise")
}
q := s.conn
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var pools []Pool
err = q.Model(&Pool{}).Where("enterprise_id = ?", enterpriseID).
Omit("extra_specs").
Find(&pools).Error
if err != nil {
return nil, errors.Wrap(err, "fetching pool")
}
return pools, nil
}

View file

@ -671,7 +671,7 @@ func (s *EnterpriseTestSuite) TestListEnterprisePoolsInvalidEnterpriseID() {
_, err := s.Store.ListEnterprisePools(context.Background(), "dummy-enterprise-id")
s.Require().NotNil(err)
s.Require().Equal("fetching pools: fetching enterprise: parsing id: invalid request", err.Error())
s.Require().Equal("fetching pools: parsing id: invalid request", err.Error())
}
func (s *EnterpriseTestSuite) TestGetEnterprisePool() {
@ -785,7 +785,7 @@ func (s *EnterpriseTestSuite) TestListEnterpriseInstancesInvalidEnterpriseID() {
_, err := s.Store.ListEnterpriseInstances(context.Background(), "dummy-enterprise-id")
s.Require().NotNil(err)
s.Require().Equal("fetching enterprise: fetching enterprise: parsing id: invalid request", err.Error())
s.Require().Equal("fetching enterprise: parsing id: invalid request", err.Error())
}
func (s *EnterpriseTestSuite) TestUpdateEnterprisePool() {
@ -811,5 +811,6 @@ func (s *EnterpriseTestSuite) TestUpdateEnterprisePoolInvalidEnterpriseID() {
}
func TestEnterpriseTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(EnterpriseTestSuite))
}

View file

@ -16,12 +16,14 @@ package sql
import (
"context"
"encoding/json"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
@ -32,6 +34,14 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p
return params.Instance{}, errors.Wrap(err, "fetching pool")
}
var labels datatypes.JSON
if len(param.AditionalLabels) > 0 {
labels, err = json.Marshal(param.AditionalLabels)
if err != nil {
return params.Instance{}, errors.Wrap(err, "marshalling labels")
}
}
newInstance := Instance{
Pool: pool,
Name: param.Name,
@ -42,6 +52,7 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p
CallbackURL: param.CallbackURL,
MetadataURL: param.MetadataURL,
GitHubRunnerGroup: param.GitHubRunnerGroup,
AditionalLabels: labels,
}
q := s.conn.Create(&newInstance)
if q.Error != nil {
@ -52,7 +63,7 @@ func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param p
}
func (s *sqlDatabase) getInstanceByID(ctx context.Context, instanceID string) (Instance, error) {
u, err := uuid.FromString(instanceID)
u, err := uuid.Parse(instanceID)
if err != nil {
return Instance{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
@ -248,7 +259,7 @@ func (s *sqlDatabase) UpdateInstance(ctx context.Context, instanceID string, par
}
func (s *sqlDatabase) ListPoolInstances(ctx context.Context, poolID string) ([]params.Instance, error) {
u, err := uuid.FromString(poolID)
u, err := uuid.Parse(poolID)
if err != nil {
return nil, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}

View file

@ -552,5 +552,6 @@ func (s *InstancesTestSuite) TestPoolInstanceCountDBCountErr() {
}
func TestInstTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(InstancesTestSuite))
}

337
database/sql/jobs.go Normal file
View file

@ -0,0 +1,337 @@
package sql
import (
"context"
"encoding/json"
"github.com/cloudbase/garm/database/common"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var _ common.JobsStore = &sqlDatabase{}
func sqlWorkflowJobToParamsJob(job WorkflowJob) (params.Job, error) {
labels := []string{}
if job.Labels != nil {
if err := json.Unmarshal(job.Labels, &labels); err != nil {
return params.Job{}, errors.Wrap(err, "unmarshaling labels")
}
}
return params.Job{
ID: job.ID,
RunID: job.RunID,
Action: job.Action,
Status: job.Status,
Name: job.Name,
Conclusion: job.Conclusion,
StartedAt: job.StartedAt,
CompletedAt: job.CompletedAt,
GithubRunnerID: job.GithubRunnerID,
RunnerName: job.RunnerName,
RunnerGroupID: job.RunnerGroupID,
RunnerGroupName: job.RunnerGroupName,
RepositoryName: job.RepositoryName,
RepositoryOwner: job.RepositoryOwner,
RepoID: job.RepoID,
OrgID: job.OrgID,
EnterpriseID: job.EnterpriseID,
Labels: labels,
CreatedAt: job.CreatedAt,
UpdatedAt: job.UpdatedAt,
LockedBy: job.LockedBy,
}, nil
}
func paramsJobToWorkflowJob(job params.Job) (WorkflowJob, error) {
asJson, err := json.Marshal(job.Labels)
if err != nil {
return WorkflowJob{}, errors.Wrap(err, "marshaling labels")
}
return WorkflowJob{
ID: job.ID,
RunID: job.RunID,
Action: job.Action,
Status: job.Status,
Name: job.Name,
Conclusion: job.Conclusion,
StartedAt: job.StartedAt,
CompletedAt: job.CompletedAt,
GithubRunnerID: job.GithubRunnerID,
RunnerName: job.RunnerName,
RunnerGroupID: job.RunnerGroupID,
RunnerGroupName: job.RunnerGroupName,
RepositoryName: job.RepositoryName,
RepositoryOwner: job.RepositoryOwner,
RepoID: job.RepoID,
OrgID: job.OrgID,
EnterpriseID: job.EnterpriseID,
Labels: asJson,
LockedBy: job.LockedBy,
}, nil
}
func (s *sqlDatabase) DeleteJob(ctx context.Context, jobID int64) error {
q := s.conn.Delete(&WorkflowJob{}, jobID)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil
}
return errors.Wrap(q.Error, "deleting job")
}
return nil
}
func (s *sqlDatabase) LockJob(ctx context.Context, jobID int64, entityID string) error {
entityUUID, err := uuid.Parse(entityID)
if err != nil {
return errors.Wrap(err, "parsing entity id")
}
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return runnerErrors.ErrNotFound
}
return errors.Wrap(q.Error, "fetching job")
}
if workflowJob.LockedBy.String() == entityID {
// Already locked by us.
return nil
}
if workflowJob.LockedBy != uuid.Nil {
return runnerErrors.NewConflictError("job is locked by another entity %s", workflowJob.LockedBy.String())
}
workflowJob.LockedBy = entityUUID
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
return nil
}
func (s *sqlDatabase) BreakLockJobIsQueued(ctx context.Context, jobID int64) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ? and status = ?", jobID, params.JobStatusQueued).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil
}
return errors.Wrap(q.Error, "fetching job")
}
if workflowJob.LockedBy == uuid.Nil {
// Job is already unlocked.
return nil
}
workflowJob.LockedBy = uuid.Nil
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
return nil
}
func (s *sqlDatabase) UnlockJob(ctx context.Context, jobID int64, entityID string) error {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", jobID).First(&workflowJob)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return runnerErrors.ErrNotFound
}
return errors.Wrap(q.Error, "fetching job")
}
if workflowJob.LockedBy == uuid.Nil {
// Job is already unlocked.
return nil
}
if workflowJob.LockedBy != uuid.Nil && workflowJob.LockedBy.String() != entityID {
return runnerErrors.NewConflictError("job is locked by another entity %s", workflowJob.LockedBy.String())
}
workflowJob.LockedBy = uuid.Nil
if err := s.conn.Save(&workflowJob).Error; err != nil {
return errors.Wrap(err, "saving job")
}
return nil
}
func (s *sqlDatabase) CreateOrUpdateJob(ctx context.Context, job params.Job) (params.Job, error) {
var workflowJob WorkflowJob
q := s.conn.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", job.ID).First(&workflowJob)
if q.Error != nil {
if !errors.Is(q.Error, gorm.ErrRecordNotFound) {
return params.Job{}, errors.Wrap(q.Error, "fetching job")
}
}
if workflowJob.ID != 0 {
// Update workflowJob with values from job.
workflowJob.Status = job.Status
workflowJob.Action = job.Action
workflowJob.Conclusion = job.Conclusion
workflowJob.StartedAt = job.StartedAt
workflowJob.CompletedAt = job.CompletedAt
workflowJob.GithubRunnerID = job.GithubRunnerID
workflowJob.RunnerGroupID = job.RunnerGroupID
workflowJob.RunnerGroupName = job.RunnerGroupName
if job.LockedBy != uuid.Nil {
workflowJob.LockedBy = job.LockedBy
}
if job.RunnerName != "" {
workflowJob.RunnerName = job.RunnerName
}
if job.RepoID != nil {
workflowJob.RepoID = job.RepoID
}
if job.OrgID != nil {
workflowJob.OrgID = job.OrgID
}
if job.EnterpriseID != nil {
workflowJob.EnterpriseID = job.EnterpriseID
}
if err := s.conn.Save(&workflowJob).Error; err != nil {
return params.Job{}, errors.Wrap(err, "saving job")
}
} else {
workflowJob, err := paramsJobToWorkflowJob(job)
if err != nil {
return params.Job{}, errors.Wrap(err, "converting job")
}
if err := s.conn.Create(&workflowJob).Error; err != nil {
return params.Job{}, errors.Wrap(err, "creating job")
}
}
return sqlWorkflowJobToParamsJob(workflowJob)
}
// ListJobsByStatus lists all jobs for a given status.
func (s *sqlDatabase) ListJobsByStatus(ctx context.Context, status params.JobStatus) ([]params.Job, error) {
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
if err := query.Find(&jobs); err.Error != nil {
return nil, err.Error
}
ret := make([]params.Job, len(jobs))
for idx, job := range jobs {
jobParam, err := sqlWorkflowJobToParamsJob(job)
if err != nil {
return nil, errors.Wrap(err, "converting job")
}
ret[idx] = jobParam
}
return ret, nil
}
// ListEntityJobsByStatus lists all jobs for a given entity type and id.
func (s *sqlDatabase) ListEntityJobsByStatus(ctx context.Context, entityType params.PoolType, entityID string, status params.JobStatus) ([]params.Job, error) {
u, err := uuid.Parse(entityID)
if err != nil {
return nil, err
}
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", status)
switch entityType {
case params.OrganizationPool:
query = query.Where("org_id = ?", u)
case params.RepositoryPool:
query = query.Where("repo_id = ?", u)
case params.EnterprisePool:
query = query.Where("enterprise_id = ?", u)
}
if err := query.Find(&jobs); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
return []params.Job{}, nil
}
return nil, err.Error
}
ret := make([]params.Job, len(jobs))
for idx, job := range jobs {
jobParam, err := sqlWorkflowJobToParamsJob(job)
if err != nil {
return nil, errors.Wrap(err, "converting job")
}
ret[idx] = jobParam
}
return ret, nil
}
func (s *sqlDatabase) ListAllJobs(ctx context.Context) ([]params.Job, error) {
var jobs []WorkflowJob
query := s.conn.Model(&WorkflowJob{})
if err := query.Find(&jobs); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
return []params.Job{}, nil
}
return nil, err.Error
}
ret := make([]params.Job, len(jobs))
for idx, job := range jobs {
jobParam, err := sqlWorkflowJobToParamsJob(job)
if err != nil {
return nil, errors.Wrap(err, "converting job")
}
ret[idx] = jobParam
}
return ret, nil
}
// GetJobByID gets a job by id.
func (s *sqlDatabase) GetJobByID(ctx context.Context, jobID int64) (params.Job, error) {
var job WorkflowJob
query := s.conn.Model(&WorkflowJob{}).Where("id = ?", jobID)
if err := query.First(&job); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
return params.Job{}, runnerErrors.ErrNotFound
}
return params.Job{}, err.Error
}
return sqlWorkflowJobToParamsJob(job)
}
// DeleteCompletedJobs deletes all completed jobs.
func (s *sqlDatabase) DeleteCompletedJobs(ctx context.Context) error {
query := s.conn.Model(&WorkflowJob{}).Where("status = ?", params.JobStatusCompleted)
if err := query.Unscoped().Delete(&WorkflowJob{}); err.Error != nil {
if errors.Is(err.Error, gorm.ErrRecordNotFound) {
return nil
}
return err.Error
}
return nil
}

View file

@ -20,8 +20,8 @@ import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/providers/common"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -38,7 +38,7 @@ func (b *Base) BeforeCreate(tx *gorm.DB) error {
if b.ID != emptyId {
return nil
}
newID, err := uuid.NewV4()
newID, err := uuid.NewUUID()
if err != nil {
return errors.Wrap(err, "generating id")
}
@ -50,7 +50,7 @@ type Tag struct {
Base
Name string `gorm:"type:varchar(64);uniqueIndex"`
Pools []*Pool `gorm:"many2many:pool_tags;"`
Pools []*Pool `gorm:"many2many:pool_tags;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type Pool struct {
@ -65,7 +65,7 @@ type Pool struct {
Flavor string `gorm:"index:idx_pool_type"`
OSType params.OSType
OSArch params.OSArch
Tags []*Tag `gorm:"many2many:pool_tags;"`
Tags []*Tag `gorm:"many2many:pool_tags;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
Enabled bool
// ExtraSpecs is an opaque json that gets sent to the provider
// as part of the bootstrap params for instances. It can contain
@ -73,13 +73,13 @@ type Pool struct {
ExtraSpecs datatypes.JSON
GitHubRunnerGroup string
RepoID uuid.UUID `gorm:"index"`
RepoID *uuid.UUID `gorm:"index"`
Repository Repository `gorm:"foreignKey:RepoID"`
OrgID uuid.UUID `gorm:"index"`
OrgID *uuid.UUID `gorm:"index"`
Organization Organization `gorm:"foreignKey:OrgID"`
EnterpriseID uuid.UUID `gorm:"index"`
EnterpriseID *uuid.UUID `gorm:"index"`
Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"`
Instances []Instance `gorm:"foreignKey:PoolID"`
@ -130,8 +130,8 @@ type InstanceStatusUpdate struct {
EventLevel params.EventLevel
Message string `gorm:"type:text"`
InstanceID uuid.UUID
Instance Instance `gorm:"foreignKey:InstanceID"`
InstanceID uuid.UUID `gorm:"index:idx_instance_status_updates_instance_id"`
Instance Instance `gorm:"foreignKey:InstanceID"`
}
type Instance struct {
@ -144,7 +144,7 @@ type Instance struct {
OSArch params.OSArch
OSName string
OSVersion string
Addresses []Address `gorm:"foreignKey:InstanceID"`
Addresses []Address `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
Status common.InstanceStatus
RunnerStatus common.RunnerStatus
CallbackURL string
@ -153,11 +153,12 @@ type Instance struct {
CreateAttempt int
TokenFetched bool
GitHubRunnerGroup string
AditionalLabels datatypes.JSON
PoolID uuid.UUID
Pool Pool `gorm:"foreignKey:PoolID"`
StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID"`
StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}
type User struct {
@ -176,3 +177,59 @@ type ControllerInfo struct {
ControllerID uuid.UUID
}
type WorkflowJob struct {
// ID is the ID of the job.
ID int64 `gorm:"index"`
// RunID is the ID of the workflow run. A run may have multiple jobs.
RunID int64
// Action is the specific activity that triggered the event.
Action string `gorm:"type:varchar(254);index"`
// Conclusion is the outcome of the job.
// Possible values: "success", "failure", "neutral", "cancelled", "skipped",
// "timed_out", "action_required"
Conclusion string
// Status is the phase of the lifecycle that the job is currently in.
// "queued", "in_progress" and "completed".
Status string
// Name is the name if the job that was triggered.
Name string
StartedAt time.Time
CompletedAt time.Time
GithubRunnerID int64
RunnerName string
RunnerGroupID int64
RunnerGroupName string
// repository in which the job was triggered.
RepositoryName string
RepositoryOwner string
Labels datatypes.JSON
// The entity that received the hook.
//
// Webhooks may be configured on the repo, the org and/or the enterprise.
// If we only configure a repo to use garm, we'll only ever receive a
// webhook from the repo. But if we configure the parent org of the repo and
// the parent enterprise of the org to use garm, a webhook will be sent for each
// entity type, in response to one workflow event. Thus, we will get 3 webhooks
// with the same run_id and job id. Record all involved entities in the same job
// if we have them configured in garm.
RepoID *uuid.UUID `gorm:"index"`
Repository Repository `gorm:"foreignKey:RepoID"`
OrgID *uuid.UUID `gorm:"index"`
Organization Organization `gorm:"foreignKey:OrgID"`
EnterpriseID *uuid.UUID `gorm:"index"`
Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"`
LockedBy uuid.UUID
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt gorm.DeletedAt `gorm:"index"`
}

View file

@ -22,8 +22,8 @@ import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -165,7 +165,7 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
Flavor: param.Flavor,
OSType: param.OSType,
OSArch: param.OSArch,
OrgID: org.ID,
OrgID: &org.ID,
Enabled: param.Enabled,
RunnerBootstrapTimeout: param.RunnerBootstrapTimeout,
}
@ -212,7 +212,7 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
}
func (s *sqlDatabase) ListOrgPools(ctx context.Context, orgID string) ([]params.Pool, error) {
pools, err := s.getOrgPools(ctx, orgID, "Tags")
pools, err := s.listEntityPools(ctx, params.OrganizationPool, orgID, "Tags", "Instances")
if err != nil {
return nil, errors.Wrap(err, "fetching pools")
}
@ -246,15 +246,15 @@ func (s *sqlDatabase) DeleteOrganizationPool(ctx context.Context, orgID, poolID
}
func (s *sqlDatabase) FindOrganizationPoolByTags(ctx context.Context, orgID string, tags []string) (params.Pool, error) {
pool, err := s.findPoolByTags(orgID, "org_id", tags)
pool, err := s.findPoolByTags(orgID, params.OrganizationPool, tags)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
return pool[0], nil
}
func (s *sqlDatabase) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) {
pools, err := s.getOrgPools(ctx, orgID, "Instances")
pools, err := s.listEntityPools(ctx, params.OrganizationPool, orgID, "Tags", "Instances")
if err != nil {
return nil, errors.Wrap(err, "fetching org")
}
@ -277,7 +277,7 @@ func (s *sqlDatabase) UpdateOrganizationPool(ctx context.Context, orgID, poolID
}
func (s *sqlDatabase) getPoolByID(ctx context.Context, poolID string, preload ...string) (Pool, error) {
u, err := uuid.FromString(poolID)
u, err := uuid.Parse(poolID)
if err != nil {
return Pool{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
@ -300,34 +300,8 @@ func (s *sqlDatabase) getPoolByID(ctx context.Context, poolID string, preload ..
return pool, nil
}
func (s *sqlDatabase) getOrgPools(ctx context.Context, orgID string, preload ...string) ([]Pool, error) {
_, err := s.getOrgByID(ctx, orgID)
if err != nil {
return nil, errors.Wrap(err, "fetching org")
}
q := s.conn
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var pools []Pool
err = q.Model(&Pool{}).
Where("org_id = ?", orgID).
Omit("extra_specs").
Find(&pools).Error
if err != nil {
return nil, errors.Wrap(err, "fetching pool")
}
return pools, nil
}
func (s *sqlDatabase) getOrgByID(ctx context.Context, id string, preload ...string) (Organization, error) {
u, err := uuid.FromString(id)
u, err := uuid.Parse(id)
if err != nil {
return Organization{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}

View file

@ -670,7 +670,7 @@ func (s *OrgTestSuite) TestListOrgPoolsInvalidOrgID() {
_, err := s.Store.ListOrgPools(context.Background(), "dummy-org-id")
s.Require().NotNil(err)
s.Require().Equal("fetching pools: fetching org: parsing id: invalid request", err.Error())
s.Require().Equal("fetching pools: parsing id: invalid request", err.Error())
}
func (s *OrgTestSuite) TestGetOrganizationPool() {
@ -784,7 +784,7 @@ func (s *OrgTestSuite) TestListOrgInstancesInvalidOrgID() {
_, err := s.Store.ListOrgInstances(context.Background(), "dummy-org-id")
s.Require().NotNil(err)
s.Require().Equal("fetching org: fetching org: parsing id: invalid request", err.Error())
s.Require().Equal("fetching org: parsing id: invalid request", err.Error())
}
func (s *OrgTestSuite) TestUpdateOrganizationPool() {
@ -810,5 +810,6 @@ func (s *OrgTestSuite) TestUpdateOrganizationPoolInvalidOrgID() {
}
func TestOrgTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(OrgTestSuite))
}

View file

@ -21,8 +21,8 @@ import (
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/gorm"
)
@ -73,7 +73,7 @@ func (s *sqlDatabase) getEntityPool(ctx context.Context, entityType params.PoolT
return Pool{}, errors.Wrap(runnerErrors.ErrBadRequest, "missing entity id")
}
u, err := uuid.FromString(poolID)
u, err := uuid.Parse(poolID)
if err != nil {
return Pool{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
@ -112,3 +112,109 @@ func (s *sqlDatabase) getEntityPool(ctx context.Context, entityType params.PoolT
return pool, nil
}
func (s *sqlDatabase) listEntityPools(ctx context.Context, entityType params.PoolType, entityID string, preload ...string) ([]Pool, error) {
if _, err := uuid.Parse(entityID); err != nil {
return nil, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
q := s.conn
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var fieldName string
switch entityType {
case params.RepositoryPool:
fieldName = "repo_id"
case params.OrganizationPool:
fieldName = "org_id"
case params.EnterprisePool:
fieldName = "enterprise_id"
default:
return nil, fmt.Errorf("invalid entityType: %v", entityType)
}
var pools []Pool
condition := fmt.Sprintf("%s = ?", fieldName)
err := q.Model(&Pool{}).
Where(condition, entityID).
Omit("extra_specs").
Find(&pools).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return []Pool{}, nil
}
return nil, errors.Wrap(err, "fetching pool")
}
return pools, nil
}
func (s *sqlDatabase) findPoolByTags(id string, poolType params.PoolType, tags []string) ([]params.Pool, error) {
if len(tags) == 0 {
return nil, runnerErrors.NewBadRequestError("missing tags")
}
u, err := uuid.Parse(id)
if err != nil {
return nil, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
var fieldName string
switch poolType {
case params.RepositoryPool:
fieldName = "repo_id"
case params.OrganizationPool:
fieldName = "org_id"
case params.EnterprisePool:
fieldName = "enterprise_id"
default:
return nil, fmt.Errorf("invalid poolType: %v", poolType)
}
var pools []Pool
where := fmt.Sprintf("tags.name in ? and %s = ? and enabled = true", fieldName)
q := s.conn.Joins("JOIN pool_tags on pool_tags.pool_id=pools.id").
Joins("JOIN tags on tags.id=pool_tags.tag_id").
Group("pools.id").
Preload("Tags").
Having("count(1) = ?", len(tags)).
Where(where, tags, u).Find(&pools)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return nil, runnerErrors.ErrNotFound
}
return nil, errors.Wrap(q.Error, "fetching pool")
}
if len(pools) == 0 {
return nil, runnerErrors.ErrNotFound
}
ret := make([]params.Pool, len(pools))
for idx, val := range pools {
ret[idx] = s.sqlToCommonPool(val)
}
return ret, nil
}
func (s *sqlDatabase) FindPoolsMatchingAllTags(ctx context.Context, entityType params.PoolType, entityID string, tags []string) ([]params.Pool, error) {
if len(tags) == 0 {
return nil, runnerErrors.NewBadRequestError("missing tags")
}
pools, err := s.findPoolByTags(entityID, entityType, tags)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return []params.Pool{}, nil
}
return nil, errors.Wrap(err, "fetching pools")
}
return pools, nil
}

View file

@ -186,5 +186,6 @@ func (s *PoolsTestSuite) TestDeletePoolByIDDBRemoveErr() {
}
func TestPoolsTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(PoolsTestSuite))
}

View file

@ -22,8 +22,8 @@ import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/util"
"github.com/google/uuid"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -165,7 +165,7 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p
Flavor: param.Flavor,
OSType: param.OSType,
OSArch: param.OSArch,
RepoID: repo.ID,
RepoID: &repo.ID,
Enabled: param.Enabled,
RunnerBootstrapTimeout: param.RunnerBootstrapTimeout,
}
@ -212,7 +212,7 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p
}
func (s *sqlDatabase) ListRepoPools(ctx context.Context, repoID string) ([]params.Pool, error) {
pools, err := s.getRepoPools(ctx, repoID, "Tags")
pools, err := s.listEntityPools(ctx, params.RepositoryPool, repoID, "Tags", "Instances")
if err != nil {
return nil, errors.Wrap(err, "fetching pools")
}
@ -246,15 +246,15 @@ func (s *sqlDatabase) DeleteRepositoryPool(ctx context.Context, repoID, poolID s
}
func (s *sqlDatabase) FindRepositoryPoolByTags(ctx context.Context, repoID string, tags []string) (params.Pool, error) {
pool, err := s.findPoolByTags(repoID, "repo_id", tags)
pool, err := s.findPoolByTags(repoID, params.RepositoryPool, tags)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
return pool[0], nil
}
func (s *sqlDatabase) ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) {
pools, err := s.getRepoPools(ctx, repoID, "Instances")
pools, err := s.listEntityPools(ctx, params.RepositoryPool, repoID, "Tags", "Instances")
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
@ -294,38 +294,6 @@ func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string) (Reposito
return repo, nil
}
func (s *sqlDatabase) findPoolByTags(id, poolType string, tags []string) (params.Pool, error) {
if len(tags) == 0 {
return params.Pool{}, runnerErrors.NewBadRequestError("missing tags")
}
u, err := uuid.FromString(id)
if err != nil {
return params.Pool{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
var pools []Pool
where := fmt.Sprintf("tags.name in ? and %s = ? and enabled = true", poolType)
q := s.conn.Joins("JOIN pool_tags on pool_tags.pool_id=pools.id").
Joins("JOIN tags on tags.id=pool_tags.tag_id").
Group("pools.id").
Preload("Tags").
Having("count(1) = ?", len(tags)).
Where(where, tags, u).Find(&pools)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return params.Pool{}, runnerErrors.ErrNotFound
}
return params.Pool{}, errors.Wrap(q.Error, "fetching pool")
}
if len(pools) == 0 {
return params.Pool{}, runnerErrors.ErrNotFound
}
return s.sqlToCommonPool(pools[0]), nil
}
func (s *sqlDatabase) getRepoPoolByUniqueFields(ctx context.Context, repoID string, provider, image, flavor string) (Pool, error) {
repo, err := s.getRepoByID(ctx, repoID)
if err != nil {
@ -345,32 +313,8 @@ func (s *sqlDatabase) getRepoPoolByUniqueFields(ctx context.Context, repoID stri
return pool[0], nil
}
func (s *sqlDatabase) getRepoPools(ctx context.Context, repoID string, preload ...string) ([]Pool, error) {
_, err := s.getRepoByID(ctx, repoID)
if err != nil {
return nil, errors.Wrap(err, "fetching repo")
}
q := s.conn
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var pools []Pool
err = q.Model(&Pool{}).Where("repo_id = ?", repoID).
Omit("extra_specs").
Find(&pools).Error
if err != nil {
return nil, errors.Wrap(err, "fetching pool")
}
return pools, nil
}
func (s *sqlDatabase) getRepoByID(ctx context.Context, id string, preload ...string) (Repository, error) {
u, err := uuid.FromString(id)
u, err := uuid.Parse(id)
if err != nil {
return Repository{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}

View file

@ -707,7 +707,7 @@ func (s *RepoTestSuite) TestListRepoPoolsInvalidRepoID() {
_, err := s.Store.ListRepoPools(context.Background(), "dummy-repo-id")
s.Require().NotNil(err)
s.Require().Equal("fetching pools: fetching repo: parsing id: invalid request", err.Error())
s.Require().Equal("fetching pools: parsing id: invalid request", err.Error())
}
func (s *RepoTestSuite) TestGetRepositoryPool() {
@ -820,7 +820,7 @@ func (s *RepoTestSuite) TestListRepoInstancesInvalidRepoID() {
_, err := s.Store.ListRepoInstances(context.Background(), "dummy-repo-id")
s.Require().NotNil(err)
s.Require().Equal("fetching repo: fetching repo: parsing id: invalid request", err.Error())
s.Require().Equal("fetching repo: parsing id: invalid request", err.Error())
}
func (s *RepoTestSuite) TestUpdateRepositoryPool() {
@ -846,5 +846,6 @@ func (s *RepoTestSuite) TestUpdateRepositoryPoolInvalidRepoID() {
}
func TestRepoTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(RepoTestSuite))
}

View file

@ -16,7 +16,9 @@ package sql
import (
"context"
"fmt"
"log"
"strings"
"github.com/pkg/errors"
"gorm.io/driver/mysql"
@ -79,6 +81,111 @@ type sqlDatabase struct {
cfg config.Database
}
var renameTemplate = `
PRAGMA foreign_keys = OFF;
BEGIN TRANSACTION;
ALTER TABLE %s RENAME TO %s_old;
COMMIT;
`
var restoreNameTemplate = `
PRAGMA foreign_keys = OFF;
BEGIN TRANSACTION;
DROP TABLE IF EXISTS %s;
ALTER TABLE %s_old RENAME TO %s;
COMMIT;
`
var copyContentsTemplate = `
PRAGMA foreign_keys = OFF;
BEGIN TRANSACTION;
INSERT INTO %s SELECT * FROM %s_old;
DROP TABLE %s_old;
COMMIT;
`
func (s *sqlDatabase) cascadeMigrationSQLite(model interface{}, name string, justDrop bool) error {
if !s.conn.Migrator().HasTable(name) {
return nil
}
defer s.conn.Exec("PRAGMA foreign_keys = ON;")
var data string
var indexes []string
if err := s.conn.Raw(fmt.Sprintf("select sql from sqlite_master where type='table' and tbl_name='%s'", name)).Scan(&data).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("failed to get table %s: %w", name, err)
}
}
if err := s.conn.Raw(fmt.Sprintf("SELECT name FROM sqlite_master WHERE type == 'index' AND tbl_name == '%s' and name not like 'sqlite_%%'", name)).Scan(&indexes).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("failed to get table indexes %s: %w", name, err)
}
}
if strings.Contains(data, "ON DELETE CASCADE") {
return nil
}
if justDrop {
if err := s.conn.Migrator().DropTable(model); err != nil {
return fmt.Errorf("failed to drop table %s: %w", name, err)
}
return nil
}
for _, index := range indexes {
if err := s.conn.Migrator().DropIndex(model, index); err != nil {
return fmt.Errorf("failed to drop index %s: %w", index, err)
}
}
err := s.conn.Exec(fmt.Sprintf(renameTemplate, name, name)).Error
if err != nil {
return fmt.Errorf("failed to rename table %s: %w", name, err)
}
if model != nil {
if err := s.conn.Migrator().AutoMigrate(model); err != nil {
if err := s.conn.Exec(fmt.Sprintf(restoreNameTemplate, name, name, name)).Error; err != nil {
log.Printf("failed to restore table %s: %s", name, err)
}
return fmt.Errorf("failed to create table %s: %w", name, err)
}
}
err = s.conn.Exec(fmt.Sprintf(copyContentsTemplate, name, name, name)).Error
if err != nil {
return fmt.Errorf("failed to copy contents to table %s: %w", name, err)
}
return nil
}
func (s *sqlDatabase) cascadeMigration() error {
switch s.cfg.DbBackend {
case config.SQLiteBackend:
if err := s.cascadeMigrationSQLite(&Address{}, "addresses", true); err != nil {
return fmt.Errorf("failed to drop table addresses: %w", err)
}
if err := s.cascadeMigrationSQLite(&InstanceStatusUpdate{}, "instance_status_updates", true); err != nil {
return fmt.Errorf("failed to drop table instance_status_updates: %w", err)
}
if err := s.cascadeMigrationSQLite(&Tag{}, "pool_tags", false); err != nil {
return fmt.Errorf("failed to migrate addresses: %w", err)
}
case config.MySQLBackend:
return nil
default:
return fmt.Errorf("invalid db backend: %s", s.cfg.DbBackend)
}
return nil
}
func (s *sqlDatabase) migrateDB() error {
if s.conn.Migrator().HasIndex(&Organization{}, "idx_organizations_name") {
if err := s.conn.Migrator().DropIndex(&Organization{}, "idx_organizations_name"); err != nil {
@ -91,6 +198,25 @@ func (s *sqlDatabase) migrateDB() error {
log.Printf("failed to drop index idx_owner: %s", err)
}
}
if err := s.cascadeMigration(); err != nil {
return errors.Wrap(err, "running cascade migration")
}
if s.conn.Migrator().HasTable(&Pool{}) {
if err := s.conn.Exec("update pools set repo_id=NULL where repo_id='00000000-0000-0000-0000-000000000000'").Error; err != nil {
return errors.Wrap(err, "updating pools")
}
if err := s.conn.Exec("update pools set org_id=NULL where org_id='00000000-0000-0000-0000-000000000000'").Error; err != nil {
return errors.Wrap(err, "updating pools")
}
if err := s.conn.Exec("update pools set enterprise_id=NULL where enterprise_id='00000000-0000-0000-0000-000000000000'").Error; err != nil {
return errors.Wrap(err, "updating pools")
}
}
if err := s.conn.AutoMigrate(
&Tag{},
&Pool{},
@ -102,6 +228,7 @@ func (s *sqlDatabase) migrateDB() error {
&Instance{},
&ControllerInfo{},
&User{},
&WorkflowJob{},
); err != nil {
return errors.Wrap(err, "running auto migrate")
}

View file

@ -22,7 +22,6 @@ import (
"github.com/cloudbase/garm/util"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -32,6 +31,9 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance {
if instance.ProviderID != nil {
id = *instance.ProviderID
}
var labels []string
_ = json.Unmarshal(instance.AditionalLabels, &labels)
ret := params.Instance{
ID: instance.ID.String(),
ProviderID: id,
@ -51,6 +53,7 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) params.Instance {
UpdatedAt: instance.UpdatedAt,
TokenFetched: instance.TokenFetched,
GitHubRunnerGroup: instance.GitHubRunnerGroup,
AditionalLabels: labels,
}
if len(instance.ProviderFault) > 0 {
@ -148,19 +151,19 @@ func (s *sqlDatabase) sqlToCommonPool(pool Pool) params.Pool {
GitHubRunnerGroup: pool.GitHubRunnerGroup,
}
if pool.RepoID != uuid.Nil {
if pool.RepoID != nil {
ret.RepoID = pool.RepoID.String()
if pool.Repository.Owner != "" && pool.Repository.Name != "" {
ret.RepoName = fmt.Sprintf("%s/%s", pool.Repository.Owner, pool.Repository.Name)
}
}
if pool.OrgID != uuid.Nil && pool.Organization.Name != "" {
if pool.OrgID != nil && pool.Organization.Name != "" {
ret.OrgID = pool.OrgID.String()
ret.OrgName = pool.Organization.Name
}
if pool.EnterpriseID != uuid.Nil && pool.Enterprise.Name != "" {
if pool.EnterpriseID != nil && pool.Enterprise.Name != "" {
ret.EnterpriseID = pool.EnterpriseID.String()
ret.EnterpriseName = pool.Enterprise.Name
}

View file

@ -29,8 +29,9 @@ var (
// ErrBadRequest is returned is a malformed request is sent
ErrBadRequest = NewBadRequestError("invalid request")
// ErrTimeout is returned when a timeout occurs.
ErrTimeout = fmt.Errorf("timed out")
ErrUnprocessable = fmt.Errorf("cannot process request")
ErrTimeout = fmt.Errorf("timed out")
ErrUnprocessable = fmt.Errorf("cannot process request")
ErrNoPoolsAvailable = fmt.Errorf("no pools available")
)
type baseError struct {

1
go.mod
View file

@ -23,7 +23,6 @@ require (
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.2
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569

2
go.sum
View file

@ -293,8 +293,6 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=

View file

@ -22,7 +22,7 @@ import (
"github.com/cloudbase/garm/util/appdefaults"
"github.com/google/go-github/v53/github"
uuid "github.com/satori/go.uuid"
"github.com/google/uuid"
)
type (
@ -33,6 +33,7 @@ type (
OSType string
OSArch string
ProviderType string
JobStatus string
)
const (
@ -42,6 +43,12 @@ const (
ExternalProvider ProviderType = "external"
)
const (
JobStatusQueued JobStatus = "queued"
JobStatusInProgress JobStatus = "in_progress"
JobStatusCompleted JobStatus = "completed"
)
const (
RepositoryPool PoolType = "repository"
OrganizationPool PoolType = "organization"
@ -149,10 +156,11 @@ type Instance struct {
GitHubRunnerGroup string `json:"github-runner-group"`
// Do not serialize sensitive info.
CallbackURL string `json:"-"`
MetadataURL string `json:"-"`
CreateAttempt int `json:"-"`
TokenFetched bool `json:"-"`
CallbackURL string `json:"-"`
MetadataURL string `json:"-"`
CreateAttempt int `json:"-"`
TokenFetched bool `json:"-"`
AditionalLabels []string `json:"-"`
}
func (i Instance) GetName() string {
@ -417,3 +425,53 @@ func (p RunnerPrefix) GetRunnerPrefix() string {
}
return p.Prefix
}
type Job struct {
// ID is the ID of the job.
ID int64 `json:"id"`
// RunID is the ID of the workflow run. A run may have multiple jobs.
RunID int64 `json:"run_id"`
// Action is the specific activity that triggered the event.
Action string `json:"action"`
// Conclusion is the outcome of the job.
// Possible values: "success", "failure", "neutral", "cancelled", "skipped",
// "timed_out", "action_required"
Conclusion string `json:"conclusion"`
// Status is the phase of the lifecycle that the job is currently in.
// "queued", "in_progress" and "completed".
Status string `json:"status"`
// Name is the name if the job that was triggered.
Name string `json:"name"`
StartedAt time.Time
CompletedAt time.Time
GithubRunnerID int64 `json:"runner_id"`
RunnerName string `json:"runner_name"`
RunnerGroupID int64 `json:"runner_group_id"`
RunnerGroupName string `json:"runner_group_name"`
// repository in which the job was triggered.
RepositoryName string
RepositoryOwner string
Labels []string
// The entity that received the hook.
//
// Webhooks may be configured on the repo, the org and/or the enterprise.
// If we only configure a repo to use garm, we'll only ever receive a
// webhook from the repo. But if we configure the parent org of the repo and
// the parent enterprise of the org to use garm, a webhook will be sent for each
// entity type, in response to one workflow event. Thus, we will get 3 webhooks
// with the same run_id and job id. Record all involved entities in the same job
// if we have them configured in garm.
RepoID *uuid.UUID `json:"repo_id,omitempty"`
OrgID *uuid.UUID `json:"org_id,omitempty"`
EnterpriseID *uuid.UUID `json:"enterprise_id,omitempty"`
LockedBy uuid.UUID
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

View file

@ -136,6 +136,7 @@ type CreateInstanceParams struct {
// The runner group must be created by someone with access to the enterprise.
GitHubRunnerGroup string
CreateAttempt int `json:"-"`
AditionalLabels []string
}
type CreatePoolParams struct {

View file

@ -25,15 +25,14 @@ const (
PoolConsilitationInterval = 5 * time.Second
PoolReapTimeoutInterval = 5 * time.Minute
// Temporary tools download token is valid for 1 hour by default.
// Set this to 15 minutes. This should allow enough time even on slow
// clouds for the instance to spin up, download the tools and join gh.
PoolToolUpdateInterval = 15 * time.Minute
// There is no point in making an API call to get available tools, for every runner
// we spin up. We cache the tools for one minute. This should save us a lot of API calls
// in cases where we have a lot of runners spin up at the same time.
PoolToolUpdateInterval = 1 * time.Minute
// UnauthorizedBackoffTimer is the time we wait before making another request
// after getting an unauthorized error from github. It is unlikely that a second
// request will not receive the same error, unless the config is changed with new
// credentials and garm is restarted.
UnauthorizedBackoffTimer = 15 * time.Minute
// BackoffTimer is the time we wait before attempting to make another request
// to the github API.
BackoffTimer = 1 * time.Minute
)
//go:generate mockery --all

View file

@ -65,6 +65,14 @@ type enterprise struct {
mux sync.Mutex
}
func (r *enterprise) GithubCLI() common.GithubClient {
return r.ghcli
}
func (e *enterprise) PoolType() params.PoolType {
return params.EnterprisePool
}
func (r *enterprise) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.RunnerInfo, error) {
if err := r.ValidateOwner(job); err != nil {
return params.RunnerInfo{}, errors.Wrap(err, "validating owner")

View file

@ -16,6 +16,7 @@ package pool
import (
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
"github.com/google/go-github/v53/github"
)
@ -28,6 +29,8 @@ type poolHelper interface {
RemoveGithubRunner(runnerID int64) (*github.Response, error)
FetchTools() ([]*github.RunnerApplicationDownload, error)
GithubCLI() common.GithubClient
FetchDbInstances() ([]params.Instance, error)
ListPools() ([]params.Pool, error)
GithubURL() string
@ -41,4 +44,5 @@ type poolHelper interface {
UpdateState(param params.UpdatePoolStateParams) error
WebhookSecret() string
ID() string
PoolType() params.PoolType
}

View file

@ -77,6 +77,14 @@ type organization struct {
mux sync.Mutex
}
func (r *organization) GithubCLI() common.GithubClient {
return r.ghcli
}
func (o *organization) PoolType() params.PoolType {
return params.OrganizationPool
}
func (r *organization) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.RunnerInfo, error) {
if err := r.ValidateOwner(job); err != nil {
return params.RunnerInfo{}, errors.Wrap(err, "validating owner")

View file

@ -20,6 +20,7 @@ import (
"log"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -33,6 +34,7 @@ import (
"github.com/cloudbase/garm/util"
"github.com/google/go-github/v53/github"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@ -40,6 +42,13 @@ import (
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
// We tag runners that have been spawned as a result of a queued job with the job ID
// that spawned them. There is no way to guarantee that the runner spawned in response to a particular
// job, will be picked up by that job. We mark them so as in the very likely event that the runner
// has picked up a different job, we can clear the lock on the job that spaned it.
// The job it picked up would already be transitioned to in_progress so it will be ignored by the
// consume loop.
jobLabelPrefix = "in_response_to_job:"
)
const (
@ -96,62 +105,94 @@ type basePoolManager struct {
keyMux *keyMutex
}
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) {
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.helper.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
var jobParams params.Job
var err error
var triggeredBy int64
defer func() {
if err != nil && errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, fmt.Sprintf("failed to handle job: %q", err))
// we're updating the job in the database, regardless of whether it was successful or not.
// or if it was meant for this pool or not. Github will send the same job data to all hierarchies
// that have been configured to work with garm. Updating the job at all levels should yield the same
// outcome in the db.
if jobParams.ID == 0 {
return
}
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
r.log("failed to get job %d: %s", jobParams.ID, err)
return
}
// This job is new to us. Check if we have a pool that can handle it.
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), jobParams.Labels)
if err != nil {
r.log("failed to find pools matching tags %s: %s; not recording job", strings.Join(jobParams.Labels, ", "), err)
return
}
if len(potentialPools) == 0 {
r.log("no pools matching tags %s; not recording job", strings.Join(jobParams.Labels, ", "))
return
}
}
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
r.log("failed to update job %d: %s", jobParams.ID, jobErr)
}
if triggeredBy != 0 && jobParams.ID != triggeredBy {
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
// transitioned to in_progress was created as a result of a different queued job. If that job is
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
// That may take a long time, so we break the lock here and allow it to be scheduled again.
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
r.log("failed to break lock for job %d: %s", triggeredBy, err)
}
}
}()
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
// If we already have an idle runner around, that runner will pick up the job
// and trigger an "in_progress" update from github (see bellow), which in turn will set the
// runner state of the instance to "active". The ensureMinIdleRunners() function will
// exclude that runner from available runners and attempt to ensure
// the needed number of runners.
if err := r.acquireNewInstance(job); err != nil {
r.log("failed to add instance: %s", err)
// Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method
// when reconciling.
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
return errors.Wrap(err, "converting job to params")
}
case "completed":
// ignore the error here. A completed job may not have a runner name set
// if it was never assigned to a runner, and was canceled.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if !errors.Is(err, runnerErrors.ErrUnauthorized) {
if errors.Is(err, runnerErrors.ErrNotFound) {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
return errors.Wrap(err, "updating runner")
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerTerminated); err != nil {
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, providerCommon.RunnerTerminated); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(runnerInfo.Name))
if err := r.setInstanceStatus(runnerInfo.Name, providerCommon.InstancePendingDelete, nil); err != nil {
r.log("marking instance %s as pending_delete", util.SanitizeLogEntry(jobParams.RunnerName))
if _, err := r.setInstanceStatus(jobParams.RunnerName, providerCommon.InstancePendingDelete, nil); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
case "in_progress":
// in_progress jobs must have a runner name/ID assigned. Sometimes github will send a hook without
// a runner set. In such cases, we attemt to fetch it from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// This is most likely a runner we're not managing. If we define a repo from within an org
@ -161,21 +202,47 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error)
// that we are not responsible for that runner, and we should ignore it.
return nil
}
return errors.Wrap(err, "determining runner name")
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
if err := r.setInstanceRunnerStatus(runnerInfo.Name, providerCommon.RunnerActive); err != nil {
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, providerCommon.RunnerActive)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(runnerInfo.Name), err)
r.log("failed to update runner %s status: %s", util.SanitizeLogEntry(jobParams.RunnerName), err)
return errors.Wrap(err, "updating runner")
}
// Set triggeredBy here so we break the lock on any potential queued job.
triggeredBy = jobIdFromLabels(instance.AditionalLabels)
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "getting pool")
}
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
r.log("error ensuring idle runners for pool %s: %s", pool.ID, err)
}
}
return nil
}
func jobIdFromLabels(labels []string) int64 {
for _, lbl := range labels {
if strings.HasPrefix(lbl, jobLabelPrefix) {
jobId, err := strconv.ParseInt(lbl[len(jobLabelPrefix):], 10, 64)
if err != nil {
return 0
}
return jobId
}
}
return 0
}
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
r.log("starting %s loop for %s", name, r.helper.String())
ticker := time.NewTicker(interval)
@ -218,7 +285,7 @@ func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Dur
// this worker was stopped.
return
default:
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
r.waitForTimeoutOrCanceled(common.BackoffTimer)
}
}
}
@ -228,18 +295,16 @@ func (r *basePoolManager) updateTools() error {
// Update tools cache.
tools, err := r.helper.FetchTools()
if err != nil {
r.log("failed to update tools for repo %s: %s", r.helper.String(), err)
r.setPoolRunningState(false, err.Error())
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.waitForTimeoutOrCanceled(common.UnauthorizedBackoffTimer)
} else {
r.waitForTimeoutOrCanceled(60 * time.Second)
}
r.waitForTimeoutOrCanceled(common.BackoffTimer)
return fmt.Errorf("failed to update tools for repo %s: %w", r.helper.String(), err)
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
r.log("successfully updated tools")
r.setPoolRunningState(true, "")
return err
}
@ -329,7 +394,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
if _, err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
return errors.Wrap(err, "updating runner")
}
@ -568,101 +633,45 @@ func (r *basePoolManager) fetchInstance(runnerName string) (params.Instance, err
return runner, nil
}
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) error {
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status providerCommon.RunnerStatus) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
if err := r.updateInstance(runnerName, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return nil
return instance, nil
}
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) error {
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) {
runner, err := r.fetchInstance(runnerName)
if err != nil {
return errors.Wrap(err, "fetching instance")
return params.Instance{}, errors.Wrap(err, "fetching instance")
}
if _, err := r.store.UpdateInstance(r.ctx, runner.ID, update); err != nil {
return errors.Wrap(err, "updating runner state")
instance, err := r.store.UpdateInstance(r.ctx, runner.ID, update)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return nil
return instance, nil
}
func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) error {
func (r *basePoolManager) setInstanceStatus(runnerName string, status providerCommon.InstanceStatus, providerFault []byte) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
Status: status,
ProviderFault: providerFault,
}
if err := r.updateInstance(runnerName, updateParams); err != nil {
return errors.Wrap(err, "updating runner state")
instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return nil
return instance, nil
}
func (r *basePoolManager) acquireNewInstance(job params.WorkflowJob) error {
requestedLabels := job.WorkflowJob.Labels
if len(requestedLabels) == 0 {
// no labels were requested.
return nil
}
pool, err := r.helper.FindPoolByTags(requestedLabels)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
r.log("failed to find an enabled pool with required labels: %s", strings.Join(requestedLabels, ", "))
return nil
}
return errors.Wrap(err, "fetching suitable pool")
}
r.log("adding new runner with requested tags %s in pool %s", util.SanitizeLogEntry(strings.Join(job.WorkflowJob.Labels, ", ")), util.SanitizeLogEntry(pool.ID))
if !pool.Enabled {
r.log("selected pool (%s) is disabled", pool.ID)
return nil
}
poolInstances, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
if poolInstances >= int64(pool.MaxRunners) {
r.log("max_runners (%d) reached for pool %s, skipping...", pool.MaxRunners, pool.ID)
return nil
}
instances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
return errors.Wrap(err, "fetching instances")
}
idleWorkers := 0
for _, inst := range instances {
if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle &&
providerCommon.InstanceStatus(inst.Status) == providerCommon.InstanceRunning {
idleWorkers++
}
}
// Skip creating a new runner if we have at least one idle runner and the minimum is already satisfied.
// This should work even for pools that define a MinIdleRunner of 0.
if int64(idleWorkers) > 0 && int64(idleWorkers) >= int64(pool.MinIdleRunners) {
r.log("we have enough min_idle_runners (%d) for pool %s, skipping...", pool.MinIdleRunners, pool.ID)
return nil
}
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
r.log("failed to add runner to pool %s", pool.ID)
return errors.Wrap(err, "adding runner")
}
return nil
}
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) error {
pool, err := r.helper.GetPoolByID(poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
@ -680,6 +689,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
MetadataURL: r.helper.GetMetadataURL(),
CreateAttempt: 1,
GitHubRunnerGroup: pool.GitHubRunnerGroup,
AditionalLabels: aditionalLabels,
}
_, err = r.store.CreateInstance(r.ctx, poolID, createParams)
@ -825,6 +835,73 @@ func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (param
return runnerInfo, nil
}
// paramsWorkflowJobToParamsJob returns a params.Job from a params.WorkflowJob, and aditionally determines
// if the runner belongs to this pool or not. It will always return a valid params.Job, even if it errs out.
// This allows us to still update the job in the database, even if we determined that it wasn't necessarily meant
// for this pool.
// If garm manages multiple hierarchies (repos, org, enterprise) which involve the same repo, we will get a hook
// whenever a job involving our repo triggers a hook. So even if the job is picked up by a runner at the enterprise
// level, the repo and org still get a hook.
// We even get a hook if a particular job is picked up by a GitHub hosted runner. We don't know who will pick up the job
// until the "in_progress" event is sent and we can see which runner picked it up.
//
// We save the details of that job at every level, because we want to at least update the status of the job. We make
// decissions based on the status of saved jobs. A "queued" job will prompt garm to search for an appropriate pool
// and spin up a runner there if no other idle runner exists to pick it up.
func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) {
asUUID, err := uuid.Parse(r.ID())
if err != nil {
return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID")
}
jobParams := params.Job{
ID: job.WorkflowJob.ID,
Action: job.Action,
RunID: job.WorkflowJob.RunID,
Status: job.WorkflowJob.Status,
Conclusion: job.WorkflowJob.Conclusion,
StartedAt: job.WorkflowJob.StartedAt,
CompletedAt: job.WorkflowJob.CompletedAt,
Name: job.WorkflowJob.Name,
GithubRunnerID: job.WorkflowJob.RunnerID,
RunnerGroupID: job.WorkflowJob.RunnerGroupID,
RunnerGroupName: job.WorkflowJob.RunnerGroupName,
RepositoryName: job.Repository.Name,
RepositoryOwner: job.Repository.Owner.Login,
Labels: job.WorkflowJob.Labels,
}
runnerName := job.WorkflowJob.RunnerName
if job.Action != "queued" && runnerName == "" {
if job.WorkflowJob.Conclusion != "skipped" && job.WorkflowJob.Conclusion != "canceled" {
// Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need,
// using the workflow run ID, from the API.
// We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance
// to pick up the job, the runner name is not available from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return jobParams, errors.Wrap(err, "fetching runner details")
}
runnerName = runnerInfo.Name
}
}
jobParams.RunnerName = runnerName
switch r.helper.PoolType() {
case params.EnterprisePool:
jobParams.EnterpriseID = &asUUID
case params.RepositoryPool:
jobParams.RepoID = &asUUID
case params.OrganizationPool:
jobParams.OrgID = &asUUID
default:
return jobParams, errors.Errorf("unknown pool type: %s", r.helper.PoolType())
}
return jobParams, nil
}
func (r *basePoolManager) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
@ -845,7 +922,9 @@ func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance params
}
}
func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool) error {
r.log("scaling down pool %s", pool.ID)
if !pool.Enabled {
r.log("pool %s is disabled, skipping scale down", pool.ID)
return nil
}
@ -860,9 +939,7 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
// consideration for scale-down. The 5 minute grace period prevents a situation where a
// "queued" workflow triggers the creation of a new idle runner, and this routine reaps
// an idle runner before they have a chance to pick up a job.
if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle &&
providerCommon.InstanceStatus(inst.Status) == providerCommon.InstanceRunning &&
time.Since(inst.UpdatedAt).Minutes() > 5 {
if inst.RunnerStatus == providerCommon.RunnerIdle && inst.Status == providerCommon.InstanceRunning && time.Since(inst.UpdatedAt).Minutes() > 2 {
idleWorkers = append(idleWorkers, inst)
}
}
@ -911,13 +988,35 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool
return nil
}
func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
func (r *basePoolManager) addRunnerToPool(pool params.Pool, aditionalLabels []string) error {
if !pool.Enabled {
return fmt.Errorf("pool %s is disabled", pool.ID)
}
poolInstanceCount, err := r.store.PoolInstanceCount(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to list pool instances: %w", err)
}
if poolInstanceCount >= int64(pool.MaxRunners) {
return fmt.Errorf("max workers (%d) reached for pool %s", pool.MaxRunners, pool.ID)
}
if err := r.AddRunner(r.ctx, pool.ID, aditionalLabels); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %s", pool.ID, err)
}
return nil
}
func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
if !pool.Enabled || pool.MinIdleRunners == 0 {
return nil
}
existingInstances, err := r.store.ListPoolInstances(r.ctx, pool.ID)
if err != nil {
return fmt.Errorf("failed to ensure minimum idle workers for pool %s: %w", pool.ID, err)
}
if uint(len(existingInstances)) >= pool.MaxRunners {
@ -927,7 +1026,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) != providerCommon.RunnerActive {
if inst.RunnerStatus != providerCommon.RunnerActive && inst.RunnerStatus != providerCommon.RunnerTerminated {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
@ -947,7 +1046,7 @@ func (r *basePoolManager) ensureIdleRunnersForOnePool(pool params.Pool) error {
for i := 0; i < required; i++ {
r.log("adding new idle worker to pool %s", pool.ID)
if err := r.AddRunner(r.ctx, pool.ID); err != nil {
if err := r.AddRunner(r.ctx, pool.ID, nil); err != nil {
return fmt.Errorf("failed to add new instance for pool %s: %w", pool.ID, err)
}
}
@ -1010,7 +1109,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
}
r.log("queueing previously failed instance %s for retry", instance.Name)
// Set instance to pending create and wait for retry.
if err := r.updateInstance(instance.Name, updateParams); err != nil {
if _, err := r.updateInstance(instance.Name, updateParams); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
return nil
@ -1131,7 +1230,7 @@ func (r *basePoolManager) deletePendingInstances() error {
// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
if _, err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
r.log("failed to update runner %s status: %q", instance.Name, err)
r.keyMux.Unlock(instance.Name, false)
continue
@ -1147,7 +1246,7 @@ func (r *basePoolManager) deletePendingInstances() error {
r.log("failed to remove instance %s: %s", instance.Name, err)
// failed to remove from provider. Set the status back to pending_delete, which
// will retry the operation.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
if _, err := r.setInstanceStatus(instance.Name, providerCommon.InstancePendingDelete, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
}
@ -1192,7 +1291,7 @@ func (r *basePoolManager) addPendingInstances() error {
// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
// won't attempt to create the runner a second time.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
if _, err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
r.keyMux.Unlock(instance.Name, false)
// We failed to transition the instance to Creating. This means that garm will retry to create this instance
@ -1206,7 +1305,7 @@ func (r *basePoolManager) addPendingInstances() error {
if err := r.addInstanceToProvider(instance); err != nil {
r.log("failed to add instance to provider: %s", err)
errAsBytes := []byte(err.Error())
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
if _, err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
r.log("failed to update runner %s status: %s", instance.Name, err)
}
r.log("failed to create instance in provider: %s", err)
@ -1275,6 +1374,7 @@ func (r *basePoolManager) Start() error {
go r.startLoopForFunction(r.ensureMinIdleRunners, common.PoolConsilitationInterval, "consolidate[ensure_min_idle]", false)
go r.startLoopForFunction(r.retryFailedInstances, common.PoolConsilitationInterval, "consolidate[retry_failed]", false)
go r.startLoopForFunction(r.updateTools, common.PoolToolUpdateInterval, "update_tools", true)
go r.startLoopForFunction(r.consumeQueuedJobs, common.PoolConsilitationInterval, "job_queue_consumer", false)
return nil
}
@ -1331,9 +1431,163 @@ func (r *basePoolManager) ForceDeleteRunner(runner params.Instance) error {
}
r.log("setting instance status for: %v", runner.Name)
if err := r.setInstanceStatus(runner.Name, providerCommon.InstancePendingDelete, nil); err != nil {
if _, err := r.setInstanceStatus(runner.Name, providerCommon.InstancePendingDelete, nil); err != nil {
r.log("failed to update runner %s status: %s", runner.Name, err)
return errors.Wrap(err, "updating runner")
}
return nil
}
// consumeQueuedJobs qull pull all the known jobs from the database and attempt to create a new
// runner in one of the pools it manages if it matches the requested labels.
// This is a best effort attempt to consume queued jobs. We do not have any real way to know which
// runner from which pool will pick up a job we react to here. For example, the same job may be received
// by an enterprise manager, an org manager AND a repo manager. If an idle runner from another pool
// picks up the job after we created a runner in this pool, we will have an extra runner that may or may not
// have a job waiting for it.
// This is not a huge problem, as we have scale down logic which should remove any idle runners that have not
// picked up a job within a certain time frame. Also, the logic here should ensure that eventually, all known
// queued jobs will be consumed sooner or later.
//
// NOTE: jobs that were created while the garm instance was down, will be unknown to garm itself and will linger
// in queued state if the pools defined in garm have a minimum idle runner value set to 0. Simply put, garm won't
// know about the queued jobs that we didn't get a webhook for. Listing all jobs on startup is not feasible, as
// an enterprise may have thousands of repos and thousands of jobs in queued state. To fetch all jobs for an
// enterprise, we'd have to list all repos, and for each repo list all jobs currently in queued state. This is
// not desirable by any measure.
func (r *basePoolManager) consumeQueuedJobs() error {
queued, err := r.store.ListEntityJobsByStatus(r.ctx, r.helper.PoolType(), r.helper.ID(), params.JobStatusQueued)
if err != nil {
return errors.Wrap(err, "listing queued jobs")
}
poolsCache := poolsForTags{}
r.log("found %d queued jobs for %s", len(queued), r.helper.String())
for _, job := range queued {
if job.LockedBy != uuid.Nil && job.LockedBy.String() != r.ID() {
// Job was handled by us or another entity.
r.log("job %d is locked by %s", job.ID, job.LockedBy.String())
continue
}
if time.Since(job.UpdatedAt) < time.Second*30 {
// give the idle runners a chance to pick up the job.
r.log("job %d was updated less than 30 seconds ago. Skipping", job.ID)
continue
}
if time.Since(job.UpdatedAt) >= time.Minute*10 {
// Job has been in queued state for 10 minutes or more. Check if it was consumed by another runner.
workflow, ghResp, err := r.helper.GithubCLI().GetWorkflowJobByID(r.ctx, job.RepositoryOwner, job.RepositoryName, job.ID)
if err != nil {
if ghResp != nil {
switch ghResp.StatusCode {
case http.StatusNotFound:
// Job does not exist in github. Remove it from the database.
if err := r.store.DeleteJob(r.ctx, job.ID); err != nil {
return errors.Wrap(err, "deleting job")
}
default:
r.log("failed to fetch job information from github: %q (status code: %d)", err, ghResp.StatusCode)
}
}
r.log("error fetching workflow info: %q", err)
continue
}
if workflow.GetStatus() != "queued" {
r.log("job is no longer in queued state on github. New status is: %s", workflow.GetStatus())
job.Action = workflow.GetStatus()
job.Status = workflow.GetStatus()
job.Conclusion = workflow.GetConclusion()
if workflow.RunnerName != nil {
job.RunnerName = *workflow.RunnerName
}
if workflow.RunnerID != nil {
job.GithubRunnerID = *workflow.RunnerID
}
if workflow.RunnerGroupName != nil {
job.RunnerGroupName = *workflow.RunnerGroupName
}
if workflow.RunnerGroupID != nil {
job.RunnerGroupID = *workflow.RunnerGroupID
}
if _, err := r.store.CreateOrUpdateJob(r.ctx, job); err != nil {
r.log("failed to update job status: %q", err)
}
continue
}
// Job is still queued in our db and in github. Unlock it and try again.
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
// TODO: Implament a cache? Should we return here?
r.log("failed to unlock job %d: %q", job.ID, err)
continue
}
}
if job.LockedBy.String() == r.ID() {
// Job is locked by us. We must have already attepted to create a runner for it. Skip.
// TODO(gabriel-samfira): create an in-memory state of existing runners that we can easily
// check for existing pending or idle runners. If we can't find any, attempt to allocate another
// runner.
r.log("job %d is locked by us", job.ID)
continue
}
poolRR, ok := poolsCache.Get(job.Labels)
if !ok {
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.helper.PoolType(), r.helper.ID(), job.Labels)
if err != nil {
r.log("error finding pools matching labels: %s", err)
continue
}
poolRR = poolsCache.Add(job.Labels, potentialPools)
}
if poolRR.Len() == 0 {
r.log("could not find pools with labels %s", strings.Join(job.Labels, ","))
continue
}
runnerCreated := false
if err := r.store.LockJob(r.ctx, job.ID, r.ID()); err != nil {
r.log("could not lock job %d: %s", job.ID, err)
continue
}
jobLabels := []string{
fmt.Sprintf("%s%d", jobLabelPrefix, job.ID),
}
for i := 0; i < poolRR.Len(); i++ {
pool, err := poolRR.Next()
if err != nil {
r.log("could not find a pool to create a runner for job %d: %s", job.ID, err)
break
}
r.log("attempting to create a runner in pool %s for job %d", pool.ID, job.ID)
if err := r.addRunnerToPool(pool, jobLabels); err != nil {
r.log("[PoolRR] could not add runner to pool %s: %s", pool.ID, err)
continue
}
r.log("a new runner was added to pool %s as a response to queued job %d", pool.ID, job.ID)
runnerCreated = true
break
}
if !runnerCreated {
r.log("could not create a runner for job %d; unlocking", job.ID)
if err := r.store.UnlockJob(r.ctx, job.ID, r.ID()); err != nil {
r.log("failed to unlock job: %d", job.ID)
return errors.Wrap(err, "unlocking job")
}
}
}
if err := r.store.DeleteCompletedJobs(r.ctx); err != nil {
r.log("failed to delete completed jobs: %q", err)
}
return nil
}

View file

@ -79,6 +79,14 @@ type repository struct {
mux sync.Mutex
}
func (r *repository) GithubCLI() common.GithubClient {
return r.ghcli
}
func (r *repository) PoolType() params.PoolType {
return params.RepositoryPool
}
func (r *repository) GetRunnerInfoFromWorkflow(job params.WorkflowJob) (params.RunnerInfo, error) {
if err := r.ValidateOwner(job); err != nil {
return params.RunnerInfo{}, errors.Wrap(err, "validating owner")

View file

@ -1,6 +1,62 @@
package pool
import "log"
import (
"log"
"sort"
"strings"
"sync"
"sync/atomic"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
)
type poolRoundRobin struct {
pools []params.Pool
next uint32
}
func (p *poolRoundRobin) Next() (params.Pool, error) {
if len(p.pools) == 0 {
return params.Pool{}, runnerErrors.ErrNoPoolsAvailable
}
n := atomic.AddUint32(&p.next, 1)
return p.pools[(int(n)-1)%len(p.pools)], nil
}
func (p *poolRoundRobin) Len() int {
return len(p.pools)
}
func (p *poolRoundRobin) Reset() {
atomic.StoreUint32(&p.next, 0)
}
type poolsForTags struct {
pools sync.Map
}
func (p *poolsForTags) Get(tags []string) (*poolRoundRobin, bool) {
sort.Strings(tags)
key := strings.Join(tags, "^")
v, ok := p.pools.Load(key)
if !ok {
return nil, false
}
return v.(*poolRoundRobin), true
}
func (p *poolsForTags) Add(tags []string, pools []params.Pool) *poolRoundRobin {
sort.Strings(tags)
key := strings.Join(tags, "^")
poolRR := &poolRoundRobin{pools: pools}
v, _ := p.pools.LoadOrStore(key, poolRR)
return v.(*poolRoundRobin)
}
func (r *basePoolManager) log(msg string, args ...interface{}) {
msgArgs := []interface{}{

View file

@ -125,3 +125,15 @@ func (r *Runner) UpdatePoolByID(ctx context.Context, poolID string, param params
}
return newPool, nil
}
func (r *Runner) ListAllJobs(ctx context.Context) ([]params.Job, error) {
if !auth.IsAdmin(ctx) {
return []params.Job{}, runnerErrors.ErrUnauthorized
}
jobs, err := r.store.ListAllJobs(ctx)
if err != nil {
return nil, errors.Wrap(err, "fetching jobs")
}
return jobs, nil
}

View file

@ -31,7 +31,6 @@ import (
"github.com/cloudbase/garm/auth"
"github.com/cloudbase/garm/config"
"github.com/cloudbase/garm/database"
dbCommon "github.com/cloudbase/garm/database/common"
runnerErrors "github.com/cloudbase/garm/errors"
"github.com/cloudbase/garm/params"
@ -42,18 +41,13 @@ import (
"github.com/cloudbase/garm/util"
"golang.org/x/sync/errgroup"
"github.com/google/uuid"
"github.com/juju/clock"
"github.com/juju/retry"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
)
func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
return nil, errors.Wrap(err, "creating db connection")
}
func NewRunner(ctx context.Context, cfg config.Config, db dbCommon.Store) (*Runner, error) {
ctrlId, err := db.ControllerInfo()
if err != nil {
return nil, errors.Wrap(err, "fetching controller info")

View file

@ -1,21 +0,0 @@
language: go
sudo: false
go:
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
- tip
matrix:
allow_failures:
- go: tip
fast_finish: true
before_install:
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- $HOME/gopath/bin/goveralls -service=travis-ci
notifications:
email: false

View file

@ -1,20 +0,0 @@
Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -1,75 +0,0 @@
# UUID package for Go language
[![Build Status](https://travis-ci.org/satori/go.uuid.svg?branch=master)](https://travis-ci.org/satori/go.uuid)
[![Coverage Status](https://coveralls.io/repos/github/satori/go.uuid/badge.svg?branch=master)](https://coveralls.io/github/satori/go.uuid)
[![GoDoc](http://godoc.org/github.com/satori/go.uuid?status.svg)](http://godoc.org/github.com/satori/go.uuid)
This package provides pure Go implementation of Universally Unique Identifier (UUID). Supported both creation and parsing of UUIDs.
With 100% test coverage and benchmarks out of box.
Supported versions:
* Version 1, based on timestamp and MAC address (RFC 4122)
* Version 2, based on timestamp, MAC address and POSIX UID/GID (DCE 1.1)
* Version 3, based on MD5 hashing (RFC 4122)
* Version 4, based on random numbers (RFC 4122)
* Version 5, based on SHA-1 hashing (RFC 4122)
## Installation
Use the `go` command:
$ go get github.com/satori/go.uuid
## Requirements
UUID package tested against Go >= 1.6.
## Example
```go
package main
import (
"fmt"
"github.com/satori/go.uuid"
)
func main() {
// Creating UUID Version 4
// panic on error
u1 := uuid.Must(uuid.NewV4())
fmt.Printf("UUIDv4: %s\n", u1)
// or error handling
u2, err := uuid.NewV4()
if err != nil {
fmt.Printf("Something went wrong: %s", err)
return
}
fmt.Printf("UUIDv4: %s\n", u2)
// Parsing UUID from string input
u2, err := uuid.FromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8")
if err != nil {
fmt.Printf("Something went wrong: %s", err)
return
}
fmt.Printf("Successfully parsed: %s", u2)
}
```
## Documentation
[Documentation](http://godoc.org/github.com/satori/go.uuid) is hosted at GoDoc project.
## Links
* [RFC 4122](http://tools.ietf.org/html/rfc4122)
* [DCE 1.1: Authentication and Security Services](http://pubs.opengroup.org/onlinepubs/9696989899/chap5.htm#tagcjh_08_02_01_01)
## Copyright
Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>.
UUID package released under MIT License.
See [LICENSE](https://github.com/satori/go.uuid/blob/master/LICENSE) for details.

View file

@ -1,206 +0,0 @@
// Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package uuid
import (
"bytes"
"encoding/hex"
"fmt"
)
// FromBytes returns UUID converted from raw byte slice input.
// It will return error if the slice isn't 16 bytes long.
func FromBytes(input []byte) (u UUID, err error) {
err = u.UnmarshalBinary(input)
return
}
// FromBytesOrNil returns UUID converted from raw byte slice input.
// Same behavior as FromBytes, but returns a Nil UUID on error.
func FromBytesOrNil(input []byte) UUID {
uuid, err := FromBytes(input)
if err != nil {
return Nil
}
return uuid
}
// FromString returns UUID parsed from string input.
// Input is expected in a form accepted by UnmarshalText.
func FromString(input string) (u UUID, err error) {
err = u.UnmarshalText([]byte(input))
return
}
// FromStringOrNil returns UUID parsed from string input.
// Same behavior as FromString, but returns a Nil UUID on error.
func FromStringOrNil(input string) UUID {
uuid, err := FromString(input)
if err != nil {
return Nil
}
return uuid
}
// MarshalText implements the encoding.TextMarshaler interface.
// The encoding is the same as returned by String.
func (u UUID) MarshalText() (text []byte, err error) {
text = []byte(u.String())
return
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// Following formats are supported:
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
// "6ba7b8109dad11d180b400c04fd430c8"
// ABNF for supported UUID text representation follows:
// uuid := canonical | hashlike | braced | urn
// plain := canonical | hashlike
// canonical := 4hexoct '-' 2hexoct '-' 2hexoct '-' 6hexoct
// hashlike := 12hexoct
// braced := '{' plain '}'
// urn := URN ':' UUID-NID ':' plain
// URN := 'urn'
// UUID-NID := 'uuid'
// 12hexoct := 6hexoct 6hexoct
// 6hexoct := 4hexoct 2hexoct
// 4hexoct := 2hexoct 2hexoct
// 2hexoct := hexoct hexoct
// hexoct := hexdig hexdig
// hexdig := '0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9' |
// 'a' | 'b' | 'c' | 'd' | 'e' | 'f' |
// 'A' | 'B' | 'C' | 'D' | 'E' | 'F'
func (u *UUID) UnmarshalText(text []byte) (err error) {
switch len(text) {
case 32:
return u.decodeHashLike(text)
case 36:
return u.decodeCanonical(text)
case 38:
return u.decodeBraced(text)
case 41:
fallthrough
case 45:
return u.decodeURN(text)
default:
return fmt.Errorf("uuid: incorrect UUID length: %s", text)
}
}
// decodeCanonical decodes UUID string in format
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8".
func (u *UUID) decodeCanonical(t []byte) (err error) {
if t[8] != '-' || t[13] != '-' || t[18] != '-' || t[23] != '-' {
return fmt.Errorf("uuid: incorrect UUID format %s", t)
}
src := t[:]
dst := u[:]
for i, byteGroup := range byteGroups {
if i > 0 {
src = src[1:] // skip dash
}
_, err = hex.Decode(dst[:byteGroup/2], src[:byteGroup])
if err != nil {
return
}
src = src[byteGroup:]
dst = dst[byteGroup/2:]
}
return
}
// decodeHashLike decodes UUID string in format
// "6ba7b8109dad11d180b400c04fd430c8".
func (u *UUID) decodeHashLike(t []byte) (err error) {
src := t[:]
dst := u[:]
if _, err = hex.Decode(dst, src); err != nil {
return err
}
return
}
// decodeBraced decodes UUID string in format
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}" or in format
// "{6ba7b8109dad11d180b400c04fd430c8}".
func (u *UUID) decodeBraced(t []byte) (err error) {
l := len(t)
if t[0] != '{' || t[l-1] != '}' {
return fmt.Errorf("uuid: incorrect UUID format %s", t)
}
return u.decodePlain(t[1 : l-1])
}
// decodeURN decodes UUID string in format
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8" or in format
// "urn:uuid:6ba7b8109dad11d180b400c04fd430c8".
func (u *UUID) decodeURN(t []byte) (err error) {
total := len(t)
urn_uuid_prefix := t[:9]
if !bytes.Equal(urn_uuid_prefix, urnPrefix) {
return fmt.Errorf("uuid: incorrect UUID format: %s", t)
}
return u.decodePlain(t[9:total])
}
// decodePlain decodes UUID string in canonical format
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8" or in hash-like format
// "6ba7b8109dad11d180b400c04fd430c8".
func (u *UUID) decodePlain(t []byte) (err error) {
switch len(t) {
case 32:
return u.decodeHashLike(t)
case 36:
return u.decodeCanonical(t)
default:
return fmt.Errorf("uuid: incorrrect UUID length: %s", t)
}
}
// MarshalBinary implements the encoding.BinaryMarshaler interface.
func (u UUID) MarshalBinary() (data []byte, err error) {
data = u.Bytes()
return
}
// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
// It will return error if the slice isn't 16 bytes long.
func (u *UUID) UnmarshalBinary(data []byte) (err error) {
if len(data) != Size {
err = fmt.Errorf("uuid: UUID must be exactly 16 bytes long, got %d bytes", len(data))
return
}
copy(u[:], data)
return
}

View file

@ -1,265 +0,0 @@
// Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package uuid
import (
"crypto/md5"
"crypto/rand"
"crypto/sha1"
"encoding/binary"
"fmt"
"hash"
"io"
"net"
"os"
"sync"
"time"
)
// Difference in 100-nanosecond intervals between
// UUID epoch (October 15, 1582) and Unix epoch (January 1, 1970).
const epochStart = 122192928000000000
type epochFunc func() time.Time
type hwAddrFunc func() (net.HardwareAddr, error)
var (
global = newRFC4122Generator()
posixUID = uint32(os.Getuid())
posixGID = uint32(os.Getgid())
)
// NewV1 returns UUID based on current timestamp and MAC address.
func NewV1() (UUID, error) {
return global.NewV1()
}
// NewV2 returns DCE Security UUID based on POSIX UID/GID.
func NewV2(domain byte) (UUID, error) {
return global.NewV2(domain)
}
// NewV3 returns UUID based on MD5 hash of namespace UUID and name.
func NewV3(ns UUID, name string) UUID {
return global.NewV3(ns, name)
}
// NewV4 returns random generated UUID.
func NewV4() (UUID, error) {
return global.NewV4()
}
// NewV5 returns UUID based on SHA-1 hash of namespace UUID and name.
func NewV5(ns UUID, name string) UUID {
return global.NewV5(ns, name)
}
// Generator provides interface for generating UUIDs.
type Generator interface {
NewV1() (UUID, error)
NewV2(domain byte) (UUID, error)
NewV3(ns UUID, name string) UUID
NewV4() (UUID, error)
NewV5(ns UUID, name string) UUID
}
// Default generator implementation.
type rfc4122Generator struct {
clockSequenceOnce sync.Once
hardwareAddrOnce sync.Once
storageMutex sync.Mutex
rand io.Reader
epochFunc epochFunc
hwAddrFunc hwAddrFunc
lastTime uint64
clockSequence uint16
hardwareAddr [6]byte
}
func newRFC4122Generator() Generator {
return &rfc4122Generator{
epochFunc: time.Now,
hwAddrFunc: defaultHWAddrFunc,
rand: rand.Reader,
}
}
// NewV1 returns UUID based on current timestamp and MAC address.
func (g *rfc4122Generator) NewV1() (UUID, error) {
u := UUID{}
timeNow, clockSeq, err := g.getClockSequence()
if err != nil {
return Nil, err
}
binary.BigEndian.PutUint32(u[0:], uint32(timeNow))
binary.BigEndian.PutUint16(u[4:], uint16(timeNow>>32))
binary.BigEndian.PutUint16(u[6:], uint16(timeNow>>48))
binary.BigEndian.PutUint16(u[8:], clockSeq)
hardwareAddr, err := g.getHardwareAddr()
if err != nil {
return Nil, err
}
copy(u[10:], hardwareAddr)
u.SetVersion(V1)
u.SetVariant(VariantRFC4122)
return u, nil
}
// NewV2 returns DCE Security UUID based on POSIX UID/GID.
func (g *rfc4122Generator) NewV2(domain byte) (UUID, error) {
u, err := g.NewV1()
if err != nil {
return Nil, err
}
switch domain {
case DomainPerson:
binary.BigEndian.PutUint32(u[:], posixUID)
case DomainGroup:
binary.BigEndian.PutUint32(u[:], posixGID)
}
u[9] = domain
u.SetVersion(V2)
u.SetVariant(VariantRFC4122)
return u, nil
}
// NewV3 returns UUID based on MD5 hash of namespace UUID and name.
func (g *rfc4122Generator) NewV3(ns UUID, name string) UUID {
u := newFromHash(md5.New(), ns, name)
u.SetVersion(V3)
u.SetVariant(VariantRFC4122)
return u
}
// NewV4 returns random generated UUID.
func (g *rfc4122Generator) NewV4() (UUID, error) {
u := UUID{}
if _, err := io.ReadFull(g.rand, u[:]); err != nil {
return Nil, err
}
u.SetVersion(V4)
u.SetVariant(VariantRFC4122)
return u, nil
}
// NewV5 returns UUID based on SHA-1 hash of namespace UUID and name.
func (g *rfc4122Generator) NewV5(ns UUID, name string) UUID {
u := newFromHash(sha1.New(), ns, name)
u.SetVersion(V5)
u.SetVariant(VariantRFC4122)
return u
}
// Returns epoch and clock sequence.
func (g *rfc4122Generator) getClockSequence() (uint64, uint16, error) {
var err error
g.clockSequenceOnce.Do(func() {
buf := make([]byte, 2)
if _, err = io.ReadFull(g.rand, buf); err != nil {
return
}
g.clockSequence = binary.BigEndian.Uint16(buf)
})
if err != nil {
return 0, 0, err
}
g.storageMutex.Lock()
defer g.storageMutex.Unlock()
timeNow := g.getEpoch()
// Clock didn't change since last UUID generation.
// Should increase clock sequence.
if timeNow <= g.lastTime {
g.clockSequence++
}
g.lastTime = timeNow
return timeNow, g.clockSequence, nil
}
// Returns hardware address.
func (g *rfc4122Generator) getHardwareAddr() ([]byte, error) {
var err error
g.hardwareAddrOnce.Do(func() {
if hwAddr, err := g.hwAddrFunc(); err == nil {
copy(g.hardwareAddr[:], hwAddr)
return
}
// Initialize hardwareAddr randomly in case
// of real network interfaces absence.
if _, err = io.ReadFull(g.rand, g.hardwareAddr[:]); err != nil {
return
}
// Set multicast bit as recommended by RFC 4122
g.hardwareAddr[0] |= 0x01
})
if err != nil {
return []byte{}, err
}
return g.hardwareAddr[:], nil
}
// Returns difference in 100-nanosecond intervals between
// UUID epoch (October 15, 1582) and current time.
func (g *rfc4122Generator) getEpoch() uint64 {
return epochStart + uint64(g.epochFunc().UnixNano()/100)
}
// Returns UUID based on hashing of namespace UUID and name.
func newFromHash(h hash.Hash, ns UUID, name string) UUID {
u := UUID{}
h.Write(ns[:])
h.Write([]byte(name))
copy(u[:], h.Sum(nil))
return u
}
// Returns hardware address.
func defaultHWAddrFunc() (net.HardwareAddr, error) {
ifaces, err := net.Interfaces()
if err != nil {
return []byte{}, err
}
for _, iface := range ifaces {
if len(iface.HardwareAddr) >= 6 {
return iface.HardwareAddr, nil
}
}
return []byte{}, fmt.Errorf("uuid: no HW address found")
}

View file

@ -1,78 +0,0 @@
// Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package uuid
import (
"database/sql/driver"
"fmt"
)
// Value implements the driver.Valuer interface.
func (u UUID) Value() (driver.Value, error) {
return u.String(), nil
}
// Scan implements the sql.Scanner interface.
// A 16-byte slice is handled by UnmarshalBinary, while
// a longer byte slice or a string is handled by UnmarshalText.
func (u *UUID) Scan(src interface{}) error {
switch src := src.(type) {
case []byte:
if len(src) == Size {
return u.UnmarshalBinary(src)
}
return u.UnmarshalText(src)
case string:
return u.UnmarshalText([]byte(src))
}
return fmt.Errorf("uuid: cannot convert %T to UUID", src)
}
// NullUUID can be used with the standard sql package to represent a
// UUID value that can be NULL in the database
type NullUUID struct {
UUID UUID
Valid bool
}
// Value implements the driver.Valuer interface.
func (u NullUUID) Value() (driver.Value, error) {
if !u.Valid {
return nil, nil
}
// Delegate to UUID Value function
return u.UUID.Value()
}
// Scan implements the sql.Scanner interface.
func (u *NullUUID) Scan(src interface{}) error {
if src == nil {
u.UUID, u.Valid = Nil, false
return nil
}
// Delegate to UUID Scan function
u.Valid = true
return u.UUID.Scan(src)
}

View file

@ -1,161 +0,0 @@
// Copyright (C) 2013-2018 by Maxim Bublis <b@codemonkey.ru>
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// Package uuid provides implementation of Universally Unique Identifier (UUID).
// Supported versions are 1, 3, 4 and 5 (as specified in RFC 4122) and
// version 2 (as specified in DCE 1.1).
package uuid
import (
"bytes"
"encoding/hex"
)
// Size of a UUID in bytes.
const Size = 16
// UUID representation compliant with specification
// described in RFC 4122.
type UUID [Size]byte
// UUID versions
const (
_ byte = iota
V1
V2
V3
V4
V5
)
// UUID layout variants.
const (
VariantNCS byte = iota
VariantRFC4122
VariantMicrosoft
VariantFuture
)
// UUID DCE domains.
const (
DomainPerson = iota
DomainGroup
DomainOrg
)
// String parse helpers.
var (
urnPrefix = []byte("urn:uuid:")
byteGroups = []int{8, 4, 4, 4, 12}
)
// Nil is special form of UUID that is specified to have all
// 128 bits set to zero.
var Nil = UUID{}
// Predefined namespace UUIDs.
var (
NamespaceDNS = Must(FromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))
NamespaceURL = Must(FromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8"))
NamespaceOID = Must(FromString("6ba7b812-9dad-11d1-80b4-00c04fd430c8"))
NamespaceX500 = Must(FromString("6ba7b814-9dad-11d1-80b4-00c04fd430c8"))
)
// Equal returns true if u1 and u2 equals, otherwise returns false.
func Equal(u1 UUID, u2 UUID) bool {
return bytes.Equal(u1[:], u2[:])
}
// Version returns algorithm version used to generate UUID.
func (u UUID) Version() byte {
return u[6] >> 4
}
// Variant returns UUID layout variant.
func (u UUID) Variant() byte {
switch {
case (u[8] >> 7) == 0x00:
return VariantNCS
case (u[8] >> 6) == 0x02:
return VariantRFC4122
case (u[8] >> 5) == 0x06:
return VariantMicrosoft
case (u[8] >> 5) == 0x07:
fallthrough
default:
return VariantFuture
}
}
// Bytes returns bytes slice representation of UUID.
func (u UUID) Bytes() []byte {
return u[:]
}
// Returns canonical string representation of UUID:
// xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
func (u UUID) String() string {
buf := make([]byte, 36)
hex.Encode(buf[0:8], u[0:4])
buf[8] = '-'
hex.Encode(buf[9:13], u[4:6])
buf[13] = '-'
hex.Encode(buf[14:18], u[6:8])
buf[18] = '-'
hex.Encode(buf[19:23], u[8:10])
buf[23] = '-'
hex.Encode(buf[24:], u[10:])
return string(buf)
}
// SetVersion sets version bits.
func (u *UUID) SetVersion(v byte) {
u[6] = (u[6] & 0x0f) | (v << 4)
}
// SetVariant sets variant bits.
func (u *UUID) SetVariant(v byte) {
switch v {
case VariantNCS:
u[8] = (u[8]&(0xff>>1) | (0x00 << 7))
case VariantRFC4122:
u[8] = (u[8]&(0xff>>2) | (0x02 << 6))
case VariantMicrosoft:
u[8] = (u[8]&(0xff>>3) | (0x06 << 5))
case VariantFuture:
fallthrough
default:
u[8] = (u[8]&(0xff>>3) | (0x07 << 5))
}
}
// Must is a helper that wraps a call to a function returning (UUID, error)
// and panics if the error is non-nil. It is intended for use in variable
// initializations such as
// var packageUUID = uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000"));
func Must(u UUID, err error) UUID {
if err != nil {
panic(err)
}
return u
}

3
vendor/modules.txt vendored
View file

@ -296,9 +296,6 @@ github.com/robfig/cron/v3
# github.com/rogpeppe/fastuuid v1.2.0
## explicit; go 1.12
github.com/rogpeppe/fastuuid
# github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
## explicit
github.com/satori/go.uuid
# github.com/sirupsen/logrus v1.9.0
## explicit; go 1.13
github.com/sirupsen/logrus