diff --git a/cmd/runner-manager/create.go b/cmd/runner-manager/create.go index 67f6e18b..73938a5d 100644 --- a/cmd/runner-manager/create.go +++ b/cmd/runner-manager/create.go @@ -1,172 +1,172 @@ package main -import ( - "context" - "flag" - "fmt" - "log" - "os/signal" +// import ( +// "context" +// "flag" +// "fmt" +// "log" +// "os/signal" - "runner-manager/config" - "runner-manager/params" - "runner-manager/runner/providers/lxd" - "runner-manager/util" +// "runner-manager/config" +// "runner-manager/params" +// "runner-manager/runner/providers/lxd" +// "runner-manager/util" - "github.com/google/go-github/v43/github" - "golang.org/x/oauth2" - "gopkg.in/yaml.v3" -) +// "github.com/google/go-github/v43/github" +// "golang.org/x/oauth2" +// "gopkg.in/yaml.v3" +// ) -var ( - conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file") - version = flag.Bool("version", false, "prints version") -) +// var ( +// conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file") +// version = flag.Bool("version", false, "prints version") +// ) -var Version string +// var Version string -// var token = "super secret token" +// // var token = "super secret token" -func main() { - flag.Parse() - if *version { - fmt.Println(Version) - return - } - ctx, stop := signal.NotifyContext(context.Background(), signals...) - defer stop() - fmt.Println(ctx) +// func main() { +// flag.Parse() +// if *version { +// fmt.Println(Version) +// return +// } +// ctx, stop := signal.NotifyContext(context.Background(), signals...) +// defer stop() +// fmt.Println(ctx) - cfg, err := config.NewConfig(*conf) - if err != nil { - log.Fatalf("Fetching config: %+v", err) - } +// cfg, err := config.NewConfig(*conf) +// if err != nil { +// log.Fatalf("Fetching config: %+v", err) +// } - ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: cfg.Github.OAuth2Token}, - ) +// ts := oauth2.StaticTokenSource( +// &oauth2.Token{AccessToken: cfg.Github.OAuth2Token}, +// ) - tc := oauth2.NewClient(ctx, ts) +// tc := oauth2.NewClient(ctx, ts) - ghClient := github.NewClient(tc) +// ghClient := github.NewClient(tc) - // // list all repositories for the authenticated user - // repos, _, err := client.Repositories.List(ctx, "", nil) +// // // list all repositories for the authenticated user +// // repos, _, err := client.Repositories.List(ctx, "", nil) - // fmt.Println(repos, err) +// // fmt.Println(repos, err) - logWriter, err := util.GetLoggingWriter(cfg) - if err != nil { - log.Fatalf("fetching log writer: %+v", err) - } - log.SetOutput(logWriter) +// logWriter, err := util.GetLoggingWriter(cfg) +// if err != nil { +// log.Fatalf("fetching log writer: %+v", err) +// } +// log.SetOutput(logWriter) - // controller, err := controllers.NewAPIController() - // if err != nil { - // log.Fatalf("failed to create controller: %+v", err) - // } +// // controller, err := controllers.NewAPIController() +// // if err != nil { +// // log.Fatalf("failed to create controller: %+v", err) +// // } - // router := routers.NewAPIRouter(controller, logWriter) +// // router := routers.NewAPIRouter(controller, logWriter) - // tlsCfg, err := cfg.APIServer.APITLSConfig() - // if err != nil { - // log.Fatalf("failed to get TLS config: %q", err) - // } +// // tlsCfg, err := cfg.APIServer.APITLSConfig() +// // if err != nil { +// // log.Fatalf("failed to get TLS config: %q", err) +// // } - // srv := &http.Server{ - // Addr: cfg.APIServer.BindAddress(), - // TLSConfig: tlsCfg, - // // Pass our instance of gorilla/mux in. - // Handler: router, - // } +// // srv := &http.Server{ +// // Addr: cfg.APIServer.BindAddress(), +// // TLSConfig: tlsCfg, +// // // Pass our instance of gorilla/mux in. +// // Handler: router, +// // } - // listener, err := net.Listen("tcp", srv.Addr) - // if err != nil { - // log.Fatalf("creating listener: %q", err) - // } +// // listener, err := net.Listen("tcp", srv.Addr) +// // if err != nil { +// // log.Fatalf("creating listener: %q", err) +// // } - // go func() { - // if err := srv.Serve(listener); err != nil { - // log.Fatalf("Listening: %+v", err) - // } - // }() +// // go func() { +// // if err := srv.Serve(listener); err != nil { +// // log.Fatalf("Listening: %+v", err) +// // } +// // }() - // <-ctx.Done() +// // <-ctx.Done() - // runner, err := runner.NewRunner(ctx, *cfg) - // if err != nil { - // log.Fatal(err) - // } +// // runner, err := runner.NewRunner(ctx, *cfg) +// // if err != nil { +// // log.Fatal(err) +// // } - // fmt.Println(runner) - controllerID := "026d374d-6a8a-4241-8ed9-a246fff6762f" - provider, err := lxd.NewProvider(ctx, &cfg.Providers[0], controllerID) - if err != nil { - log.Fatal(err) - } +// // fmt.Println(runner) +// controllerID := "026d374d-6a8a-4241-8ed9-a246fff6762f" +// provider, err := lxd.NewProvider(ctx, &cfg.Providers[0], controllerID) +// if err != nil { +// log.Fatal(err) +// } - // if err := provider.RemoveAllInstances(ctx); err != nil { - // log.Fatal(err) - // } +// // if err := provider.RemoveAllInstances(ctx); err != nil { +// // log.Fatal(err) +// // } - // fmt.Println(provider) +// // fmt.Println(provider) - // if err := provider.DeleteInstance(ctx, "runner-manager-2fbe5354-be28-4e00-95a8-11479912368d"); err != nil { - // log.Fatal(err) - // } +// // if err := provider.DeleteInstance(ctx, "runner-manager-2fbe5354-be28-4e00-95a8-11479912368d"); err != nil { +// // log.Fatal(err) +// // } - // instances, err := provider.ListInstances(ctx) +// // instances, err := provider.ListInstances(ctx) - // asJs, err := json.MarshalIndent(instances, "", " ") - // fmt.Println(string(asJs), err) +// // asJs, err := json.MarshalIndent(instances, "", " ") +// // fmt.Println(string(asJs), err) - log.Print("Fetching tools") - tools, _, err := ghClient.Actions.ListOrganizationRunnerApplicationDownloads(ctx, cfg.Organizations[0].Name) - // tools, _, err := ghClient.Actions.ListRunnerApplicationDownloads(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name) - if err != nil { - log.Fatal(err) - } +// log.Print("Fetching tools") +// tools, _, err := ghClient.Actions.ListOrganizationRunnerApplicationDownloads(ctx, cfg.Organizations[0].Name) +// // tools, _, err := ghClient.Actions.ListRunnerApplicationDownloads(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name) +// if err != nil { +// log.Fatal(err) +// } - tk, _, err := ghClient.Actions.CreateOrganizationRegistrationToken(ctx, "gsamfira") +// tk, _, err := ghClient.Actions.CreateOrganizationRegistrationToken(ctx, "gsamfira") - if err != nil { - log.Fatalf("fetching org token: %+v", err) - } +// if err != nil { +// log.Fatalf("fetching org token: %+v", err) +// } - fmt.Printf("Org token is: %v\n", *tk) +// fmt.Printf("Org token is: %v\n", *tk) - toolsAsYaml, err := yaml.Marshal(tools) - if err != nil { - log.Fatal(err) - } - log.Printf("got tools:\n%s\n", string(toolsAsYaml)) +// toolsAsYaml, err := yaml.Marshal(tools) +// if err != nil { +// log.Fatal(err) +// } +// log.Printf("got tools:\n%s\n", string(toolsAsYaml)) - log.Print("fetching runner token") - ghRunnerToken, _, err := ghClient.Actions.CreateRegistrationToken(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name) - if err != nil { - log.Fatal(err) - } - log.Printf("got token %v", ghRunnerToken) +// log.Print("fetching runner token") +// ghRunnerToken, _, err := ghClient.Actions.CreateRegistrationToken(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name) +// if err != nil { +// log.Fatal(err) +// } +// log.Printf("got token %v", ghRunnerToken) - bootstrapArgs := params.BootstrapInstance{ - Tools: tools, - RepoURL: cfg.Organizations[0].String(), - GithubRunnerAccessToken: *tk.Token, - RunnerType: cfg.Repositories[0].Pool.Runners[0].Name, - CallbackURL: "", - InstanceToken: "", - OSArch: config.Amd64, - Flavor: cfg.Organizations[0].Pool.Runners[0].Flavor, - Image: cfg.Organizations[0].Pool.Runners[0].Image, - Labels: cfg.Organizations[0].Pool.Runners[0].Labels, - SSHKeys: []string{ - "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC2oT7j/+elHY9U2ibgk2RYJgCvqIwewYKJTtHslTQFDWlHLeDam93BBOFlQJm9/wKX/qjC8d26qyzjeeeVf2EEAztp+jQfEq9OU+EtgQUi589jxtVmaWuYED8KVNbzLuP79SrBtEZD4xqgmnNotPhRshh3L6eYj4XzLWDUuOD6kzNdsJA2QOKeMOIFpBN6urKJHRHYD+oUPUX1w5QMv1W1Srlffl4m5uE+0eJYAMr02980PG4+jS4bzM170wYdWwUI0pSZsEDC8Fn7jef6QARU2CgHJYlaTem+KWSXislOUTaCpR0uhakP1ezebW20yuuc3bdRNgSlZi9B7zAPALGZpOshVqwF+KmLDi6XiFwG+NnwAFa6zaQfhOxhw/rF5Jk/wVjHIHkNNvYewycZPbKui0E3QrdVtR908N3VsPtLhMQ59BEMl3xlURSi0fiOU3UjnwmOkOoFDy/WT8qk//gFD93tUxlf4eKXDgNfME3zNz8nVi2uCPvG5NT/P/VWR8NMqW6tZcmWyswM/GgL6Y84JQ3ESZq/7WvAetdc1gVIDQJ2ejYbSHBcQpWvkocsiuMTCwiEvQ0sr+UE5jmecQvLPUyXOhuMhw43CwxnLk1ZSeYeCorxbskyqIXH71o8zhbPoPiEbwgB+i9WEoq02u7c8CmCmO8Y9aOnh8MzTKxIgQ==", - }, - } +// bootstrapArgs := params.BootstrapInstance{ +// Tools: tools, +// RepoURL: cfg.Organizations[0].String(), +// GithubRunnerAccessToken: *tk.Token, +// RunnerType: cfg.Repositories[0].Pool.Runners[0].Name, +// CallbackURL: "", +// InstanceToken: "", +// OSArch: config.Amd64, +// Flavor: cfg.Organizations[0].Pool.Runners[0].Flavor, +// Image: cfg.Organizations[0].Pool.Runners[0].Image, +// Labels: cfg.Organizations[0].Pool.Runners[0].Labels, +// SSHKeys: []string{ +// "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC2oT7j/+elHY9U2ibgk2RYJgCvqIwewYKJTtHslTQFDWlHLeDam93BBOFlQJm9/wKX/qjC8d26qyzjeeeVf2EEAztp+jQfEq9OU+EtgQUi589jxtVmaWuYED8KVNbzLuP79SrBtEZD4xqgmnNotPhRshh3L6eYj4XzLWDUuOD6kzNdsJA2QOKeMOIFpBN6urKJHRHYD+oUPUX1w5QMv1W1Srlffl4m5uE+0eJYAMr02980PG4+jS4bzM170wYdWwUI0pSZsEDC8Fn7jef6QARU2CgHJYlaTem+KWSXislOUTaCpR0uhakP1ezebW20yuuc3bdRNgSlZi9B7zAPALGZpOshVqwF+KmLDi6XiFwG+NnwAFa6zaQfhOxhw/rF5Jk/wVjHIHkNNvYewycZPbKui0E3QrdVtR908N3VsPtLhMQ59BEMl3xlURSi0fiOU3UjnwmOkOoFDy/WT8qk//gFD93tUxlf4eKXDgNfME3zNz8nVi2uCPvG5NT/P/VWR8NMqW6tZcmWyswM/GgL6Y84JQ3ESZq/7WvAetdc1gVIDQJ2ejYbSHBcQpWvkocsiuMTCwiEvQ0sr+UE5jmecQvLPUyXOhuMhw43CwxnLk1ZSeYeCorxbskyqIXH71o8zhbPoPiEbwgB+i9WEoq02u7c8CmCmO8Y9aOnh8MzTKxIgQ==", +// }, +// } - instance, err := provider.CreateInstance(ctx, bootstrapArgs) - if err != nil { - log.Fatal(err) - } +// instance, err := provider.CreateInstance(ctx, bootstrapArgs) +// if err != nil { +// log.Fatal(err) +// } - fmt.Println(instance) -} +// fmt.Println(instance) +// } diff --git a/cmd/runner-manager/dbcreate.go b/cmd/runner-manager/dbcreate.go new file mode 100644 index 00000000..2e28da7a --- /dev/null +++ b/cmd/runner-manager/dbcreate.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os/signal" + "runner-manager/config" + "runner-manager/database/sql" + "runner-manager/params" + "runner-manager/util" +) + +var ( + conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file") + version = flag.Bool("version", false, "prints version") +) + +var Version string + +func main() { + flag.Parse() + if *version { + fmt.Println(Version) + return + } + ctx, stop := signal.NotifyContext(context.Background(), signals...) + defer stop() + fmt.Println(ctx) + + cfg, err := config.NewConfig(*conf) + if err != nil { + log.Fatalf("Fetching config: %+v", err) + } + + db, err := sql.NewSQLDatabase(ctx, cfg.Database) + if err != nil { + log.Fatal(err) + } + + fmt.Println(db) + + txt := "ana are mere prune și alune" + + enc, err := util.Aes256EncodeString(txt, "pamkotepAyksemfeghoibidEwCivbaut") + if err != nil { + log.Fatal(err) + } + + fmt.Printf("encrypted: %d\n", len(enc)) + + dec, err := util.Aes256DecodeString(enc, "pamkotepAyksemfeghoibidEwCivbaut") + if err != nil { + log.Fatal(err) + } + + fmt.Println(dec) + + repo, err := db.CreateRepository(ctx, "gabriel-samfira", "scripts", "") + if err != nil { + log.Fatal(err) + } + + pool, err := db.CreateRepositoryPool(ctx, repo.ID, params.CreatePoolParams{ + ProviderName: "lxd_local", + MaxRunners: 10, + MinIdleRunners: 1, + Image: "ubuntu:20.04", + Flavor: "default", + Tags: []string{ + "myrunner", + "superAwesome", + }, + OSType: config.Linux, + OSArch: config.Amd64, + }) + if err != nil { + log.Fatal(err) + } + fmt.Println(pool) + + pool2, err := db.CreateRepositoryPool(ctx, repo.ID, params.CreatePoolParams{ + ProviderName: "lxd_local2", + MaxRunners: 10, + MinIdleRunners: 1, + Image: "ubuntu:20.04", + Flavor: "default", + Tags: []string{ + "myrunner", + "superAwesome2", + }, + OSType: config.Linux, + OSArch: config.Amd64, + }) + if err != nil { + log.Fatal(err) + } + fmt.Println(pool2) + + pool3, err := db.FindRepositoryPoolByTags(ctx, repo.ID, []string{"myrunner", "superAwesome3"}) + if err != nil { + log.Fatal(err) + } + + fmt.Println(pool3) +} diff --git a/config/config.go b/config/config.go index 38c6614c..a96512cb 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,8 @@ const ( // DefaultPoolQueueSize is the default size for a pool queue. DefaultPoolQueueSize = 10 + + GithubBaseURL = "https://github.com" ) var ( diff --git a/database/common/common.go b/database/common/common.go index 5dd0bdd4..d344e891 100644 --- a/database/common/common.go +++ b/database/common/common.go @@ -7,14 +7,14 @@ import ( type Store interface { CreateRepository(ctx context.Context, owner, name, webhookSecret string) (params.Repository, error) - GetRepository(ctx context.Context, id string) (params.Repository, error) + GetRepository(ctx context.Context, owner, name string) (params.Repository, error) ListRepositories(ctx context.Context) ([]params.Repository, error) - DeleteRepository(ctx context.Context, id string) error + DeleteRepository(ctx context.Context, owner, name string) error CreateOrganization(ctx context.Context, name, webhookSecret string) (params.Organization, error) - GetOrganization(ctx context.Context, id string) (params.Organization, error) + GetOrganization(ctx context.Context, name string) (params.Organization, error) ListOrganizations(ctx context.Context) ([]params.Organization, error) - DeleteOrganization(ctx context.Context, id string) error + DeleteOrganization(ctx context.Context, name string) error CreateRepositoryPool(ctx context.Context, repoId string, param params.CreatePoolParams) (params.Pool, error) CreateOrganizationPool(ctx context.Context, orgId string, param params.CreatePoolParams) (params.Pool, error) @@ -25,6 +25,20 @@ type Store interface { DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error - UpdateRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) - UpdateOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) + UpdateRepositoryPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) + UpdateOrganizationPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) + + FindRepositoryPoolByTags(ctx context.Context, repoID string, tags []string) (params.Pool, error) + FindOrganizationPoolByTags(ctx context.Context, orgID string, tags []string) (params.Pool, error) + + CreateInstance(ctx context.Context, poolID string, param params.CreateInstanceParams) (params.Instance, error) + DeleteInstance(ctx context.Context, poolID string, instanceID string) error + UpdateInstance(ctx context.Context, instanceID string, param params.UpdateInstanceParams) (params.Instance, error) + + ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) + ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) + ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) + + GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error) + GetInstanceByName(ctx context.Context, instanceName string) (params.Instance, error) } diff --git a/database/database.go b/database/database.go new file mode 100644 index 00000000..30d2d143 --- /dev/null +++ b/database/database.go @@ -0,0 +1,20 @@ +package database + +import ( + "context" + "fmt" + "runner-manager/config" + "runner-manager/database/common" + "runner-manager/database/sql" +) + +func NewDatabase(ctx context.Context, cfg config.Database) (common.Store, error) { + dbBackend := cfg.DbBackend + switch dbBackend { + case config.MySQLBackend, config.SQLiteBackend: + return sql.NewSQLDatabase(ctx, cfg) + default: + return nil, fmt.Errorf("no team manager backend available for db backend %s", dbBackend) + } + +} diff --git a/database/sql/models.go b/database/sql/models.go index 16ba82c5..073dcccf 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -16,6 +16,10 @@ type Base struct { } func (b *Base) BeforeCreate(tx *gorm.DB) error { + emptyId := uuid.UUID{} + if b.ID != emptyId { + return nil + } b.ID = uuid.NewV4() return nil } @@ -23,7 +27,8 @@ func (b *Base) BeforeCreate(tx *gorm.DB) error { type Tag struct { Base - Name string `gorm:"type:varchar(64);uniqueIndex"` + Name string `gorm:"type:varchar(64);uniqueIndex"` + Pools []*Pool `gorm:"many2many:pool_tags;"` } type Pool struct { @@ -36,7 +41,13 @@ type Pool struct { Flavor string `gorm:"index:idx_pool_type,unique"` OSType config.OSType OSArch config.OSArch - Tags []Tag `gorm:"foreignKey:id"` + Tags []*Tag `gorm:"many2many:pool_tags;"` + + RepoID uuid.UUID + Repository Repository `gorm:"foreignKey:RepoID"` + + OrgID uuid.UUID + Organization Organization `gorm:"foreignKey:OrgID"` } type Repository struct { @@ -45,7 +56,7 @@ type Repository struct { Owner string `gorm:"index:idx_owner,unique"` Name string `gorm:"index:idx_owner,unique"` WebhookSecret []byte - Pools []Pool `gorm:"foreignKey:id"` + Pools []Pool `gorm:"foreignKey:RepoID"` } type Organization struct { @@ -53,5 +64,29 @@ type Organization struct { Name string `gorm:"uniqueIndex"` WebhookSecret []byte - Pools []Pool `gorm:"foreignKey:id"` + Pools []Pool `gorm:"foreignKey:OrgID"` +} + +type Address struct { + Base + + Address string + Type string +} + +type Instance struct { + Base + + Name string `gorm:"uniqueIndex"` + OSType config.OSType + OSArch config.OSArch + OSName string + OSVersion string + Addresses []Address `gorm:"foreignKey:id"` + Status string + RunnerStatus string + CallbackURL string + CallbackToken string + + Pool Pool `gorm:"foreignKey:id"` } diff --git a/database/sql/sql.go b/database/sql/sql.go index 91c02dfd..77c1e2cf 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -45,6 +45,8 @@ func (s *sqlDatabase) migrateDB() error { &Pool{}, &Repository{}, &Organization{}, + &Address{}, + &Instance{}, ); err != nil { return err } @@ -54,30 +56,12 @@ func (s *sqlDatabase) migrateDB() error { func (s *sqlDatabase) sqlToCommonTags(tag Tag) params.Tag { return params.Tag{ + // ID: tag.ID.String(), ID: tag.ID.String(), Name: tag.Name, } } -// func (s *sqlDatabase) sqlToCommonRunner(runner Runner) params.Runner { -// ret := params.Runner{ -// ID: runner.ID.String(), -// MaxRunners: runner.MaxRunners, -// MinIdleRunners: runner.MinIdleRunners, -// Image: runner.Image, -// Flavor: runner.Flavor, -// OSArch: runner.OSArch, -// OSType: runner.OSType, -// Tags: make([]params.Tag, len(runner.Tags)), -// } - -// for idx, val := range runner.Tags { -// ret.Tags[idx] = s.sqlToCommonTags(val) -// } - -// return ret -// } - func (s *sqlDatabase) sqlToCommonPool(pool Pool) params.Pool { ret := params.Pool{ ID: pool.ID.String(), @@ -92,7 +76,7 @@ func (s *sqlDatabase) sqlToCommonPool(pool Pool) params.Pool { } for idx, val := range pool.Tags { - ret.Tags[idx] = s.sqlToCommonTags(val) + ret.Tags[idx] = s.sqlToCommonTags(*val) } return ret @@ -149,7 +133,19 @@ func (s *sqlDatabase) CreateRepository(ctx context.Context, owner, name, webhook return param, nil } -func (s *sqlDatabase) getRepo(ctx context.Context, id string) (Repository, error) { +func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string) (Repository, error) { + var repo Repository + q := s.conn.Preload(clause.Associations).Where("name = ? and owner = ?", name, owner).First(&repo) + if q.Error != nil { + if errors.Is(q.Error, gorm.ErrRecordNotFound) { + return Repository{}, runnerErrors.ErrNotFound + } + return Repository{}, errors.Wrap(q.Error, "fetching repository from database") + } + return repo, nil +} + +func (s *sqlDatabase) getRepoByID(ctx context.Context, id string) (Repository, error) { u := uuid.Parse(id) if u == nil { return Repository{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id") @@ -165,8 +161,8 @@ func (s *sqlDatabase) getRepo(ctx context.Context, id string) (Repository, error return repo, nil } -func (s *sqlDatabase) GetRepository(ctx context.Context, id string) (params.Repository, error) { - repo, err := s.getRepo(ctx, id) +func (s *sqlDatabase) GetRepository(ctx context.Context, owner, name string) (params.Repository, error) { + repo, err := s.getRepo(ctx, owner, name) if err != nil { return params.Repository{}, errors.Wrap(err, "fetching repo") } @@ -196,8 +192,8 @@ func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository return ret, nil } -func (s *sqlDatabase) DeleteRepository(ctx context.Context, id string) error { - repo, err := s.getRepo(ctx, id) +func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string) error { + repo, err := s.getRepo(ctx, owner, name) if err != nil { if err == runnerErrors.ErrNotFound { return nil @@ -238,7 +234,19 @@ func (s *sqlDatabase) CreateOrganization(ctx context.Context, name, webhookSecre return param, nil } -func (s *sqlDatabase) getOrg(ctx context.Context, id string) (Organization, error) { +func (s *sqlDatabase) getOrg(ctx context.Context, name string) (Organization, error) { + var org Organization + q := s.conn.Preload(clause.Associations).Where("name = ?", name).First(&org) + if q.Error != nil { + if errors.Is(q.Error, gorm.ErrRecordNotFound) { + return Organization{}, runnerErrors.ErrNotFound + } + return Organization{}, errors.Wrap(q.Error, "fetching org from database") + } + return org, nil +} + +func (s *sqlDatabase) getOrgByID(ctx context.Context, id string) (Organization, error) { u := uuid.Parse(id) if u == nil { return Organization{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id") @@ -254,8 +262,8 @@ func (s *sqlDatabase) getOrg(ctx context.Context, id string) (Organization, erro return org, nil } -func (s *sqlDatabase) GetOrganization(ctx context.Context, id string) (params.Organization, error) { - org, err := s.getOrg(ctx, id) +func (s *sqlDatabase) GetOrganization(ctx context.Context, name string) (params.Organization, error) { + org, err := s.getOrg(ctx, name) if err != nil { return params.Organization{}, errors.Wrap(err, "fetching repo") } @@ -285,8 +293,8 @@ func (s *sqlDatabase) ListOrganizations(ctx context.Context) ([]params.Organizat return ret, nil } -func (s *sqlDatabase) DeleteOrganization(ctx context.Context, id string) error { - org, err := s.getOrg(ctx, id) +func (s *sqlDatabase) DeleteOrganization(ctx context.Context, name string) error { + org, err := s.getOrg(ctx, name) if err != nil { if err == runnerErrors.ErrNotFound { return nil @@ -327,7 +335,7 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p return params.Pool{}, runnerErrors.NewBadRequestError("no tags specified") } - repo, err := s.getRepo(ctx, repoId) + repo, err := s.getRepoByID(ctx, repoId) if err != nil { return params.Pool{}, errors.Wrap(err, "fetching repo") } @@ -340,22 +348,34 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p Flavor: param.Flavor, OSType: param.OSType, OSArch: param.OSArch, + RepoID: repo.ID, } - tags := make([]Tag, len(param.Tags)) - for idx, val := range param.Tags { + tags := []Tag{} + for _, val := range param.Tags { t, err := s.getOrCreateTag(val) if err != nil { return params.Pool{}, errors.Wrap(err, "fetching tag") } - tags[idx] = t + fmt.Printf(">>>> Tag name: %s --> ID: %s\n", t.Name, t.ID) + tags = append(tags, t) + // newPool.Tags = append(newPool.Tags, &t) } - err = s.conn.Model(&repo).Association("Pools").Append(&newPool) - if err != nil { + q := s.conn.Create(&newPool) + if q.Error != nil { return params.Pool{}, errors.Wrap(err, "adding pool") } - return s.sqlToCommonPool(newPool), nil + + for _, tt := range tags { + s.conn.Model(&newPool).Association("Tags").Append(&tt) + } + + repo, err = s.getRepoByID(ctx, repoId) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching repo") + } + return s.sqlToCommonPool(repo.Pools[0]), nil } func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string, param params.CreatePoolParams) (params.Pool, error) { @@ -363,7 +383,7 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string, return params.Pool{}, runnerErrors.NewBadRequestError("no tags specified") } - org, err := s.getOrg(ctx, orgId) + org, err := s.getOrgByID(ctx, orgId) if err != nil { return params.Pool{}, errors.Wrap(err, "fetching org") } @@ -378,15 +398,16 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string, OSArch: param.OSArch, } - tags := make([]Tag, len(param.Tags)) + tags := make([]*Tag, len(param.Tags)) for idx, val := range param.Tags { t, err := s.getOrCreateTag(val) if err != nil { return params.Pool{}, errors.Wrap(err, "fetching tag") } - tags[idx] = t + tags[idx] = &t } + newPool.Tags = append(newPool.Tags, tags...) err = s.conn.Model(&org).Association("Pools").Append(&newPool) if err != nil { return params.Pool{}, errors.Wrap(err, "adding pool") @@ -394,58 +415,174 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string, return s.sqlToCommonPool(newPool), nil } -func (s *sqlDatabase) GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) { - repo, err := s.getRepo(ctx, repoID) +func (s *sqlDatabase) getRepoPool(ctx context.Context, repoID, poolID string) (Pool, error) { + repo, err := s.getRepoByID(ctx, repoID) if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching repo") + return Pool{}, errors.Wrap(err, "fetching repo") } u := uuid.Parse(poolID) if u == nil { - return params.Pool{}, fmt.Errorf("invalid pool id") + return Pool{}, fmt.Errorf("invalid pool id") } var pool []Pool err = s.conn.Model(&repo).Association("Pools").Find(&pool, "id = ?", u) if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching pool") + return Pool{}, errors.Wrap(err, "fetching pool") } if len(pool) == 0 { - return params.Pool{}, runnerErrors.ErrNotFound + return Pool{}, runnerErrors.ErrNotFound } - return s.sqlToCommonPool(pool[0]), nil + + return pool[0], nil } -func (s *sqlDatabase) GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) { - org, err := s.getOrg(ctx, orgID) +func (s *sqlDatabase) GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) { + pool, err := s.getRepoPool(ctx, repoID, poolID) if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching org") + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return s.sqlToCommonPool(pool), nil +} + +func (s *sqlDatabase) getOrgPool(ctx context.Context, orgID, poolID string) (Pool, error) { + org, err := s.getOrgByID(ctx, orgID) + if err != nil { + return Pool{}, errors.Wrap(err, "fetching repo") } u := uuid.Parse(poolID) if u == nil { - return params.Pool{}, fmt.Errorf("invalid pool id") + return Pool{}, fmt.Errorf("invalid pool id") } var pool []Pool err = s.conn.Model(&org).Association("Pools").Find(&pool, "id = ?", u) if err != nil { - return params.Pool{}, errors.Wrap(err, "fetching pool") + return Pool{}, errors.Wrap(err, "fetching pool") } if len(pool) == 0 { - return params.Pool{}, runnerErrors.ErrNotFound + return Pool{}, runnerErrors.ErrNotFound } - return s.sqlToCommonPool(pool[0]), nil + + return pool[0], nil +} + +func (s *sqlDatabase) GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) { + pool, err := s.getOrgPool(ctx, orgID, poolID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return s.sqlToCommonPool(pool), nil } func (s *sqlDatabase) DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error { + pool, err := s.getRepoPool(ctx, repoID, poolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + return errors.Wrap(err, "looking up repo pool") + } + q := s.conn.Delete(&pool) + if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) { + return errors.Wrap(q.Error, "deleting pool") + } return nil } func (s *sqlDatabase) DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error { + pool, err := s.getOrgPool(ctx, orgID, poolID) + if err != nil { + if errors.Is(err, runnerErrors.ErrNotFound) { + return nil + } + return errors.Wrap(err, "looking up repo pool") + } + q := s.conn.Delete(&pool) + if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) { + return errors.Wrap(q.Error, "deleting pool") + } return nil } -func (s *sqlDatabase) UpdateRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) { +func (s *sqlDatabase) UpdateRepositoryPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { return params.Pool{}, nil } -func (s *sqlDatabase) UpdateOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) { +func (s *sqlDatabase) UpdateOrganizationPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) { return params.Pool{}, nil } + +func (s *sqlDatabase) findPoolByTags(id, poolType string, tags []string) (params.Pool, error) { + if len(tags) == 0 { + return params.Pool{}, runnerErrors.NewBadRequestError("missing tags") + } + u := uuid.Parse(id) + if u == nil { + return params.Pool{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id") + } + + var pool Pool + where := fmt.Sprintf("tags.name in ? and %s = ?", poolType) + q := s.conn.Joins("JOIN pool_tags on pool_tags.pool_id=pools.id"). + Joins("JOIN tags on tags.id=pool_tags.tag_id"). + Group("pools.id"). + Preload("Tags"). + Having("count(1) = ?", len(tags)). + Where(where, tags, id).First(&pool) + + if q.Error != nil { + if errors.Is(q.Error, gorm.ErrRecordNotFound) { + return params.Pool{}, runnerErrors.ErrNotFound + } + return params.Pool{}, errors.Wrap(q.Error, "fetching pool") + } + + return s.sqlToCommonPool(pool), nil +} + +func (s *sqlDatabase) FindRepositoryPoolByTags(ctx context.Context, repoID string, tags []string) (params.Pool, error) { + pool, err := s.findPoolByTags(repoID, "repo_id", tags) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return pool, nil +} + +func (s *sqlDatabase) FindOrganizationPoolByTags(ctx context.Context, orgID string, tags []string) (params.Pool, error) { + pool, err := s.findPoolByTags(orgID, "org_id", tags) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching pool") + } + return pool, nil +} + +func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param params.CreateInstanceParams) (params.Instance, error) { + return params.Instance{}, nil +} + +func (s *sqlDatabase) DeleteInstance(ctx context.Context, poolID string, instanceID string) error { + return nil +} + +func (s *sqlDatabase) UpdateInstance(ctx context.Context, instanceID string, param params.UpdateInstanceParams) (params.Instance, error) { + return params.Instance{}, nil +} + +func (s *sqlDatabase) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) { + return []params.Instance{}, nil +} + +func (s *sqlDatabase) ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) { + return []params.Instance{}, nil +} + +func (s *sqlDatabase) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) { + return []params.Instance{}, nil +} + +func (s *sqlDatabase) GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error) { + return params.Instance{}, nil +} + +func (s *sqlDatabase) GetInstanceByName(ctx context.Context, instanceName string) (params.Instance, error) { + return params.Instance{}, nil +} diff --git a/go.mod b/go.mod index 1637437c..2f6f651e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/BurntSushi/toml v0.3.1 github.com/google/go-github/v43 v43.0.0 + github.com/google/uuid v1.3.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 github.com/lxc/lxd v0.0.0-20220415052741-1170f2806124 @@ -27,7 +28,6 @@ require ( github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/params/params.go b/params/params.go index ceac8454..5d733245 100644 --- a/params/params.go +++ b/params/params.go @@ -2,11 +2,40 @@ package params import ( "runner-manager/config" + "runner-manager/runner/providers/common" "github.com/google/go-github/v43/github" ) +type AddressType string + +const ( + PublicAddress AddressType = "public" + PrivateAddress AddressType = "private" +) + +type Address struct { + Address string `json:"address"` + Type AddressType `json:"type"` +} + +type UpdateInstanceParams struct { + ProviderID string `json:"provider_id,omitempty"` + // OSName is the name of the OS. Eg: ubuntu, centos, etc. + OSName string `json:"os_name,omitempty"` + // OSVersion is the version of the operating system. + OSVersion string `json:"os_version,omitempty"` + // Addresses is a list of IP addresses the provider reports + // for this instance. + Addresses []Address `json:"addresses,omitempty"` + // Status is the status of the instance inside the provider (eg: running, stopped, etc) + Status common.InstanceStatus `json:"status"` + RunnerStatus common.RunnerStatus `json:"runner_status"` +} + type Instance struct { + // ID is the database ID of this instance. + ID string `json:"id"` // PeoviderID is the unique ID the provider associated // with the compute instance. We use this to identify the // instance in the provider. @@ -18,32 +47,34 @@ type Instance struct { Name string `json:"name,omitempty"` // OSType is the operating system type. For now, only Linux and // Windows are supported. - OSType config.OSType `json:"os-type,omitempty"` + OSType config.OSType `json:"os_type,omitempty"` // OSName is the name of the OS. Eg: ubuntu, centos, etc. - OSName string `json:"os-name,omitempty"` + OSName string `json:"os_name,omitempty"` // OSVersion is the version of the operating system. - OSVersion string `json:"os-version,omitempty"` + OSVersion string `json:"os_version,omitempty"` // OSArch is the operating system architecture. - OSArch config.OSArch `json:"os-arch,omitempty"` + OSArch config.OSArch `json:"os_arch,omitempty"` // Addresses is a list of IP addresses the provider reports // for this instance. - Addresses []string `json:"ip-addresses,omitempty"` + Addresses []Address `json:"addresses,omitempty"` // Status is the status of the instance inside the provider (eg: running, stopped, etc) - Status string `json:"status"` + Status common.InstanceStatus `json:"status"` + RunnerStatus common.RunnerStatus `json:"runner_status"` + PoolID string `json:"pool_id"` + + // Do not serialize sensitive info. + CallbackURL string `json:"-"` + CallbackToken string `json:"-"` } type BootstrapInstance struct { + Name string `json:"name"` Tools []*github.RunnerApplicationDownload `json:"tools"` // RepoURL is the URL the github runner agent needs to configure itself. RepoURL string `json:"repo_url"` // GithubRunnerAccessToken is the token we fetch from github to allow the runner to // register itself. GithubRunnerAccessToken string `json:"github_runner_access_token"` - // RunnerType is the name of the defined runner type in a particular pool. The provider - // needs this to determine which flavor/image/settings it needs to use to create the - // instance. This is provider/runner specific. The config for the runner type is defined - // in the configuration file, as part of the pool definition. - RunnerType string `json:"runner-type"` // CallbackUrl is the URL where the instance can send a post, signaling // progress or status. CallbackURL string `json:"callback-url"` @@ -58,6 +89,7 @@ type BootstrapInstance struct { Flavor string `json:"flavor"` Image string `json:"image"` Labels []string `json:"labels"` + PoolID string `json:"pool_id"` } type Tag struct { @@ -77,19 +109,29 @@ type Pool struct { Tags []Tag `json:"tags"` } +type Internal struct { + OAuth2Token string `json:"oauth2"` + ControllerID string `json:"controller_id"` + InstanceCallbackURL string `json:"instance_callback_url"` +} + type Repository struct { - ID string `json:"id"` - Owner string `json:"owner"` - Name string `json:"name"` - WebhookSecret string `json:"-"` - Pools []Pool `json:"pool,omitempty"` + ID string `json:"id"` + Owner string `json:"owner"` + Name string `json:"name"` + Pools []Pool `json:"pool,omitempty"` + // Do not serialize sensitive info. + WebhookSecret string `json:"-"` + Internal Internal `json:"-"` } type Organization struct { - ID string `json:"id"` - Name string `json:"name"` - WebhookSecret string `json:"-"` - Pools []Pool `json:"pool,omitempty"` + ID string `json:"id"` + Name string `json:"name"` + Pools []Pool `json:"pool,omitempty"` + // Do not serialize sensitive info. + WebhookSecret string `json:"-"` + Internal Internal `json:"-"` } type CreatePoolParams struct { @@ -102,3 +144,32 @@ type CreatePoolParams struct { OSArch config.OSArch `json:"os_arch"` Tags []string `json:"tags"` } + +/* + Name string `gorm:"uniqueIndex"` + OSType config.OSType + OSArch config.OSArch + OSName string + OSVersion string + Addresses []Address `gorm:"foreignKey:id"` + Status string + RunnerStatus string + CallbackURL string + CallbackToken []byte + + Pool Pool `gorm:"foreignKey:id"` +*/ + +type CreateInstanceParams struct { + Name string + OSType config.OSType + OSArch config.OSArch + Status common.InstanceStatus + RunnerStatus common.RunnerStatus + CallbackURL string + CallbackToken string + + Pool string +} + +type UpdatePoolParams struct{} diff --git a/runner/common/pool.go b/runner/common/pool.go index 0e6c8e41..2ac301ad 100644 --- a/runner/common/pool.go +++ b/runner/common/pool.go @@ -5,6 +5,14 @@ import "runner-manager/params" type PoolManager interface { WebhookSecret() string HandleWorkflowJob(job params.WorkflowJob) error + + // PoolManager lifecycle functions. Start/stop pool. + Start() error + Stop() error + Wait() error +} + +type Pool interface { ListInstances() ([]params.Instance, error) GetInstance() (params.Instance, error) DeleteInstance() error diff --git a/runner/common/provider.go b/runner/common/provider.go index 95e4cc7b..ab7e3734 100644 --- a/runner/common/provider.go +++ b/runner/common/provider.go @@ -13,7 +13,7 @@ type Provider interface { // GetInstance will return details about one instance. GetInstance(ctx context.Context, instance string) (params.Instance, error) // ListInstances will list all instances for a provider. - ListInstances(ctx context.Context) ([]params.Instance, error) + ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) // RemoveAllInstances will remove all instances created by this provider. RemoveAllInstances(ctx context.Context) error // Stop shuts down the instance. diff --git a/runner/pool/common.go b/runner/pool/common.go new file mode 100644 index 00000000..b81dc50c --- /dev/null +++ b/runner/pool/common.go @@ -0,0 +1,6 @@ +package pool + +var ( + poolIDLabelprefix = "runner-pool-id:" + controllerLabelPrefix = "runner-controller-id:" +) diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 664967b1..9b630abe 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -4,33 +4,44 @@ import ( "context" "fmt" "log" + "strings" "sync" "time" "runner-manager/config" + dbCommon "runner-manager/database/common" runnerErrors "runner-manager/errors" "runner-manager/params" "runner-manager/runner/common" + providerCommon "runner-manager/runner/providers/common" + "runner-manager/util" "github.com/google/go-github/v43/github" + "github.com/google/uuid" "github.com/pkg/errors" ) // test that we implement PoolManager var _ common.PoolManager = &Repository{} -func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, provider common.Provider, ghcli *github.Client, controllerID string) (common.PoolManager, error) { - queueSize := cfg.Pool.QueueSize - if queueSize == 0 { - queueSize = config.DefaultPoolQueueSize +func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token) + if err != nil { + return nil, errors.Wrap(err, "getting github client") + } + pools := map[string]params.Pool{} + + for _, val := range cfg.Pools { + pools[val.ID] = val } repo := &Repository{ ctx: ctx, cfg: cfg, - ghcli: ghcli, - provider: provider, - controllerID: controllerID, - jobQueue: make(chan params.WorkflowJob, queueSize), + ghcli: ghc, + store: store, + providers: providers, + pools: pools, + controllerID: cfg.Internal.ControllerID, quit: make(chan struct{}), done: make(chan struct{}), } @@ -44,14 +55,30 @@ func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, provide type Repository struct { ctx context.Context controllerID string - cfg config.Repository + cfg params.Repository + store dbCommon.Store ghcli *github.Client - provider common.Provider + providers map[string]common.Provider tools []*github.RunnerApplicationDownload - jobQueue chan params.WorkflowJob quit chan struct{} done chan struct{} - mux sync.Mutex + id string + pools map[string]params.Pool + + mux sync.Mutex +} + +func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error { + r.mux.Lock() + defer r.mux.Unlock() + + if _, ok := r.pools[pool.ID]; ok { + return nil + } + + // start pool loop + r.pools[pool.ID] = pool + return nil } func (r *Repository) getGithubRunners() ([]*github.Runner, error) { @@ -68,6 +95,17 @@ func (r *Repository) getProviderInstances() ([]params.Instance, error) { } func (r *Repository) Start() error { + runners, err := r.getGithubRunners() + if err != nil { + return errors.Wrap(err, "fetching github runners") + } + if err := r.cleanupOrphanedProviderRunners(runners); err != nil { + return errors.Wrap(err, "cleaning orphaned instances") + } + + if err := r.cleanupOrphanedGithubRunners(runners); err != nil { + return errors.Wrap(err, "cleaning orphaned github runners") + } go r.loop() return nil } @@ -97,6 +135,359 @@ func (r *Repository) Wait() error { return nil } +func (r *Repository) consolidate() { + r.mux.Lock() + defer r.mux.Unlock() + + r.deletePendingInstances() + r.addPendingInstances() + r.ensureMinIdleRunners() +} + +func (r *Repository) addPendingInstances() { + instances, err := r.store.ListRepoInstances(r.ctx, r.id) + if err != nil { + log.Printf("failed to fetch instances from store: %s", err) + return + } + + for _, instance := range instances { + if instance.Status != providerCommon.InstancePendingCreate { + // not in pending_create status. Skip. + continue + } + + if err := r.addInstanceToProvider(instance); err != nil { + log.Printf("failed to create instance in provider: %s", err) + } + } +} + +func (r *Repository) deletePendingInstances() { + instances, err := r.store.ListRepoInstances(r.ctx, r.id) + if err != nil { + log.Printf("failed to fetch instances from store: %s", err) + return + } + + for _, instance := range instances { + if instance.Status != providerCommon.InstancePendingDelete { + // not in pending_delete status. Skip. + continue + } + + if err := r.deleteInstanceFromProvider(instance); err != nil { + log.Printf("failed to delete instance from provider: %s", err) + } + } +} + +func (r *Repository) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) { + for _, lbl := range labels { + if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) { + labelName := *lbl.Name + return labelName[len(poolIDLabelprefix):], nil + } + } + return "", runnerErrors.ErrNotFound +} + +// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear +// as offline and for which we no longer have a local instance. +// This may happen if someone manually deletes the instance in the provider. We need to +// first remove the instance from github, and then from our database. +func (r *Repository) cleanupOrphanedGithubRunners(runners []*github.Runner) error { + for _, runner := range runners { + status := runner.GetStatus() + if status != "offline" { + // Runner is online. Ignore it. + continue + } + + removeRunner := false + // check locally and delete + // dbInstance, err := r.store.GetInstance() + poolID, err := r.poolIDFromLabels(runner.Labels) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "finding pool") + } + // not a runner we manage + continue + } + + pool, ok := r.pools[poolID] + if !ok { + // not a pool we manage. + continue + } + + dbInstance, err := r.store.GetInstance(r.ctx, poolID, *runner.Name) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "fetching instance from DB") + } + // We no longer have a DB entry for this instance. Previous forceful + // removal may have failed? + removeRunner = true + } else { + if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete { + // already marked for deleting. Let consolidate take care of it. + continue + } + // check if the provider still has the instance. + provider, ok := r.providers[pool.ProviderName] + if !ok { + return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID) + } + + instance, err := provider.GetInstance(r.ctx, dbInstance.Name) + if err != nil { + if !errors.Is(err, runnerErrors.ErrNotFound) { + return errors.Wrap(err, "fetching instance from provider") + } + // instance was manually deleted? + removeRunner = true + } else { + if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstanceRunning { + // instance is running, but github reports runner as offline. Log the event. + // This scenario requires manual intervention. + // Perhaps it just came online and github did not yet change it's status? + log.Printf("instance %s is online but github reports runner as offline", instance.Name) + continue + } + //start the instance + if err := provider.Start(r.ctx, instance.ProviderID); err != nil { + return errors.Wrapf(err, "starting instance %s", instance.ProviderID) + } + // we started the instance. Give it a chance to come online + continue + } + + if removeRunner { + if _, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, *runner.ID); err != nil { + return errors.Wrap(err, "removing runner") + } + } + } + } + return nil +} + +// cleanupOrphanedProviderRunners compares runners in github with local runners and removes +// any local runners that are not present in Github. Runners that are "idle" in our +// provider, but do not exist in github, will be removed. This can happen if the +// runner-manager was offline while a job was executed by a github action. When this +// happens, github will remove the ephemeral worker and send a webhook our way. +// If we were offline and did not process the webhook, the instance will linger. +// We need to remove it from the provider and database. +func (r *Repository) cleanupOrphanedProviderRunners(runners []*github.Runner) error { + // runners, err := r.getGithubRunners() + // if err != nil { + // return errors.Wrap(err, "fetching github runners") + // } + + dbInstances, err := r.store.ListRepoInstances(r.ctx, r.id) + if err != nil { + return errors.Wrap(err, "fetching instances from db") + } + + runnerNames := map[string]bool{} + for _, run := range runners { + runnerNames[*run.Name] = true + } + + for _, instance := range dbInstances { + if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete { + // this instance is in the process of being created or is awaiting deletion. + // Instances in pending_Create did not get a chance to register themselves in, + // github so we let them be for now. + continue + } + if ok := runnerNames[instance.Name]; !ok { + // Set pending_delete on DB field. Allow consolidate() to remove it. + _, err = r.store.UpdateInstance(r.ctx, instance.Name, params.UpdateInstanceParams{}) + if err != nil { + return errors.Wrap(err, "syncing local state with github") + } + } + } + return nil +} + +func (r *Repository) ensureMinIdleRunners() { + for poolID, pool := range r.pools { + existingInstances, err := r.store.ListInstances(r.ctx, poolID) + if err != nil { + log.Printf("failed to ensure minimum idle workers for pool %s: %s", poolID, err) + return + } + + if uint(len(existingInstances)) >= pool.MaxRunners { + log.Printf("max workers reached for pool %s, skipping idle worker creation", poolID) + continue + } + + idleOrPendingWorkers := []params.Instance{} + for _, inst := range existingInstances { + if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle || providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerPending { + idleOrPendingWorkers = append(idleOrPendingWorkers, inst) + } + } + + var required int + if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) { + // get the needed delta. + required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers) + + projectedInstanceCount := len(existingInstances) + required + if uint(projectedInstanceCount) > pool.MaxRunners { + // ensure we don't go above max workers + required = (len(existingInstances) + required) - int(pool.MaxRunners) + } + } + + for i := 0; i < required; i++ { + log.Printf("addind new idle worker to pool %s", poolID) + if err := r.AddRunner(r.ctx, poolID); err != nil { + log.Printf("failed to add new instance for pool %s: %s", poolID, err) + } + } + } +} + +func (r *Repository) githubURL() string { + return fmt.Sprintf("%s/%s/%s", config.GithubBaseURL, r.cfg.Owner, r.cfg.Name) +} + +func (r *Repository) poolLabel() string { + return fmt.Sprintf("%s%s", poolIDLabelprefix, r.id) +} + +func (r *Repository) controllerLabel() string { + return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID) +} + +func (r *Repository) getLabels() []string { + return []string{ + r.poolLabel(), + r.controllerLabel(), + } +} + +func (r *Repository) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams { + return params.UpdateInstanceParams{ + ProviderID: providerInstance.ProviderID, + OSName: providerInstance.OSName, + OSVersion: providerInstance.OSName, + Addresses: providerInstance.Addresses, + Status: providerInstance.Status, + RunnerStatus: providerInstance.RunnerStatus, + } +} + +func (r *Repository) deleteInstanceFromProvider(instance params.Instance) error { + pool, ok := r.pools[instance.PoolID] + if !ok { + return runnerErrors.NewNotFoundError("invalid pool ID") + } + + provider, ok := r.providers[pool.ProviderName] + if !ok { + return runnerErrors.NewNotFoundError("invalid provider ID") + } + + if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil { + return errors.Wrap(err, "removing instance") + } + + if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.ID); err != nil { + return errors.Wrap(err, "deleting instance from database") + } + return nil +} + +func (r *Repository) addInstanceToProvider(instance params.Instance) error { + pool, ok := r.pools[instance.PoolID] + if !ok { + return runnerErrors.NewNotFoundError("invalid pool ID") + } + + provider, ok := r.providers[pool.ProviderName] + if !ok { + return runnerErrors.NewNotFoundError("invalid provider ID") + } + + labels := []string{} + for _, tag := range pool.Tags { + labels = append(labels, tag.Name) + } + labels = append(labels, r.getLabels()...) + + tk, _, err := r.ghcli.Actions.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name) + + if err != nil { + return errors.Wrap(err, "creating runner token") + } + + bootstrapArgs := params.BootstrapInstance{ + Tools: r.tools, + RepoURL: r.githubURL(), + GithubRunnerAccessToken: *tk.Token, + CallbackURL: instance.CallbackURL, + InstanceToken: instance.CallbackToken, + OSArch: pool.OSArch, + Flavor: pool.Flavor, + Image: pool.Image, + Labels: labels, + } + + providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs) + if err != nil { + return errors.Wrap(err, "creating instance") + } + + updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance) + if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil { + return errors.Wrap(err, "updating instance") + } + return nil +} + +// TODO: add function to set runner status to idle when instance calls home on callback url + +func (r *Repository) AddRunner(ctx context.Context, poolID string) error { + callbackToken, err := util.GetRandomString(32) + if err != nil { + return errors.Wrap(err, "fetching callbackToken") + } + + pool, ok := r.pools[poolID] + if !ok { + return runnerErrors.NewNotFoundError("invalid provider ID") + } + + name := fmt.Sprintf("runner-manager-%s", uuid.New()) + + createParams := params.CreateInstanceParams{ + Name: name, + Pool: poolID, + Status: providerCommon.InstancePendingCreate, + RunnerStatus: providerCommon.RunnerPending, + OSArch: pool.OSArch, + OSType: pool.OSType, + CallbackToken: callbackToken, + CallbackURL: r.cfg.Internal.InstanceCallbackURL, + } + + _, err = r.store.CreateInstance(r.ctx, poolID, createParams) + if err != nil { + return errors.Wrap(err, "creating instance") + } + + return nil +} + func (r *Repository) loop() { defer close(r.done) // TODO: Consolidate runners on loop start. Provider runners must match runners @@ -112,25 +503,13 @@ func (r *Repository) loop() { for { select { - case job, ok := <-r.jobQueue: - if !ok { - // queue was closed. return. - return - } - // We handle jobs synchronously (for now) - switch job.Action { - case "queued": - // Create instance. - case "completed": - // Remove instance. - case "in_progress": - // update state - } - fmt.Println(job) + case <-time.After(5 * time.Second): + // consolidate. + r.consolidate() case <-time.After(3 * time.Hour): // Update tools cache. if err := r.fetchTools(); err != nil { - log.Printf("failed to update tools for repo %s: %s", r.cfg.String(), err) + log.Printf("failed to update tools for repo %s/%s: %s", r.cfg.Owner, r.cfg.Name, err) } case <-r.ctx.Done(): // daemon is shutting down. @@ -142,30 +521,24 @@ func (r *Repository) loop() { } } -// addJobToQueue adds a new workflow job to the queue of jobs that need to be -// processed by this pool. Jobs are added by github webhooks, so it makes no sense -// to return an error when that happens. But we do need to log any error that comes -// up. The queue size is configurable. If we hit that limit, new jobs will be discarded -// and logged. -// TODO: setup a state pipeline that will send back updates to the runner and update the -// database as needed. -func (r *Repository) addJobToQueue(job params.WorkflowJob) { - select { - case r.jobQueue <- job: - case <-time.After(1 * time.Second): - log.Printf("timed out accepting job. Queue is full.") - } -} - func (r *Repository) WebhookSecret() string { return r.cfg.WebhookSecret } func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error { - if job.Repository.FullName != r.cfg.String() { - return runnerErrors.NewBadRequestError("job not meant for this pool") + if job.Repository.Name != r.cfg.Name || job.Repository.Owner.Login != r.cfg.Owner { + return runnerErrors.NewBadRequestError("job not meant for this pool manager") + } + + switch job.Action { + case "queued": + // Create instance in database and set it to pending create. + case "completed": + // Set instance in database to pending delete. Unassigned jobs will have + // an empty runner_name. There is nothing to to in that case. + case "in_progress": + // update instance workload state. Set job_id in instance state. } - r.addJobToQueue(job) return nil } diff --git a/runner/providers/common/common.go b/runner/providers/common/common.go new file mode 100644 index 00000000..201551f6 --- /dev/null +++ b/runner/providers/common/common.go @@ -0,0 +1,16 @@ +package common + +type InstanceStatus string +type RunnerStatus string + +const ( + InstanceRunning InstanceStatus = "running" + InstanceStopped InstanceStatus = "stopped" + InstancePendingDelete InstanceStatus = "pending_delete" + InstancePendingCreate InstanceStatus = "pending_create" + InstanceStatusUnknown InstanceStatus = "unknown" + + RunnerIdle RunnerStatus = "idle" + RunnerPending RunnerStatus = "pending" + RunnerActive RunnerStatus = "active" +) diff --git a/runner/providers/lxd/lxd.go b/runner/providers/lxd/lxd.go index 42bec17b..7f6a72d3 100644 --- a/runner/providers/lxd/lxd.go +++ b/runner/providers/lxd/lxd.go @@ -13,7 +13,6 @@ import ( "github.com/google/go-github/v43/github" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/shared/api" - "github.com/pborman/uuid" "github.com/pkg/errors" "gopkg.in/yaml.v3" ) @@ -24,6 +23,7 @@ const ( // We look for this key in the config of the instances to determine if they are // created by us or not. controllerIDKeyName = "user.runner-controller-id" + poolIDKey = "user.runner-pool-id" ) var ( @@ -182,7 +182,7 @@ func (l *LXD) secureBootEnabled() string { } func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (api.InstancesPost, error) { - name := fmt.Sprintf("runner-manager-%s", uuid.New()) + // name := fmt.Sprintf("runner-manager-%s", uuid.New()) profiles, err := l.getProfiles(bootstrapParams.Flavor) if err != nil { @@ -204,7 +204,7 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a return api.InstancesPost{}, errors.Wrap(err, "getting tools") } - cloudCfg, err := util.GetCloudConfig(bootstrapParams, tools, name) + cloudCfg, err := util.GetCloudConfig(bootstrapParams, tools, bootstrapParams.Name) if err != nil { return api.InstancesPost{}, errors.Wrap(err, "generating cloud-config") } @@ -218,13 +218,14 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a "user.user-data": cloudCfg, "security.secureboot": l.secureBootEnabled(), controllerIDKeyName: l.controllerID, + poolIDKey: bootstrapParams.PoolID, }, }, Source: api.InstanceSource{ Type: "image", Fingerprint: image.Fingerprint, }, - Name: name, + Name: bootstrapParams.Name, Type: api.InstanceTypeVM, } return args, nil @@ -307,7 +308,7 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error { } // ListInstances will list all instances for a provider. -func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) { +func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) { instances, err := l.cli.GetInstancesFull(api.InstanceTypeAny) if err != nil { return []params.Instance{}, errors.Wrap(err, "fetching instances") @@ -317,6 +318,13 @@ func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) { for _, instance := range instances { if id, ok := instance.ExpandedConfig[controllerIDKeyName]; ok && id == l.controllerID { + if poolID != "" { + id := instance.ExpandedConfig[poolID] + if id != poolID { + // Pool ID was specified. Filter out instances belonging to other pools. + continue + } + } ret = append(ret, lxdInstanceToAPIInstance(&instance)) } } @@ -326,7 +334,7 @@ func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) { // RemoveAllInstances will remove all instances created by this provider. func (l *LXD) RemoveAllInstances(ctx context.Context) error { - instances, err := l.ListInstances(ctx) + instances, err := l.ListInstances(ctx, "") if err != nil { return errors.Wrap(err, "fetching instance list") } diff --git a/runner/providers/lxd/util.go b/runner/providers/lxd/util.go index 36042c11..c0d9183d 100644 --- a/runner/providers/lxd/util.go +++ b/runner/providers/lxd/util.go @@ -7,6 +7,7 @@ import ( "log" "runner-manager/config" "runner-manager/params" + "runner-manager/runner/providers/common" "runner-manager/util" "strings" @@ -31,14 +32,17 @@ func lxdInstanceToAPIInstance(instance *api.InstanceFull) params.Instance { } state := instance.State - addresses := []string{} + addresses := []params.Address{} if state.Network != nil { for _, details := range state.Network { for _, addr := range details.Addresses { if addr.Scope != "global" { continue } - addresses = append(addresses, addr.Address) + addresses = append(addresses, params.Address{ + Address: addr.Address, + Type: params.PublicAddress, + }) } } } @@ -56,7 +60,18 @@ func lxdInstanceToAPIInstance(instance *api.InstanceFull) params.Instance { OSName: strings.ToLower(os), OSVersion: osRelease, Addresses: addresses, - Status: state.Status, + Status: lxdStatusToProviderStatus(state.Status), + } +} + +func lxdStatusToProviderStatus(status string) common.InstanceStatus { + switch status { + case "Running": + return common.InstanceRunning + case "Stopped": + return common.InstanceStopped + default: + return common.InstanceStatusUnknown } } diff --git a/runner/runner.go b/runner/runner.go index 289749be..9906cb2d 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -9,16 +9,20 @@ import ( "encoding/json" "hash" "io/ioutil" + "log" "os" "path/filepath" + "strings" + "sync" + "runner-manager/config" + "runner-manager/database" + dbCommon "runner-manager/database/common" gErrors "runner-manager/errors" "runner-manager/params" "runner-manager/runner/common" "runner-manager/runner/providers" "runner-manager/util" - "strings" - "sync" "github.com/google/go-github/v43/github" "github.com/pkg/errors" @@ -26,7 +30,7 @@ import ( ) func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { - ghc, err := util.GithubClientFromConfig(ctx, cfg.Github) + ghc, err := util.GithubClient(ctx, cfg.Github.OAuth2Token) if err != nil { return nil, errors.Wrap(err, "getting github client") } @@ -35,10 +39,15 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { if err != nil { return nil, errors.Wrap(err, "loading providers") } + db, err := database.NewDatabase(ctx, cfg.Database) + if err != nil { + log.Fatal(err) + } runner := &Runner{ ctx: ctx, config: cfg, + db: db, ghc: ghc, providers: providers, } @@ -55,6 +64,7 @@ type Runner struct { ctx context.Context ghc *github.Client + db dbCommon.Store controllerID string @@ -64,10 +74,29 @@ type Runner struct { providers map[string]common.Provider } -func (r *Runner) findRepoPool(name string) (common.PoolManager, error) { - if pool, ok := r.repositories[name]; ok { - return pool, nil - } +func (r *Runner) loadPools() error { + r.mux.Lock() + defer r.mux.Unlock() + + // repos, err := r.db.ListRepositories(r.ctx) + // if err != nil { + // return errors.Wrap(err, "fetching repositories") + // } + + return nil +} + +func (r *Runner) findRepoPool(owner, name string) (common.PoolManager, error) { + r.mux.Lock() + defer r.mux.Unlock() + + // key := fmt.Sprintf("%s/%s", owner, name) + // if repo, ok := r.repositories[key]; ok { + // return pool, nil + // } + + // repo, err := r.db.GetRepository(r.ctx, owner, name) + // r.repositories[key] = repo return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s not configured", name) } @@ -132,17 +161,14 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [ return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err) } - var entity string - var pool common.PoolManager + var poolManager common.PoolManager var err error switch HookTargetType(hookTargetType) { case RepoHook: - entity = job.Repository.FullName - pool, err = r.findRepoPool(entity) + poolManager, err = r.findRepoPool(job.Repository.Owner.Login, job.Repository.Name) case OrganizationHook: - entity = job.Organization.Login - pool, err = r.findOrgPool(entity) + poolManager, err = r.findOrgPool(job.Organization.Login) default: return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType) } @@ -150,12 +176,12 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [ if err != nil { // We don't have a repository or organization configured that // can handle this workflow job. - return errors.Wrap(err, "fetching pool") + return errors.Wrap(err, "fetching poolManager") } // We found a pool. Validate the webhook job. If a secret is configured, // we make sure that the source of this workflow job is valid. - secret := pool.WebhookSecret() + secret := poolManager.WebhookSecret() if err := r.validateHookBody(signature, secret, jobData); err != nil { return errors.Wrap(err, "validating webhook data") } diff --git a/util/util.go b/util/util.go index 8106d41f..81e49c87 100644 --- a/util/util.go +++ b/util/util.go @@ -123,9 +123,9 @@ func OSToOSType(os string) (config.OSType, error) { return osType, nil } -func GithubClientFromConfig(ctx context.Context, cfg config.Github) (*github.Client, error) { +func GithubClient(ctx context.Context, token string) (*github.Client, error) { ts := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: cfg.OAuth2Token}, + &oauth2.Token{AccessToken: token}, ) tc := oauth2.NewClient(ctx, ts)