This commit is contained in:
Gabriel Adrian Samfira 2022-04-26 20:29:58 +00:00
parent 2be5653683
commit 62ba5a5a08
18 changed files with 1132 additions and 294 deletions

View file

@ -1,172 +1,172 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os/signal"
// import (
// "context"
// "flag"
// "fmt"
// "log"
// "os/signal"
"runner-manager/config"
"runner-manager/params"
"runner-manager/runner/providers/lxd"
"runner-manager/util"
// "runner-manager/config"
// "runner-manager/params"
// "runner-manager/runner/providers/lxd"
// "runner-manager/util"
"github.com/google/go-github/v43/github"
"golang.org/x/oauth2"
"gopkg.in/yaml.v3"
)
// "github.com/google/go-github/v43/github"
// "golang.org/x/oauth2"
// "gopkg.in/yaml.v3"
// )
var (
conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file")
version = flag.Bool("version", false, "prints version")
)
// var (
// conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file")
// version = flag.Bool("version", false, "prints version")
// )
var Version string
// var Version string
// var token = "super secret token"
// // var token = "super secret token"
func main() {
flag.Parse()
if *version {
fmt.Println(Version)
return
}
ctx, stop := signal.NotifyContext(context.Background(), signals...)
defer stop()
fmt.Println(ctx)
// func main() {
// flag.Parse()
// if *version {
// fmt.Println(Version)
// return
// }
// ctx, stop := signal.NotifyContext(context.Background(), signals...)
// defer stop()
// fmt.Println(ctx)
cfg, err := config.NewConfig(*conf)
if err != nil {
log.Fatalf("Fetching config: %+v", err)
}
// cfg, err := config.NewConfig(*conf)
// if err != nil {
// log.Fatalf("Fetching config: %+v", err)
// }
ts := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: cfg.Github.OAuth2Token},
)
// ts := oauth2.StaticTokenSource(
// &oauth2.Token{AccessToken: cfg.Github.OAuth2Token},
// )
tc := oauth2.NewClient(ctx, ts)
// tc := oauth2.NewClient(ctx, ts)
ghClient := github.NewClient(tc)
// ghClient := github.NewClient(tc)
// // list all repositories for the authenticated user
// repos, _, err := client.Repositories.List(ctx, "", nil)
// // // list all repositories for the authenticated user
// // repos, _, err := client.Repositories.List(ctx, "", nil)
// fmt.Println(repos, err)
// // fmt.Println(repos, err)
logWriter, err := util.GetLoggingWriter(cfg)
if err != nil {
log.Fatalf("fetching log writer: %+v", err)
}
log.SetOutput(logWriter)
// logWriter, err := util.GetLoggingWriter(cfg)
// if err != nil {
// log.Fatalf("fetching log writer: %+v", err)
// }
// log.SetOutput(logWriter)
// controller, err := controllers.NewAPIController()
// if err != nil {
// log.Fatalf("failed to create controller: %+v", err)
// }
// // controller, err := controllers.NewAPIController()
// // if err != nil {
// // log.Fatalf("failed to create controller: %+v", err)
// // }
// router := routers.NewAPIRouter(controller, logWriter)
// // router := routers.NewAPIRouter(controller, logWriter)
// tlsCfg, err := cfg.APIServer.APITLSConfig()
// if err != nil {
// log.Fatalf("failed to get TLS config: %q", err)
// }
// // tlsCfg, err := cfg.APIServer.APITLSConfig()
// // if err != nil {
// // log.Fatalf("failed to get TLS config: %q", err)
// // }
// srv := &http.Server{
// Addr: cfg.APIServer.BindAddress(),
// TLSConfig: tlsCfg,
// // Pass our instance of gorilla/mux in.
// Handler: router,
// }
// // srv := &http.Server{
// // Addr: cfg.APIServer.BindAddress(),
// // TLSConfig: tlsCfg,
// // // Pass our instance of gorilla/mux in.
// // Handler: router,
// // }
// listener, err := net.Listen("tcp", srv.Addr)
// if err != nil {
// log.Fatalf("creating listener: %q", err)
// }
// // listener, err := net.Listen("tcp", srv.Addr)
// // if err != nil {
// // log.Fatalf("creating listener: %q", err)
// // }
// go func() {
// if err := srv.Serve(listener); err != nil {
// log.Fatalf("Listening: %+v", err)
// }
// }()
// // go func() {
// // if err := srv.Serve(listener); err != nil {
// // log.Fatalf("Listening: %+v", err)
// // }
// // }()
// <-ctx.Done()
// // <-ctx.Done()
// runner, err := runner.NewRunner(ctx, *cfg)
// if err != nil {
// log.Fatal(err)
// }
// // runner, err := runner.NewRunner(ctx, *cfg)
// // if err != nil {
// // log.Fatal(err)
// // }
// fmt.Println(runner)
controllerID := "026d374d-6a8a-4241-8ed9-a246fff6762f"
provider, err := lxd.NewProvider(ctx, &cfg.Providers[0], controllerID)
if err != nil {
log.Fatal(err)
}
// // fmt.Println(runner)
// controllerID := "026d374d-6a8a-4241-8ed9-a246fff6762f"
// provider, err := lxd.NewProvider(ctx, &cfg.Providers[0], controllerID)
// if err != nil {
// log.Fatal(err)
// }
// if err := provider.RemoveAllInstances(ctx); err != nil {
// log.Fatal(err)
// }
// // if err := provider.RemoveAllInstances(ctx); err != nil {
// // log.Fatal(err)
// // }
// fmt.Println(provider)
// // fmt.Println(provider)
// if err := provider.DeleteInstance(ctx, "runner-manager-2fbe5354-be28-4e00-95a8-11479912368d"); err != nil {
// log.Fatal(err)
// }
// // if err := provider.DeleteInstance(ctx, "runner-manager-2fbe5354-be28-4e00-95a8-11479912368d"); err != nil {
// // log.Fatal(err)
// // }
// instances, err := provider.ListInstances(ctx)
// // instances, err := provider.ListInstances(ctx)
// asJs, err := json.MarshalIndent(instances, "", " ")
// fmt.Println(string(asJs), err)
// // asJs, err := json.MarshalIndent(instances, "", " ")
// // fmt.Println(string(asJs), err)
log.Print("Fetching tools")
tools, _, err := ghClient.Actions.ListOrganizationRunnerApplicationDownloads(ctx, cfg.Organizations[0].Name)
// tools, _, err := ghClient.Actions.ListRunnerApplicationDownloads(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name)
if err != nil {
log.Fatal(err)
}
// log.Print("Fetching tools")
// tools, _, err := ghClient.Actions.ListOrganizationRunnerApplicationDownloads(ctx, cfg.Organizations[0].Name)
// // tools, _, err := ghClient.Actions.ListRunnerApplicationDownloads(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name)
// if err != nil {
// log.Fatal(err)
// }
tk, _, err := ghClient.Actions.CreateOrganizationRegistrationToken(ctx, "gsamfira")
// tk, _, err := ghClient.Actions.CreateOrganizationRegistrationToken(ctx, "gsamfira")
if err != nil {
log.Fatalf("fetching org token: %+v", err)
}
// if err != nil {
// log.Fatalf("fetching org token: %+v", err)
// }
fmt.Printf("Org token is: %v\n", *tk)
// fmt.Printf("Org token is: %v\n", *tk)
toolsAsYaml, err := yaml.Marshal(tools)
if err != nil {
log.Fatal(err)
}
log.Printf("got tools:\n%s\n", string(toolsAsYaml))
// toolsAsYaml, err := yaml.Marshal(tools)
// if err != nil {
// log.Fatal(err)
// }
// log.Printf("got tools:\n%s\n", string(toolsAsYaml))
log.Print("fetching runner token")
ghRunnerToken, _, err := ghClient.Actions.CreateRegistrationToken(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name)
if err != nil {
log.Fatal(err)
}
log.Printf("got token %v", ghRunnerToken)
// log.Print("fetching runner token")
// ghRunnerToken, _, err := ghClient.Actions.CreateRegistrationToken(ctx, cfg.Repositories[0].Owner, cfg.Repositories[0].Name)
// if err != nil {
// log.Fatal(err)
// }
// log.Printf("got token %v", ghRunnerToken)
bootstrapArgs := params.BootstrapInstance{
Tools: tools,
RepoURL: cfg.Organizations[0].String(),
GithubRunnerAccessToken: *tk.Token,
RunnerType: cfg.Repositories[0].Pool.Runners[0].Name,
CallbackURL: "",
InstanceToken: "",
OSArch: config.Amd64,
Flavor: cfg.Organizations[0].Pool.Runners[0].Flavor,
Image: cfg.Organizations[0].Pool.Runners[0].Image,
Labels: cfg.Organizations[0].Pool.Runners[0].Labels,
SSHKeys: []string{
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC2oT7j/+elHY9U2ibgk2RYJgCvqIwewYKJTtHslTQFDWlHLeDam93BBOFlQJm9/wKX/qjC8d26qyzjeeeVf2EEAztp+jQfEq9OU+EtgQUi589jxtVmaWuYED8KVNbzLuP79SrBtEZD4xqgmnNotPhRshh3L6eYj4XzLWDUuOD6kzNdsJA2QOKeMOIFpBN6urKJHRHYD+oUPUX1w5QMv1W1Srlffl4m5uE+0eJYAMr02980PG4+jS4bzM170wYdWwUI0pSZsEDC8Fn7jef6QARU2CgHJYlaTem+KWSXislOUTaCpR0uhakP1ezebW20yuuc3bdRNgSlZi9B7zAPALGZpOshVqwF+KmLDi6XiFwG+NnwAFa6zaQfhOxhw/rF5Jk/wVjHIHkNNvYewycZPbKui0E3QrdVtR908N3VsPtLhMQ59BEMl3xlURSi0fiOU3UjnwmOkOoFDy/WT8qk//gFD93tUxlf4eKXDgNfME3zNz8nVi2uCPvG5NT/P/VWR8NMqW6tZcmWyswM/GgL6Y84JQ3ESZq/7WvAetdc1gVIDQJ2ejYbSHBcQpWvkocsiuMTCwiEvQ0sr+UE5jmecQvLPUyXOhuMhw43CwxnLk1ZSeYeCorxbskyqIXH71o8zhbPoPiEbwgB+i9WEoq02u7c8CmCmO8Y9aOnh8MzTKxIgQ==",
},
}
// bootstrapArgs := params.BootstrapInstance{
// Tools: tools,
// RepoURL: cfg.Organizations[0].String(),
// GithubRunnerAccessToken: *tk.Token,
// RunnerType: cfg.Repositories[0].Pool.Runners[0].Name,
// CallbackURL: "",
// InstanceToken: "",
// OSArch: config.Amd64,
// Flavor: cfg.Organizations[0].Pool.Runners[0].Flavor,
// Image: cfg.Organizations[0].Pool.Runners[0].Image,
// Labels: cfg.Organizations[0].Pool.Runners[0].Labels,
// SSHKeys: []string{
// "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC2oT7j/+elHY9U2ibgk2RYJgCvqIwewYKJTtHslTQFDWlHLeDam93BBOFlQJm9/wKX/qjC8d26qyzjeeeVf2EEAztp+jQfEq9OU+EtgQUi589jxtVmaWuYED8KVNbzLuP79SrBtEZD4xqgmnNotPhRshh3L6eYj4XzLWDUuOD6kzNdsJA2QOKeMOIFpBN6urKJHRHYD+oUPUX1w5QMv1W1Srlffl4m5uE+0eJYAMr02980PG4+jS4bzM170wYdWwUI0pSZsEDC8Fn7jef6QARU2CgHJYlaTem+KWSXislOUTaCpR0uhakP1ezebW20yuuc3bdRNgSlZi9B7zAPALGZpOshVqwF+KmLDi6XiFwG+NnwAFa6zaQfhOxhw/rF5Jk/wVjHIHkNNvYewycZPbKui0E3QrdVtR908N3VsPtLhMQ59BEMl3xlURSi0fiOU3UjnwmOkOoFDy/WT8qk//gFD93tUxlf4eKXDgNfME3zNz8nVi2uCPvG5NT/P/VWR8NMqW6tZcmWyswM/GgL6Y84JQ3ESZq/7WvAetdc1gVIDQJ2ejYbSHBcQpWvkocsiuMTCwiEvQ0sr+UE5jmecQvLPUyXOhuMhw43CwxnLk1ZSeYeCorxbskyqIXH71o8zhbPoPiEbwgB+i9WEoq02u7c8CmCmO8Y9aOnh8MzTKxIgQ==",
// },
// }
instance, err := provider.CreateInstance(ctx, bootstrapArgs)
if err != nil {
log.Fatal(err)
}
// instance, err := provider.CreateInstance(ctx, bootstrapArgs)
// if err != nil {
// log.Fatal(err)
// }
fmt.Println(instance)
}
// fmt.Println(instance)
// }

View file

@ -0,0 +1,107 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os/signal"
"runner-manager/config"
"runner-manager/database/sql"
"runner-manager/params"
"runner-manager/util"
)
var (
conf = flag.String("config", config.DefaultConfigFilePath, "runner-manager config file")
version = flag.Bool("version", false, "prints version")
)
var Version string
func main() {
flag.Parse()
if *version {
fmt.Println(Version)
return
}
ctx, stop := signal.NotifyContext(context.Background(), signals...)
defer stop()
fmt.Println(ctx)
cfg, err := config.NewConfig(*conf)
if err != nil {
log.Fatalf("Fetching config: %+v", err)
}
db, err := sql.NewSQLDatabase(ctx, cfg.Database)
if err != nil {
log.Fatal(err)
}
fmt.Println(db)
txt := "ana are mere prune și alune"
enc, err := util.Aes256EncodeString(txt, "pamkotepAyksemfeghoibidEwCivbaut")
if err != nil {
log.Fatal(err)
}
fmt.Printf("encrypted: %d\n", len(enc))
dec, err := util.Aes256DecodeString(enc, "pamkotepAyksemfeghoibidEwCivbaut")
if err != nil {
log.Fatal(err)
}
fmt.Println(dec)
repo, err := db.CreateRepository(ctx, "gabriel-samfira", "scripts", "")
if err != nil {
log.Fatal(err)
}
pool, err := db.CreateRepositoryPool(ctx, repo.ID, params.CreatePoolParams{
ProviderName: "lxd_local",
MaxRunners: 10,
MinIdleRunners: 1,
Image: "ubuntu:20.04",
Flavor: "default",
Tags: []string{
"myrunner",
"superAwesome",
},
OSType: config.Linux,
OSArch: config.Amd64,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(pool)
pool2, err := db.CreateRepositoryPool(ctx, repo.ID, params.CreatePoolParams{
ProviderName: "lxd_local2",
MaxRunners: 10,
MinIdleRunners: 1,
Image: "ubuntu:20.04",
Flavor: "default",
Tags: []string{
"myrunner",
"superAwesome2",
},
OSType: config.Linux,
OSArch: config.Amd64,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(pool2)
pool3, err := db.FindRepositoryPoolByTags(ctx, repo.ID, []string{"myrunner", "superAwesome3"})
if err != nil {
log.Fatal(err)
}
fmt.Println(pool3)
}

View file

@ -45,6 +45,8 @@ const (
// DefaultPoolQueueSize is the default size for a pool queue.
DefaultPoolQueueSize = 10
GithubBaseURL = "https://github.com"
)
var (

View file

@ -7,14 +7,14 @@ import (
type Store interface {
CreateRepository(ctx context.Context, owner, name, webhookSecret string) (params.Repository, error)
GetRepository(ctx context.Context, id string) (params.Repository, error)
GetRepository(ctx context.Context, owner, name string) (params.Repository, error)
ListRepositories(ctx context.Context) ([]params.Repository, error)
DeleteRepository(ctx context.Context, id string) error
DeleteRepository(ctx context.Context, owner, name string) error
CreateOrganization(ctx context.Context, name, webhookSecret string) (params.Organization, error)
GetOrganization(ctx context.Context, id string) (params.Organization, error)
GetOrganization(ctx context.Context, name string) (params.Organization, error)
ListOrganizations(ctx context.Context) ([]params.Organization, error)
DeleteOrganization(ctx context.Context, id string) error
DeleteOrganization(ctx context.Context, name string) error
CreateRepositoryPool(ctx context.Context, repoId string, param params.CreatePoolParams) (params.Pool, error)
CreateOrganizationPool(ctx context.Context, orgId string, param params.CreatePoolParams) (params.Pool, error)
@ -25,6 +25,20 @@ type Store interface {
DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error
DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error
UpdateRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error)
UpdateOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error)
UpdateRepositoryPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error)
UpdateOrganizationPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error)
FindRepositoryPoolByTags(ctx context.Context, repoID string, tags []string) (params.Pool, error)
FindOrganizationPoolByTags(ctx context.Context, orgID string, tags []string) (params.Pool, error)
CreateInstance(ctx context.Context, poolID string, param params.CreateInstanceParams) (params.Instance, error)
DeleteInstance(ctx context.Context, poolID string, instanceID string) error
UpdateInstance(ctx context.Context, instanceID string, param params.UpdateInstanceParams) (params.Instance, error)
ListInstances(ctx context.Context, poolID string) ([]params.Instance, error)
ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error)
ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error)
GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error)
GetInstanceByName(ctx context.Context, instanceName string) (params.Instance, error)
}

20
database/database.go Normal file
View file

@ -0,0 +1,20 @@
package database
import (
"context"
"fmt"
"runner-manager/config"
"runner-manager/database/common"
"runner-manager/database/sql"
)
func NewDatabase(ctx context.Context, cfg config.Database) (common.Store, error) {
dbBackend := cfg.DbBackend
switch dbBackend {
case config.MySQLBackend, config.SQLiteBackend:
return sql.NewSQLDatabase(ctx, cfg)
default:
return nil, fmt.Errorf("no team manager backend available for db backend %s", dbBackend)
}
}

View file

@ -16,6 +16,10 @@ type Base struct {
}
func (b *Base) BeforeCreate(tx *gorm.DB) error {
emptyId := uuid.UUID{}
if b.ID != emptyId {
return nil
}
b.ID = uuid.NewV4()
return nil
}
@ -23,7 +27,8 @@ func (b *Base) BeforeCreate(tx *gorm.DB) error {
type Tag struct {
Base
Name string `gorm:"type:varchar(64);uniqueIndex"`
Name string `gorm:"type:varchar(64);uniqueIndex"`
Pools []*Pool `gorm:"many2many:pool_tags;"`
}
type Pool struct {
@ -36,7 +41,13 @@ type Pool struct {
Flavor string `gorm:"index:idx_pool_type,unique"`
OSType config.OSType
OSArch config.OSArch
Tags []Tag `gorm:"foreignKey:id"`
Tags []*Tag `gorm:"many2many:pool_tags;"`
RepoID uuid.UUID
Repository Repository `gorm:"foreignKey:RepoID"`
OrgID uuid.UUID
Organization Organization `gorm:"foreignKey:OrgID"`
}
type Repository struct {
@ -45,7 +56,7 @@ type Repository struct {
Owner string `gorm:"index:idx_owner,unique"`
Name string `gorm:"index:idx_owner,unique"`
WebhookSecret []byte
Pools []Pool `gorm:"foreignKey:id"`
Pools []Pool `gorm:"foreignKey:RepoID"`
}
type Organization struct {
@ -53,5 +64,29 @@ type Organization struct {
Name string `gorm:"uniqueIndex"`
WebhookSecret []byte
Pools []Pool `gorm:"foreignKey:id"`
Pools []Pool `gorm:"foreignKey:OrgID"`
}
type Address struct {
Base
Address string
Type string
}
type Instance struct {
Base
Name string `gorm:"uniqueIndex"`
OSType config.OSType
OSArch config.OSArch
OSName string
OSVersion string
Addresses []Address `gorm:"foreignKey:id"`
Status string
RunnerStatus string
CallbackURL string
CallbackToken string
Pool Pool `gorm:"foreignKey:id"`
}

View file

@ -45,6 +45,8 @@ func (s *sqlDatabase) migrateDB() error {
&Pool{},
&Repository{},
&Organization{},
&Address{},
&Instance{},
); err != nil {
return err
}
@ -54,30 +56,12 @@ func (s *sqlDatabase) migrateDB() error {
func (s *sqlDatabase) sqlToCommonTags(tag Tag) params.Tag {
return params.Tag{
// ID: tag.ID.String(),
ID: tag.ID.String(),
Name: tag.Name,
}
}
// func (s *sqlDatabase) sqlToCommonRunner(runner Runner) params.Runner {
// ret := params.Runner{
// ID: runner.ID.String(),
// MaxRunners: runner.MaxRunners,
// MinIdleRunners: runner.MinIdleRunners,
// Image: runner.Image,
// Flavor: runner.Flavor,
// OSArch: runner.OSArch,
// OSType: runner.OSType,
// Tags: make([]params.Tag, len(runner.Tags)),
// }
// for idx, val := range runner.Tags {
// ret.Tags[idx] = s.sqlToCommonTags(val)
// }
// return ret
// }
func (s *sqlDatabase) sqlToCommonPool(pool Pool) params.Pool {
ret := params.Pool{
ID: pool.ID.String(),
@ -92,7 +76,7 @@ func (s *sqlDatabase) sqlToCommonPool(pool Pool) params.Pool {
}
for idx, val := range pool.Tags {
ret.Tags[idx] = s.sqlToCommonTags(val)
ret.Tags[idx] = s.sqlToCommonTags(*val)
}
return ret
@ -149,7 +133,19 @@ func (s *sqlDatabase) CreateRepository(ctx context.Context, owner, name, webhook
return param, nil
}
func (s *sqlDatabase) getRepo(ctx context.Context, id string) (Repository, error) {
func (s *sqlDatabase) getRepo(ctx context.Context, owner, name string) (Repository, error) {
var repo Repository
q := s.conn.Preload(clause.Associations).Where("name = ? and owner = ?", name, owner).First(&repo)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return Repository{}, runnerErrors.ErrNotFound
}
return Repository{}, errors.Wrap(q.Error, "fetching repository from database")
}
return repo, nil
}
func (s *sqlDatabase) getRepoByID(ctx context.Context, id string) (Repository, error) {
u := uuid.Parse(id)
if u == nil {
return Repository{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id")
@ -165,8 +161,8 @@ func (s *sqlDatabase) getRepo(ctx context.Context, id string) (Repository, error
return repo, nil
}
func (s *sqlDatabase) GetRepository(ctx context.Context, id string) (params.Repository, error) {
repo, err := s.getRepo(ctx, id)
func (s *sqlDatabase) GetRepository(ctx context.Context, owner, name string) (params.Repository, error) {
repo, err := s.getRepo(ctx, owner, name)
if err != nil {
return params.Repository{}, errors.Wrap(err, "fetching repo")
}
@ -196,8 +192,8 @@ func (s *sqlDatabase) ListRepositories(ctx context.Context) ([]params.Repository
return ret, nil
}
func (s *sqlDatabase) DeleteRepository(ctx context.Context, id string) error {
repo, err := s.getRepo(ctx, id)
func (s *sqlDatabase) DeleteRepository(ctx context.Context, owner, name string) error {
repo, err := s.getRepo(ctx, owner, name)
if err != nil {
if err == runnerErrors.ErrNotFound {
return nil
@ -238,7 +234,19 @@ func (s *sqlDatabase) CreateOrganization(ctx context.Context, name, webhookSecre
return param, nil
}
func (s *sqlDatabase) getOrg(ctx context.Context, id string) (Organization, error) {
func (s *sqlDatabase) getOrg(ctx context.Context, name string) (Organization, error) {
var org Organization
q := s.conn.Preload(clause.Associations).Where("name = ?", name).First(&org)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return Organization{}, runnerErrors.ErrNotFound
}
return Organization{}, errors.Wrap(q.Error, "fetching org from database")
}
return org, nil
}
func (s *sqlDatabase) getOrgByID(ctx context.Context, id string) (Organization, error) {
u := uuid.Parse(id)
if u == nil {
return Organization{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id")
@ -254,8 +262,8 @@ func (s *sqlDatabase) getOrg(ctx context.Context, id string) (Organization, erro
return org, nil
}
func (s *sqlDatabase) GetOrganization(ctx context.Context, id string) (params.Organization, error) {
org, err := s.getOrg(ctx, id)
func (s *sqlDatabase) GetOrganization(ctx context.Context, name string) (params.Organization, error) {
org, err := s.getOrg(ctx, name)
if err != nil {
return params.Organization{}, errors.Wrap(err, "fetching repo")
}
@ -285,8 +293,8 @@ func (s *sqlDatabase) ListOrganizations(ctx context.Context) ([]params.Organizat
return ret, nil
}
func (s *sqlDatabase) DeleteOrganization(ctx context.Context, id string) error {
org, err := s.getOrg(ctx, id)
func (s *sqlDatabase) DeleteOrganization(ctx context.Context, name string) error {
org, err := s.getOrg(ctx, name)
if err != nil {
if err == runnerErrors.ErrNotFound {
return nil
@ -327,7 +335,7 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p
return params.Pool{}, runnerErrors.NewBadRequestError("no tags specified")
}
repo, err := s.getRepo(ctx, repoId)
repo, err := s.getRepoByID(ctx, repoId)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching repo")
}
@ -340,22 +348,34 @@ func (s *sqlDatabase) CreateRepositoryPool(ctx context.Context, repoId string, p
Flavor: param.Flavor,
OSType: param.OSType,
OSArch: param.OSArch,
RepoID: repo.ID,
}
tags := make([]Tag, len(param.Tags))
for idx, val := range param.Tags {
tags := []Tag{}
for _, val := range param.Tags {
t, err := s.getOrCreateTag(val)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching tag")
}
tags[idx] = t
fmt.Printf(">>>> Tag name: %s --> ID: %s\n", t.Name, t.ID)
tags = append(tags, t)
// newPool.Tags = append(newPool.Tags, &t)
}
err = s.conn.Model(&repo).Association("Pools").Append(&newPool)
if err != nil {
q := s.conn.Create(&newPool)
if q.Error != nil {
return params.Pool{}, errors.Wrap(err, "adding pool")
}
return s.sqlToCommonPool(newPool), nil
for _, tt := range tags {
s.conn.Model(&newPool).Association("Tags").Append(&tt)
}
repo, err = s.getRepoByID(ctx, repoId)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching repo")
}
return s.sqlToCommonPool(repo.Pools[0]), nil
}
func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string, param params.CreatePoolParams) (params.Pool, error) {
@ -363,7 +383,7 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
return params.Pool{}, runnerErrors.NewBadRequestError("no tags specified")
}
org, err := s.getOrg(ctx, orgId)
org, err := s.getOrgByID(ctx, orgId)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching org")
}
@ -378,15 +398,16 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
OSArch: param.OSArch,
}
tags := make([]Tag, len(param.Tags))
tags := make([]*Tag, len(param.Tags))
for idx, val := range param.Tags {
t, err := s.getOrCreateTag(val)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching tag")
}
tags[idx] = t
tags[idx] = &t
}
newPool.Tags = append(newPool.Tags, tags...)
err = s.conn.Model(&org).Association("Pools").Append(&newPool)
if err != nil {
return params.Pool{}, errors.Wrap(err, "adding pool")
@ -394,58 +415,174 @@ func (s *sqlDatabase) CreateOrganizationPool(ctx context.Context, orgId string,
return s.sqlToCommonPool(newPool), nil
}
func (s *sqlDatabase) GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) {
repo, err := s.getRepo(ctx, repoID)
func (s *sqlDatabase) getRepoPool(ctx context.Context, repoID, poolID string) (Pool, error) {
repo, err := s.getRepoByID(ctx, repoID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching repo")
return Pool{}, errors.Wrap(err, "fetching repo")
}
u := uuid.Parse(poolID)
if u == nil {
return params.Pool{}, fmt.Errorf("invalid pool id")
return Pool{}, fmt.Errorf("invalid pool id")
}
var pool []Pool
err = s.conn.Model(&repo).Association("Pools").Find(&pool, "id = ?", u)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
return Pool{}, errors.Wrap(err, "fetching pool")
}
if len(pool) == 0 {
return params.Pool{}, runnerErrors.ErrNotFound
return Pool{}, runnerErrors.ErrNotFound
}
return s.sqlToCommonPool(pool[0]), nil
return pool[0], nil
}
func (s *sqlDatabase) GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) {
org, err := s.getOrg(ctx, orgID)
func (s *sqlDatabase) GetRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) {
pool, err := s.getRepoPool(ctx, repoID, poolID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching org")
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return s.sqlToCommonPool(pool), nil
}
func (s *sqlDatabase) getOrgPool(ctx context.Context, orgID, poolID string) (Pool, error) {
org, err := s.getOrgByID(ctx, orgID)
if err != nil {
return Pool{}, errors.Wrap(err, "fetching repo")
}
u := uuid.Parse(poolID)
if u == nil {
return params.Pool{}, fmt.Errorf("invalid pool id")
return Pool{}, fmt.Errorf("invalid pool id")
}
var pool []Pool
err = s.conn.Model(&org).Association("Pools").Find(&pool, "id = ?", u)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
return Pool{}, errors.Wrap(err, "fetching pool")
}
if len(pool) == 0 {
return params.Pool{}, runnerErrors.ErrNotFound
return Pool{}, runnerErrors.ErrNotFound
}
return s.sqlToCommonPool(pool[0]), nil
return pool[0], nil
}
func (s *sqlDatabase) GetOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) {
pool, err := s.getOrgPool(ctx, orgID, poolID)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return s.sqlToCommonPool(pool), nil
}
func (s *sqlDatabase) DeleteRepositoryPool(ctx context.Context, repoID, poolID string) error {
pool, err := s.getRepoPool(ctx, repoID, poolID)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
return errors.Wrap(err, "looking up repo pool")
}
q := s.conn.Delete(&pool)
if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) {
return errors.Wrap(q.Error, "deleting pool")
}
return nil
}
func (s *sqlDatabase) DeleteOrganizationPool(ctx context.Context, orgID, poolID string) error {
pool, err := s.getOrgPool(ctx, orgID, poolID)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
return errors.Wrap(err, "looking up repo pool")
}
q := s.conn.Delete(&pool)
if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) {
return errors.Wrap(q.Error, "deleting pool")
}
return nil
}
func (s *sqlDatabase) UpdateRepositoryPool(ctx context.Context, repoID, poolID string) (params.Pool, error) {
func (s *sqlDatabase) UpdateRepositoryPool(ctx context.Context, repoID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
return params.Pool{}, nil
}
func (s *sqlDatabase) UpdateOrganizationPool(ctx context.Context, orgID, poolID string) (params.Pool, error) {
func (s *sqlDatabase) UpdateOrganizationPool(ctx context.Context, orgID, poolID string, param params.UpdatePoolParams) (params.Pool, error) {
return params.Pool{}, nil
}
func (s *sqlDatabase) findPoolByTags(id, poolType string, tags []string) (params.Pool, error) {
if len(tags) == 0 {
return params.Pool{}, runnerErrors.NewBadRequestError("missing tags")
}
u := uuid.Parse(id)
if u == nil {
return params.Pool{}, errors.Wrap(runnerErrors.NewBadRequestError(""), "parsing id")
}
var pool Pool
where := fmt.Sprintf("tags.name in ? and %s = ?", poolType)
q := s.conn.Joins("JOIN pool_tags on pool_tags.pool_id=pools.id").
Joins("JOIN tags on tags.id=pool_tags.tag_id").
Group("pools.id").
Preload("Tags").
Having("count(1) = ?", len(tags)).
Where(where, tags, id).First(&pool)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return params.Pool{}, runnerErrors.ErrNotFound
}
return params.Pool{}, errors.Wrap(q.Error, "fetching pool")
}
return s.sqlToCommonPool(pool), nil
}
func (s *sqlDatabase) FindRepositoryPoolByTags(ctx context.Context, repoID string, tags []string) (params.Pool, error) {
pool, err := s.findPoolByTags(repoID, "repo_id", tags)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
}
func (s *sqlDatabase) FindOrganizationPoolByTags(ctx context.Context, orgID string, tags []string) (params.Pool, error) {
pool, err := s.findPoolByTags(orgID, "org_id", tags)
if err != nil {
return params.Pool{}, errors.Wrap(err, "fetching pool")
}
return pool, nil
}
func (s *sqlDatabase) CreateInstance(ctx context.Context, poolID string, param params.CreateInstanceParams) (params.Instance, error) {
return params.Instance{}, nil
}
func (s *sqlDatabase) DeleteInstance(ctx context.Context, poolID string, instanceID string) error {
return nil
}
func (s *sqlDatabase) UpdateInstance(ctx context.Context, instanceID string, param params.UpdateInstanceParams) (params.Instance, error) {
return params.Instance{}, nil
}
func (s *sqlDatabase) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) {
return []params.Instance{}, nil
}
func (s *sqlDatabase) ListRepoInstances(ctx context.Context, repoID string) ([]params.Instance, error) {
return []params.Instance{}, nil
}
func (s *sqlDatabase) ListOrgInstances(ctx context.Context, orgID string) ([]params.Instance, error) {
return []params.Instance{}, nil
}
func (s *sqlDatabase) GetInstance(ctx context.Context, poolID string, instanceID string) (params.Instance, error) {
return params.Instance{}, nil
}
func (s *sqlDatabase) GetInstanceByName(ctx context.Context, instanceName string) (params.Instance, error) {
return params.Instance{}, nil
}

2
go.mod
View file

@ -5,6 +5,7 @@ go 1.18
require (
github.com/BurntSushi/toml v0.3.1
github.com/google/go-github/v43 v43.0.0
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/lxc/lxd v0.0.0-20220415052741-1170f2806124
@ -27,7 +28,6 @@ require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect

View file

@ -2,11 +2,40 @@ package params
import (
"runner-manager/config"
"runner-manager/runner/providers/common"
"github.com/google/go-github/v43/github"
)
type AddressType string
const (
PublicAddress AddressType = "public"
PrivateAddress AddressType = "private"
)
type Address struct {
Address string `json:"address"`
Type AddressType `json:"type"`
}
type UpdateInstanceParams struct {
ProviderID string `json:"provider_id,omitempty"`
// OSName is the name of the OS. Eg: ubuntu, centos, etc.
OSName string `json:"os_name,omitempty"`
// OSVersion is the version of the operating system.
OSVersion string `json:"os_version,omitempty"`
// Addresses is a list of IP addresses the provider reports
// for this instance.
Addresses []Address `json:"addresses,omitempty"`
// Status is the status of the instance inside the provider (eg: running, stopped, etc)
Status common.InstanceStatus `json:"status"`
RunnerStatus common.RunnerStatus `json:"runner_status"`
}
type Instance struct {
// ID is the database ID of this instance.
ID string `json:"id"`
// PeoviderID is the unique ID the provider associated
// with the compute instance. We use this to identify the
// instance in the provider.
@ -18,32 +47,34 @@ type Instance struct {
Name string `json:"name,omitempty"`
// OSType is the operating system type. For now, only Linux and
// Windows are supported.
OSType config.OSType `json:"os-type,omitempty"`
OSType config.OSType `json:"os_type,omitempty"`
// OSName is the name of the OS. Eg: ubuntu, centos, etc.
OSName string `json:"os-name,omitempty"`
OSName string `json:"os_name,omitempty"`
// OSVersion is the version of the operating system.
OSVersion string `json:"os-version,omitempty"`
OSVersion string `json:"os_version,omitempty"`
// OSArch is the operating system architecture.
OSArch config.OSArch `json:"os-arch,omitempty"`
OSArch config.OSArch `json:"os_arch,omitempty"`
// Addresses is a list of IP addresses the provider reports
// for this instance.
Addresses []string `json:"ip-addresses,omitempty"`
Addresses []Address `json:"addresses,omitempty"`
// Status is the status of the instance inside the provider (eg: running, stopped, etc)
Status string `json:"status"`
Status common.InstanceStatus `json:"status"`
RunnerStatus common.RunnerStatus `json:"runner_status"`
PoolID string `json:"pool_id"`
// Do not serialize sensitive info.
CallbackURL string `json:"-"`
CallbackToken string `json:"-"`
}
type BootstrapInstance struct {
Name string `json:"name"`
Tools []*github.RunnerApplicationDownload `json:"tools"`
// RepoURL is the URL the github runner agent needs to configure itself.
RepoURL string `json:"repo_url"`
// GithubRunnerAccessToken is the token we fetch from github to allow the runner to
// register itself.
GithubRunnerAccessToken string `json:"github_runner_access_token"`
// RunnerType is the name of the defined runner type in a particular pool. The provider
// needs this to determine which flavor/image/settings it needs to use to create the
// instance. This is provider/runner specific. The config for the runner type is defined
// in the configuration file, as part of the pool definition.
RunnerType string `json:"runner-type"`
// CallbackUrl is the URL where the instance can send a post, signaling
// progress or status.
CallbackURL string `json:"callback-url"`
@ -58,6 +89,7 @@ type BootstrapInstance struct {
Flavor string `json:"flavor"`
Image string `json:"image"`
Labels []string `json:"labels"`
PoolID string `json:"pool_id"`
}
type Tag struct {
@ -77,19 +109,29 @@ type Pool struct {
Tags []Tag `json:"tags"`
}
type Internal struct {
OAuth2Token string `json:"oauth2"`
ControllerID string `json:"controller_id"`
InstanceCallbackURL string `json:"instance_callback_url"`
}
type Repository struct {
ID string `json:"id"`
Owner string `json:"owner"`
Name string `json:"name"`
WebhookSecret string `json:"-"`
Pools []Pool `json:"pool,omitempty"`
ID string `json:"id"`
Owner string `json:"owner"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
Internal Internal `json:"-"`
}
type Organization struct {
ID string `json:"id"`
Name string `json:"name"`
WebhookSecret string `json:"-"`
Pools []Pool `json:"pool,omitempty"`
ID string `json:"id"`
Name string `json:"name"`
Pools []Pool `json:"pool,omitempty"`
// Do not serialize sensitive info.
WebhookSecret string `json:"-"`
Internal Internal `json:"-"`
}
type CreatePoolParams struct {
@ -102,3 +144,32 @@ type CreatePoolParams struct {
OSArch config.OSArch `json:"os_arch"`
Tags []string `json:"tags"`
}
/*
Name string `gorm:"uniqueIndex"`
OSType config.OSType
OSArch config.OSArch
OSName string
OSVersion string
Addresses []Address `gorm:"foreignKey:id"`
Status string
RunnerStatus string
CallbackURL string
CallbackToken []byte
Pool Pool `gorm:"foreignKey:id"`
*/
type CreateInstanceParams struct {
Name string
OSType config.OSType
OSArch config.OSArch
Status common.InstanceStatus
RunnerStatus common.RunnerStatus
CallbackURL string
CallbackToken string
Pool string
}
type UpdatePoolParams struct{}

View file

@ -5,6 +5,14 @@ import "runner-manager/params"
type PoolManager interface {
WebhookSecret() string
HandleWorkflowJob(job params.WorkflowJob) error
// PoolManager lifecycle functions. Start/stop pool.
Start() error
Stop() error
Wait() error
}
type Pool interface {
ListInstances() ([]params.Instance, error)
GetInstance() (params.Instance, error)
DeleteInstance() error

View file

@ -13,7 +13,7 @@ type Provider interface {
// GetInstance will return details about one instance.
GetInstance(ctx context.Context, instance string) (params.Instance, error)
// ListInstances will list all instances for a provider.
ListInstances(ctx context.Context) ([]params.Instance, error)
ListInstances(ctx context.Context, poolID string) ([]params.Instance, error)
// RemoveAllInstances will remove all instances created by this provider.
RemoveAllInstances(ctx context.Context) error
// Stop shuts down the instance.

6
runner/pool/common.go Normal file
View file

@ -0,0 +1,6 @@
package pool
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
)

View file

@ -4,33 +4,44 @@ import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
"runner-manager/config"
dbCommon "runner-manager/database/common"
runnerErrors "runner-manager/errors"
"runner-manager/params"
"runner-manager/runner/common"
providerCommon "runner-manager/runner/providers/common"
"runner-manager/util"
"github.com/google/go-github/v43/github"
"github.com/google/uuid"
"github.com/pkg/errors"
)
// test that we implement PoolManager
var _ common.PoolManager = &Repository{}
func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, provider common.Provider, ghcli *github.Client, controllerID string) (common.PoolManager, error) {
queueSize := cfg.Pool.QueueSize
if queueSize == 0 {
queueSize = config.DefaultPoolQueueSize
func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
pools := map[string]params.Pool{}
for _, val := range cfg.Pools {
pools[val.ID] = val
}
repo := &Repository{
ctx: ctx,
cfg: cfg,
ghcli: ghcli,
provider: provider,
controllerID: controllerID,
jobQueue: make(chan params.WorkflowJob, queueSize),
ghcli: ghc,
store: store,
providers: providers,
pools: pools,
controllerID: cfg.Internal.ControllerID,
quit: make(chan struct{}),
done: make(chan struct{}),
}
@ -44,14 +55,30 @@ func NewRepositoryRunnerPool(ctx context.Context, cfg config.Repository, provide
type Repository struct {
ctx context.Context
controllerID string
cfg config.Repository
cfg params.Repository
store dbCommon.Store
ghcli *github.Client
provider common.Provider
providers map[string]common.Provider
tools []*github.RunnerApplicationDownload
jobQueue chan params.WorkflowJob
quit chan struct{}
done chan struct{}
mux sync.Mutex
id string
pools map[string]params.Pool
mux sync.Mutex
}
func (r *Repository) AddPool(ctx context.Context, pool params.Pool) error {
r.mux.Lock()
defer r.mux.Unlock()
if _, ok := r.pools[pool.ID]; ok {
return nil
}
// start pool loop
r.pools[pool.ID] = pool
return nil
}
func (r *Repository) getGithubRunners() ([]*github.Runner, error) {
@ -68,6 +95,17 @@ func (r *Repository) getProviderInstances() ([]params.Instance, error) {
}
func (r *Repository) Start() error {
runners, err := r.getGithubRunners()
if err != nil {
return errors.Wrap(err, "fetching github runners")
}
if err := r.cleanupOrphanedProviderRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned instances")
}
if err := r.cleanupOrphanedGithubRunners(runners); err != nil {
return errors.Wrap(err, "cleaning orphaned github runners")
}
go r.loop()
return nil
}
@ -97,6 +135,359 @@ func (r *Repository) Wait() error {
return nil
}
func (r *Repository) consolidate() {
r.mux.Lock()
defer r.mux.Unlock()
r.deletePendingInstances()
r.addPendingInstances()
r.ensureMinIdleRunners()
}
func (r *Repository) addPendingInstances() {
instances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to create instance in provider: %s", err)
}
}
}
func (r *Repository) deletePendingInstances() {
instances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
log.Printf("failed to fetch instances from store: %s", err)
return
}
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}
if err := r.deleteInstanceFromProvider(instance); err != nil {
log.Printf("failed to delete instance from provider: %s", err)
}
}
}
func (r *Repository) poolIDFromLabels(labels []*github.RunnerLabels) (string, error) {
for _, lbl := range labels {
if strings.HasPrefix(*lbl.Name, poolIDLabelprefix) {
labelName := *lbl.Name
return labelName[len(poolIDLabelprefix):], nil
}
}
return "", runnerErrors.ErrNotFound
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *Repository) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
for _, runner := range runners {
status := runner.GetStatus()
if status != "offline" {
// Runner is online. Ignore it.
continue
}
removeRunner := false
// check locally and delete
// dbInstance, err := r.store.GetInstance()
poolID, err := r.poolIDFromLabels(runner.Labels)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "finding pool")
}
// not a runner we manage
continue
}
pool, ok := r.pools[poolID]
if !ok {
// not a pool we manage.
continue
}
dbInstance, err := r.store.GetInstance(r.ctx, poolID, *runner.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from DB")
}
// We no longer have a DB entry for this instance. Previous forceful
// removal may have failed?
removeRunner = true
} else {
if providerCommon.InstanceStatus(dbInstance.Status) == providerCommon.InstancePendingDelete {
// already marked for deleting. Let consolidate take care of it.
continue
}
// check if the provider still has the instance.
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
instance, err := provider.GetInstance(r.ctx, dbInstance.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from provider")
}
// instance was manually deleted?
removeRunner = true
} else {
if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario requires manual intervention.
// Perhaps it just came online and github did not yet change it's status?
log.Printf("instance %s is online but github reports runner as offline", instance.Name)
continue
}
//start the instance
if err := provider.Start(r.ctx, instance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", instance.ProviderID)
}
// we started the instance. Give it a chance to come online
continue
}
if removeRunner {
if _, err := r.ghcli.Actions.RemoveRunner(r.ctx, r.cfg.Owner, r.cfg.Name, *runner.ID); err != nil {
return errors.Wrap(err, "removing runner")
}
}
}
}
return nil
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if the
// runner-manager was offline while a job was executed by a github action. When this
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *Repository) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
// runners, err := r.getGithubRunners()
// if err != nil {
// return errors.Wrap(err, "fetching github runners")
// }
dbInstances, err := r.store.ListRepoInstances(r.ctx, r.id)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
runnerNames[*run.Name] = true
}
for _, instance := range dbInstances {
if providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingCreate || providerCommon.InstanceStatus(instance.Status) == providerCommon.InstancePendingDelete {
// this instance is in the process of being created or is awaiting deletion.
// Instances in pending_Create did not get a chance to register themselves in,
// github so we let them be for now.
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
_, err = r.store.UpdateInstance(r.ctx, instance.Name, params.UpdateInstanceParams{})
if err != nil {
return errors.Wrap(err, "syncing local state with github")
}
}
}
return nil
}
func (r *Repository) ensureMinIdleRunners() {
for poolID, pool := range r.pools {
existingInstances, err := r.store.ListInstances(r.ctx, poolID)
if err != nil {
log.Printf("failed to ensure minimum idle workers for pool %s: %s", poolID, err)
return
}
if uint(len(existingInstances)) >= pool.MaxRunners {
log.Printf("max workers reached for pool %s, skipping idle worker creation", poolID)
continue
}
idleOrPendingWorkers := []params.Instance{}
for _, inst := range existingInstances {
if providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerIdle || providerCommon.RunnerStatus(inst.RunnerStatus) == providerCommon.RunnerPending {
idleOrPendingWorkers = append(idleOrPendingWorkers, inst)
}
}
var required int
if len(idleOrPendingWorkers) < int(pool.MinIdleRunners) {
// get the needed delta.
required = int(pool.MinIdleRunners) - len(idleOrPendingWorkers)
projectedInstanceCount := len(existingInstances) + required
if uint(projectedInstanceCount) > pool.MaxRunners {
// ensure we don't go above max workers
required = (len(existingInstances) + required) - int(pool.MaxRunners)
}
}
for i := 0; i < required; i++ {
log.Printf("addind new idle worker to pool %s", poolID)
if err := r.AddRunner(r.ctx, poolID); err != nil {
log.Printf("failed to add new instance for pool %s: %s", poolID, err)
}
}
}
}
func (r *Repository) githubURL() string {
return fmt.Sprintf("%s/%s/%s", config.GithubBaseURL, r.cfg.Owner, r.cfg.Name)
}
func (r *Repository) poolLabel() string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, r.id)
}
func (r *Repository) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
}
func (r *Repository) getLabels() []string {
return []string{
r.poolLabel(),
r.controllerLabel(),
}
}
func (r *Repository) updateArgsFromProviderInstance(providerInstance params.Instance) params.UpdateInstanceParams {
return params.UpdateInstanceParams{
ProviderID: providerInstance.ProviderID,
OSName: providerInstance.OSName,
OSVersion: providerInstance.OSName,
Addresses: providerInstance.Addresses,
Status: providerInstance.Status,
RunnerStatus: providerInstance.RunnerStatus,
}
}
func (r *Repository) deleteInstanceFromProvider(instance params.Instance) error {
pool, ok := r.pools[instance.PoolID]
if !ok {
return runnerErrors.NewNotFoundError("invalid pool ID")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
if err := provider.DeleteInstance(r.ctx, instance.ProviderID); err != nil {
return errors.Wrap(err, "removing instance")
}
if err := r.store.DeleteInstance(r.ctx, pool.ID, instance.ID); err != nil {
return errors.Wrap(err, "deleting instance from database")
}
return nil
}
func (r *Repository) addInstanceToProvider(instance params.Instance) error {
pool, ok := r.pools[instance.PoolID]
if !ok {
return runnerErrors.NewNotFoundError("invalid pool ID")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
}
labels = append(labels, r.getLabels()...)
tk, _, err := r.ghcli.Actions.CreateRegistrationToken(r.ctx, r.cfg.Owner, r.cfg.Name)
if err != nil {
return errors.Wrap(err, "creating runner token")
}
bootstrapArgs := params.BootstrapInstance{
Tools: r.tools,
RepoURL: r.githubURL(),
GithubRunnerAccessToken: *tk.Token,
CallbackURL: instance.CallbackURL,
InstanceToken: instance.CallbackToken,
OSArch: pool.OSArch,
Flavor: pool.Flavor,
Image: pool.Image,
Labels: labels,
}
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
return errors.Wrap(err, "creating instance")
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.ID, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
}
return nil
}
// TODO: add function to set runner status to idle when instance calls home on callback url
func (r *Repository) AddRunner(ctx context.Context, poolID string) error {
callbackToken, err := util.GetRandomString(32)
if err != nil {
return errors.Wrap(err, "fetching callbackToken")
}
pool, ok := r.pools[poolID]
if !ok {
return runnerErrors.NewNotFoundError("invalid provider ID")
}
name := fmt.Sprintf("runner-manager-%s", uuid.New())
createParams := params.CreateInstanceParams{
Name: name,
Pool: poolID,
Status: providerCommon.InstancePendingCreate,
RunnerStatus: providerCommon.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackToken: callbackToken,
CallbackURL: r.cfg.Internal.InstanceCallbackURL,
}
_, err = r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
return nil
}
func (r *Repository) loop() {
defer close(r.done)
// TODO: Consolidate runners on loop start. Provider runners must match runners
@ -112,25 +503,13 @@ func (r *Repository) loop() {
for {
select {
case job, ok := <-r.jobQueue:
if !ok {
// queue was closed. return.
return
}
// We handle jobs synchronously (for now)
switch job.Action {
case "queued":
// Create instance.
case "completed":
// Remove instance.
case "in_progress":
// update state
}
fmt.Println(job)
case <-time.After(5 * time.Second):
// consolidate.
r.consolidate()
case <-time.After(3 * time.Hour):
// Update tools cache.
if err := r.fetchTools(); err != nil {
log.Printf("failed to update tools for repo %s: %s", r.cfg.String(), err)
log.Printf("failed to update tools for repo %s/%s: %s", r.cfg.Owner, r.cfg.Name, err)
}
case <-r.ctx.Done():
// daemon is shutting down.
@ -142,30 +521,24 @@ func (r *Repository) loop() {
}
}
// addJobToQueue adds a new workflow job to the queue of jobs that need to be
// processed by this pool. Jobs are added by github webhooks, so it makes no sense
// to return an error when that happens. But we do need to log any error that comes
// up. The queue size is configurable. If we hit that limit, new jobs will be discarded
// and logged.
// TODO: setup a state pipeline that will send back updates to the runner and update the
// database as needed.
func (r *Repository) addJobToQueue(job params.WorkflowJob) {
select {
case r.jobQueue <- job:
case <-time.After(1 * time.Second):
log.Printf("timed out accepting job. Queue is full.")
}
}
func (r *Repository) WebhookSecret() string {
return r.cfg.WebhookSecret
}
func (r *Repository) HandleWorkflowJob(job params.WorkflowJob) error {
if job.Repository.FullName != r.cfg.String() {
return runnerErrors.NewBadRequestError("job not meant for this pool")
if job.Repository.Name != r.cfg.Name || job.Repository.Owner.Login != r.cfg.Owner {
return runnerErrors.NewBadRequestError("job not meant for this pool manager")
}
switch job.Action {
case "queued":
// Create instance in database and set it to pending create.
case "completed":
// Set instance in database to pending delete. Unassigned jobs will have
// an empty runner_name. There is nothing to to in that case.
case "in_progress":
// update instance workload state. Set job_id in instance state.
}
r.addJobToQueue(job)
return nil
}

View file

@ -0,0 +1,16 @@
package common
type InstanceStatus string
type RunnerStatus string
const (
InstanceRunning InstanceStatus = "running"
InstanceStopped InstanceStatus = "stopped"
InstancePendingDelete InstanceStatus = "pending_delete"
InstancePendingCreate InstanceStatus = "pending_create"
InstanceStatusUnknown InstanceStatus = "unknown"
RunnerIdle RunnerStatus = "idle"
RunnerPending RunnerStatus = "pending"
RunnerActive RunnerStatus = "active"
)

View file

@ -13,7 +13,6 @@ import (
"github.com/google/go-github/v43/github"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/shared/api"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
)
@ -24,6 +23,7 @@ const (
// We look for this key in the config of the instances to determine if they are
// created by us or not.
controllerIDKeyName = "user.runner-controller-id"
poolIDKey = "user.runner-pool-id"
)
var (
@ -182,7 +182,7 @@ func (l *LXD) secureBootEnabled() string {
}
func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (api.InstancesPost, error) {
name := fmt.Sprintf("runner-manager-%s", uuid.New())
// name := fmt.Sprintf("runner-manager-%s", uuid.New())
profiles, err := l.getProfiles(bootstrapParams.Flavor)
if err != nil {
@ -204,7 +204,7 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a
return api.InstancesPost{}, errors.Wrap(err, "getting tools")
}
cloudCfg, err := util.GetCloudConfig(bootstrapParams, tools, name)
cloudCfg, err := util.GetCloudConfig(bootstrapParams, tools, bootstrapParams.Name)
if err != nil {
return api.InstancesPost{}, errors.Wrap(err, "generating cloud-config")
}
@ -218,13 +218,14 @@ func (l *LXD) getCreateInstanceArgs(bootstrapParams params.BootstrapInstance) (a
"user.user-data": cloudCfg,
"security.secureboot": l.secureBootEnabled(),
controllerIDKeyName: l.controllerID,
poolIDKey: bootstrapParams.PoolID,
},
},
Source: api.InstanceSource{
Type: "image",
Fingerprint: image.Fingerprint,
},
Name: name,
Name: bootstrapParams.Name,
Type: api.InstanceTypeVM,
}
return args, nil
@ -307,7 +308,7 @@ func (l *LXD) DeleteInstance(ctx context.Context, instance string) error {
}
// ListInstances will list all instances for a provider.
func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) {
func (l *LXD) ListInstances(ctx context.Context, poolID string) ([]params.Instance, error) {
instances, err := l.cli.GetInstancesFull(api.InstanceTypeAny)
if err != nil {
return []params.Instance{}, errors.Wrap(err, "fetching instances")
@ -317,6 +318,13 @@ func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) {
for _, instance := range instances {
if id, ok := instance.ExpandedConfig[controllerIDKeyName]; ok && id == l.controllerID {
if poolID != "" {
id := instance.ExpandedConfig[poolID]
if id != poolID {
// Pool ID was specified. Filter out instances belonging to other pools.
continue
}
}
ret = append(ret, lxdInstanceToAPIInstance(&instance))
}
}
@ -326,7 +334,7 @@ func (l *LXD) ListInstances(ctx context.Context) ([]params.Instance, error) {
// RemoveAllInstances will remove all instances created by this provider.
func (l *LXD) RemoveAllInstances(ctx context.Context) error {
instances, err := l.ListInstances(ctx)
instances, err := l.ListInstances(ctx, "")
if err != nil {
return errors.Wrap(err, "fetching instance list")
}

View file

@ -7,6 +7,7 @@ import (
"log"
"runner-manager/config"
"runner-manager/params"
"runner-manager/runner/providers/common"
"runner-manager/util"
"strings"
@ -31,14 +32,17 @@ func lxdInstanceToAPIInstance(instance *api.InstanceFull) params.Instance {
}
state := instance.State
addresses := []string{}
addresses := []params.Address{}
if state.Network != nil {
for _, details := range state.Network {
for _, addr := range details.Addresses {
if addr.Scope != "global" {
continue
}
addresses = append(addresses, addr.Address)
addresses = append(addresses, params.Address{
Address: addr.Address,
Type: params.PublicAddress,
})
}
}
}
@ -56,7 +60,18 @@ func lxdInstanceToAPIInstance(instance *api.InstanceFull) params.Instance {
OSName: strings.ToLower(os),
OSVersion: osRelease,
Addresses: addresses,
Status: state.Status,
Status: lxdStatusToProviderStatus(state.Status),
}
}
func lxdStatusToProviderStatus(status string) common.InstanceStatus {
switch status {
case "Running":
return common.InstanceRunning
case "Stopped":
return common.InstanceStopped
default:
return common.InstanceStatusUnknown
}
}

View file

@ -9,16 +9,20 @@ import (
"encoding/json"
"hash"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
"runner-manager/config"
"runner-manager/database"
dbCommon "runner-manager/database/common"
gErrors "runner-manager/errors"
"runner-manager/params"
"runner-manager/runner/common"
"runner-manager/runner/providers"
"runner-manager/util"
"strings"
"sync"
"github.com/google/go-github/v43/github"
"github.com/pkg/errors"
@ -26,7 +30,7 @@ import (
)
func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
ghc, err := util.GithubClientFromConfig(ctx, cfg.Github)
ghc, err := util.GithubClient(ctx, cfg.Github.OAuth2Token)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
@ -35,10 +39,15 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
if err != nil {
return nil, errors.Wrap(err, "loading providers")
}
db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
log.Fatal(err)
}
runner := &Runner{
ctx: ctx,
config: cfg,
db: db,
ghc: ghc,
providers: providers,
}
@ -55,6 +64,7 @@ type Runner struct {
ctx context.Context
ghc *github.Client
db dbCommon.Store
controllerID string
@ -64,10 +74,29 @@ type Runner struct {
providers map[string]common.Provider
}
func (r *Runner) findRepoPool(name string) (common.PoolManager, error) {
if pool, ok := r.repositories[name]; ok {
return pool, nil
}
func (r *Runner) loadPools() error {
r.mux.Lock()
defer r.mux.Unlock()
// repos, err := r.db.ListRepositories(r.ctx)
// if err != nil {
// return errors.Wrap(err, "fetching repositories")
// }
return nil
}
func (r *Runner) findRepoPool(owner, name string) (common.PoolManager, error) {
r.mux.Lock()
defer r.mux.Unlock()
// key := fmt.Sprintf("%s/%s", owner, name)
// if repo, ok := r.repositories[key]; ok {
// return pool, nil
// }
// repo, err := r.db.GetRepository(r.ctx, owner, name)
// r.repositories[key] = repo
return nil, errors.Wrapf(gErrors.ErrNotFound, "repository %s not configured", name)
}
@ -132,17 +161,14 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
return errors.Wrapf(gErrors.ErrBadRequest, "invalid job data: %s", err)
}
var entity string
var pool common.PoolManager
var poolManager common.PoolManager
var err error
switch HookTargetType(hookTargetType) {
case RepoHook:
entity = job.Repository.FullName
pool, err = r.findRepoPool(entity)
poolManager, err = r.findRepoPool(job.Repository.Owner.Login, job.Repository.Name)
case OrganizationHook:
entity = job.Organization.Login
pool, err = r.findOrgPool(entity)
poolManager, err = r.findOrgPool(job.Organization.Login)
default:
return gErrors.NewBadRequestError("cannot handle hook target type %s", hookTargetType)
}
@ -150,12 +176,12 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
if err != nil {
// We don't have a repository or organization configured that
// can handle this workflow job.
return errors.Wrap(err, "fetching pool")
return errors.Wrap(err, "fetching poolManager")
}
// We found a pool. Validate the webhook job. If a secret is configured,
// we make sure that the source of this workflow job is valid.
secret := pool.WebhookSecret()
secret := poolManager.WebhookSecret()
if err := r.validateHookBody(signature, secret, jobData); err != nil {
return errors.Wrap(err, "validating webhook data")
}

View file

@ -123,9 +123,9 @@ func OSToOSType(os string) (config.OSType, error) {
return osType, nil
}
func GithubClientFromConfig(ctx context.Context, cfg config.Github) (*github.Client, error) {
func GithubClient(ctx context.Context, token string) (*github.Client, error) {
ts := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: cfg.OAuth2Token},
&oauth2.Token{AccessToken: token},
)
tc := oauth2.NewClient(ctx, ts)