Add ScaleSet models, functions and types

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-04-08 09:15:54 +00:00
parent 5ba53adf84
commit 85eac363d5
9 changed files with 772 additions and 12 deletions

View file

@ -288,6 +288,25 @@ func (s *sqlDatabase) ListPoolInstances(_ context.Context, poolID string) ([]par
return ret, nil
}
func (s *sqlDatabase) ListScaleSetInstances(_ context.Context, scalesetID uint) ([]params.Instance, error) {
var instances []Instance
query := s.conn.Model(&Instance{}).Preload("Job").Where("scale_set_id = ?", scalesetID)
if err := query.Find(&instances); err.Error != nil {
return nil, errors.Wrap(err.Error, "fetching instances")
}
var err error
ret := make([]params.Instance, len(instances))
for idx, inst := range instances {
ret[idx], err = s.sqlToParamsInstance(inst)
if err != nil {
return nil, errors.Wrap(err, "converting instance")
}
}
return ret, nil
}
func (s *sqlDatabase) ListAllInstances(_ context.Context) ([]params.Instance, error) {
var instances []Instance

View file

@ -86,6 +86,54 @@ type Pool struct {
Priority uint `gorm:"index:idx_pool_priority"`
}
// ScaleSet represents a github scale set. Scale sets are almost identical to pools with a few
// notable exceptions:
// - Labels are no longer relevant
// - Workflows will use the scaleset name to target runners.
// - A scale set is a stand alone unit. If a workflow targets a scale set, no other runner will pick up that job.
type ScaleSet struct {
gorm.Model
// ScaleSetID is the github ID of the scale set. This field may not be set if
// the scale set was ceated in GARM but has not yet been created in GitHub.
ScaleSetID int `gorm:"index:idx_scale_set"`
Name string `gorm:"index:idx_name"`
DisableUpdate bool
// State stores the provisioning state of the scale set in GitHub
State params.ScaleSetState
// ExtendedState stores a more detailed message regarding the State.
// If an error occurs, the reason for the error will be stored here.
ExtendedState string
ProviderName string
RunnerPrefix string
MaxRunners uint
MinIdleRunners uint
RunnerBootstrapTimeout uint
Image string
Flavor string
OSType commonParams.OSType
OSArch commonParams.OSArch
Enabled bool
// ExtraSpecs is an opaque json that gets sent to the provider
// as part of the bootstrap params for instances. It can contain
// any kind of data needed by providers.
ExtraSpecs datatypes.JSON
GitHubRunnerGroup string
RepoID *uuid.UUID `gorm:"index"`
Repository Repository `gorm:"foreignKey:RepoID;"`
OrgID *uuid.UUID `gorm:"index"`
Organization Organization `gorm:"foreignKey:OrgID"`
EnterpriseID *uuid.UUID `gorm:"index"`
Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"`
Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
}
type Repository struct {
Base
@ -98,6 +146,7 @@ type Repository struct {
Name string `gorm:"index:idx_owner_nocase,unique,collate:nocase"`
WebhookSecret []byte
Pools []Pool `gorm:"foreignKey:RepoID"`
ScaleSets []ScaleSet `gorm:"foreignKey:RepoID"`
Jobs []WorkflowJob `gorm:"foreignKey:RepoID;constraint:OnDelete:SET NULL"`
PoolBalancerType params.PoolBalancerType `gorm:"type:varchar(64)"`
@ -116,6 +165,7 @@ type Organization struct {
Name string `gorm:"index:idx_org_name_nocase,collate:nocase"`
WebhookSecret []byte
Pools []Pool `gorm:"foreignKey:OrgID"`
ScaleSet []ScaleSet `gorm:"foreignKey:OrgID"`
Jobs []WorkflowJob `gorm:"foreignKey:OrgID;constraint:OnDelete:SET NULL"`
PoolBalancerType params.PoolBalancerType `gorm:"type:varchar(64)"`
@ -134,6 +184,7 @@ type Enterprise struct {
Name string `gorm:"index:idx_ent_name_nocase,collate:nocase"`
WebhookSecret []byte
Pools []Pool `gorm:"foreignKey:EnterpriseID"`
ScaleSet []ScaleSet `gorm:"foreignKey:EnterpriseID"`
Jobs []WorkflowJob `gorm:"foreignKey:EnterpriseID;constraint:OnDelete:SET NULL"`
PoolBalancerType params.PoolBalancerType `gorm:"type:varchar(64)"`
@ -187,6 +238,9 @@ type Instance struct {
PoolID uuid.UUID
Pool Pool `gorm:"foreignKey:PoolID"`
ScaleSetFkID *uint
ScaleSet ScaleSet `gorm:"foreignKey:ScaleSetFkID"`
StatusMessages []InstanceStatusUpdate `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
Job *WorkflowJob `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`

381
database/sql/scalesets.go Normal file
View file

@ -0,0 +1,381 @@
// Copyright 2024 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package sql
import (
"context"
"fmt"
runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
"github.com/google/uuid"
"github.com/pkg/errors"
"gorm.io/datatypes"
"gorm.io/gorm"
)
func (s *sqlDatabase) ListAllScaleSets(_ context.Context) ([]params.ScaleSet, error) {
var scaleSets []ScaleSet
q := s.conn.Model(&ScaleSet{}).
Preload("Organization").
Preload("Repository").
Preload("Enterprise").
Omit("extra_specs").
Omit("status_messages").
Find(&scaleSets)
if q.Error != nil {
return nil, errors.Wrap(q.Error, "fetching all scale sets")
}
ret := make([]params.ScaleSet, len(scaleSets))
var err error
for idx, val := range scaleSets {
ret[idx], err = s.sqlToCommonScaleSet(val)
if err != nil {
return nil, errors.Wrap(err, "converting scale sets")
}
}
return ret, nil
}
func (s *sqlDatabase) CreateEntityScaleSet(_ context.Context, entity params.GithubEntity, param params.CreateScaleSetParams) (scaleSet params.ScaleSet, err error) {
if err := param.Validate(); err != nil {
return params.ScaleSet{}, fmt.Errorf("failed to validate create params: %w", err)
}
defer func() {
if err == nil {
s.sendNotify(common.ScaleSetEntityType, common.CreateOperation, scaleSet)
}
}()
newScaleSet := ScaleSet{
Name: param.Name,
ScaleSetID: param.ScaleSetID,
DisableUpdate: param.DisableUpdate,
ProviderName: param.ProviderName,
RunnerPrefix: param.GetRunnerPrefix(),
MaxRunners: param.MaxRunners,
MinIdleRunners: param.MinIdleRunners,
RunnerBootstrapTimeout: param.RunnerBootstrapTimeout,
Image: param.Image,
Flavor: param.Flavor,
OSType: param.OSType,
OSArch: param.OSArch,
Enabled: param.Enabled,
GitHubRunnerGroup: param.GitHubRunnerGroup,
State: params.ScaleSetPendingCreate,
}
if len(param.ExtraSpecs) > 0 {
newScaleSet.ExtraSpecs = datatypes.JSON(param.ExtraSpecs)
}
entityID, err := uuid.Parse(entity.ID)
if err != nil {
return params.ScaleSet{}, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
switch entity.EntityType {
case params.GithubEntityTypeRepository:
newScaleSet.RepoID = &entityID
case params.GithubEntityTypeOrganization:
newScaleSet.OrgID = &entityID
case params.GithubEntityTypeEnterprise:
newScaleSet.EnterpriseID = &entityID
}
err = s.conn.Transaction(func(tx *gorm.DB) error {
if err := s.hasGithubEntity(tx, entity.EntityType, entity.ID); err != nil {
return errors.Wrap(err, "checking entity existence")
}
q := tx.Create(&newScaleSet)
if q.Error != nil {
return errors.Wrap(q.Error, "creating scale set")
}
return nil
})
if err != nil {
return params.ScaleSet{}, err
}
dbScaleSet, err := s.getScaleSetByID(s.conn, newScaleSet.ID, "Instances", "Enterprise", "Organization", "Repository")
if err != nil {
return params.ScaleSet{}, errors.Wrap(err, "fetching scale set")
}
return s.sqlToCommonScaleSet(dbScaleSet)
}
func (s *sqlDatabase) listEntityScaleSets(tx *gorm.DB, entityType params.GithubEntityType, entityID string, preload ...string) ([]ScaleSet, error) {
if _, err := uuid.Parse(entityID); err != nil {
return nil, errors.Wrap(runnerErrors.ErrBadRequest, "parsing id")
}
if err := s.hasGithubEntity(tx, entityType, entityID); err != nil {
return nil, errors.Wrap(err, "checking entity existence")
}
var preloadEntity string
var fieldName string
switch entityType {
case params.GithubEntityTypeRepository:
fieldName = entityTypeRepoName
preloadEntity = "Repository"
case params.GithubEntityTypeOrganization:
fieldName = entityTypeOrgName
preloadEntity = "Organization"
case params.GithubEntityTypeEnterprise:
fieldName = entityTypeEnterpriseName
preloadEntity = "Enterprise"
default:
return nil, fmt.Errorf("invalid entityType: %v", entityType)
}
q := tx
q = q.Preload(preloadEntity)
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var scaleSets []ScaleSet
condition := fmt.Sprintf("%s = ?", fieldName)
err := q.Model(&ScaleSet{}).
Where(condition, entityID).
Omit("extra_specs").
Omit("status_messages").
Find(&scaleSets).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return []ScaleSet{}, nil
}
return nil, errors.Wrap(err, "fetching scale sets")
}
return scaleSets, nil
}
func (s *sqlDatabase) ListEntityScaleSets(_ context.Context, entity params.GithubEntity) ([]params.ScaleSet, error) {
scaleSets, err := s.listEntityScaleSets(s.conn, entity.EntityType, entity.ID)
if err != nil {
return nil, errors.Wrap(err, "fetching scale sets")
}
ret := make([]params.ScaleSet, len(scaleSets))
for idx, set := range scaleSets {
ret[idx], err = s.sqlToCommonScaleSet(set)
if err != nil {
return nil, errors.Wrap(err, "conbverting scale set")
}
}
return ret, nil
}
func (s *sqlDatabase) UpdateEntityScaleSet(_ context.Context, entity params.GithubEntity, scaleSetID uint, param params.UpdateScaleSetParams, callback func(old, new params.ScaleSet) error) (updatedScaleSet params.ScaleSet, err error) {
defer func() {
if err == nil {
s.sendNotify(common.ScaleSetEntityType, common.UpdateOperation, updatedScaleSet)
}
}()
err = s.conn.Transaction(func(tx *gorm.DB) error {
scaleSet, err := s.getEntityScaleSet(tx, entity.EntityType, entity.ID, scaleSetID, "Instances")
if err != nil {
return errors.Wrap(err, "fetching scale set")
}
old, err := s.sqlToCommonScaleSet(scaleSet)
if err != nil {
return errors.Wrap(err, "converting scale set")
}
updatedScaleSet, err = s.updateScaleSet(tx, scaleSet, param)
if err != nil {
return errors.Wrap(err, "updating scale set")
}
if callback != nil {
if err := callback(old, updatedScaleSet); err != nil {
return errors.Wrap(err, "executing update callback")
}
}
return nil
})
if err != nil {
return params.ScaleSet{}, err
}
return updatedScaleSet, nil
}
func (s *sqlDatabase) getEntityScaleSet(tx *gorm.DB, entityType params.GithubEntityType, entityID string, scaleSetID uint, preload ...string) (ScaleSet, error) {
if entityID == "" {
return ScaleSet{}, errors.Wrap(runnerErrors.ErrBadRequest, "missing entity id")
}
if scaleSetID == 0 {
return ScaleSet{}, errors.Wrap(runnerErrors.ErrBadRequest, "missing scaleset id")
}
var fieldName string
var entityField string
switch entityType {
case params.GithubEntityTypeRepository:
fieldName = entityTypeRepoName
entityField = "Repository"
case params.GithubEntityTypeOrganization:
fieldName = entityTypeOrgName
entityField = "Organization"
case params.GithubEntityTypeEnterprise:
fieldName = entityTypeEnterpriseName
entityField = "Enterprise"
default:
return ScaleSet{}, fmt.Errorf("invalid entityType: %v", entityType)
}
q := tx
q = q.Preload(entityField)
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
var scaleSet ScaleSet
condition := fmt.Sprintf("id = ? and %s = ?", fieldName)
err := q.Model(&ScaleSet{}).
Where(condition, scaleSetID, entityID).
First(&scaleSet).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ScaleSet{}, errors.Wrap(runnerErrors.ErrNotFound, "finding scale set")
}
return ScaleSet{}, errors.Wrap(err, "fetching scale set")
}
return scaleSet, nil
}
func (s *sqlDatabase) updateScaleSet(tx *gorm.DB, scaleSet ScaleSet, param params.UpdateScaleSetParams) (params.ScaleSet, error) {
if param.Enabled != nil && scaleSet.Enabled != *param.Enabled {
scaleSet.Enabled = *param.Enabled
}
if param.State != nil && *param.State != scaleSet.State {
scaleSet.State = *param.State
}
if param.ExtendedState != nil && *param.ExtendedState != scaleSet.ExtendedState {
scaleSet.ExtendedState = *param.ExtendedState
}
if param.Name != "" {
scaleSet.Name = param.Name
}
if param.GitHubRunnerGroup != nil && *param.GitHubRunnerGroup != "" {
scaleSet.GitHubRunnerGroup = *param.GitHubRunnerGroup
}
if param.Flavor != "" {
scaleSet.Flavor = param.Flavor
}
if param.Image != "" {
scaleSet.Image = param.Image
}
if param.Prefix != "" {
scaleSet.RunnerPrefix = param.Prefix
}
if param.MaxRunners != nil {
scaleSet.MaxRunners = *param.MaxRunners
}
if param.MinIdleRunners != nil {
scaleSet.MinIdleRunners = *param.MinIdleRunners
}
if param.OSArch != "" {
scaleSet.OSArch = param.OSArch
}
if param.OSType != "" {
scaleSet.OSType = param.OSType
}
if param.ExtraSpecs != nil {
scaleSet.ExtraSpecs = datatypes.JSON(param.ExtraSpecs)
}
if param.RunnerBootstrapTimeout != nil && *param.RunnerBootstrapTimeout > 0 {
scaleSet.RunnerBootstrapTimeout = *param.RunnerBootstrapTimeout
}
if param.GitHubRunnerGroup != nil {
scaleSet.GitHubRunnerGroup = *param.GitHubRunnerGroup
}
if q := tx.Save(&scaleSet); q.Error != nil {
return params.ScaleSet{}, errors.Wrap(q.Error, "saving database entry")
}
return s.sqlToCommonScaleSet(scaleSet)
}
func (s *sqlDatabase) GetScaleSetByID(_ context.Context, scaleSet uint) (params.ScaleSet, error) {
set, err := s.getScaleSetByID(s.conn, scaleSet, "Instances", "Enterprise", "Organization", "Repository")
if err != nil {
return params.ScaleSet{}, errors.Wrap(err, "fetching scale set by ID")
}
return s.sqlToCommonScaleSet(set)
}
func (s *sqlDatabase) DeleteScaleSetByID(ctx context.Context, scaleSetID uint) (err error) {
var scaleSet params.ScaleSet
defer func() {
if err == nil && scaleSet.ID != 0 {
s.sendNotify(common.ScaleSetEntityType, common.DeleteOperation, scaleSet)
}
}()
err = s.conn.Transaction(func(tx *gorm.DB) error {
dbSet, err := s.getScaleSetByID(tx, scaleSetID, "Instances")
if err != nil {
return errors.Wrap(err, "fetching scale set")
}
if len(dbSet.Instances) > 0 {
return runnerErrors.NewBadRequestError("cannot delete scaleset with runners")
}
scaleSet, err = s.sqlToCommonScaleSet(dbSet)
if err != nil {
return errors.Wrap(err, "converting scale set")
}
if q := tx.Unscoped().Delete(&dbSet); q.Error != nil {
return errors.Wrap(q.Error, "deleting scale set")
}
return nil
})
if err != nil {
return errors.Wrap(err, "removing scale set")
}
return nil
}

View file

@ -428,6 +428,7 @@ func (s *sqlDatabase) migrateDB() error {
&Instance{},
&ControllerInfo{},
&WorkflowJob{},
&ScaleSet{},
); err != nil {
return errors.Wrap(err, "running auto migrate")
}

View file

@ -73,6 +73,10 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) (params.Instance, e
AditionalLabels: labels,
}
if instance.ScaleSetFkID != nil {
ret.ScaleSetID = *instance.ScaleSetFkID
}
if instance.Job != nil {
paramJob, err := sqlWorkflowJobToParamsJob(*instance.Job)
if err != nil {
@ -265,6 +269,60 @@ func (s *sqlDatabase) sqlToCommonPool(pool Pool) (params.Pool, error) {
return ret, nil
}
func (s *sqlDatabase) sqlToCommonScaleSet(scaleSet ScaleSet) (params.ScaleSet, error) {
ret := params.ScaleSet{
ID: scaleSet.ID,
ScaleSetID: scaleSet.ScaleSetID,
Name: scaleSet.Name,
DisableUpdate: scaleSet.DisableUpdate,
ProviderName: scaleSet.ProviderName,
MaxRunners: scaleSet.MaxRunners,
MinIdleRunners: scaleSet.MinIdleRunners,
RunnerPrefix: params.RunnerPrefix{
Prefix: scaleSet.RunnerPrefix,
},
Image: scaleSet.Image,
Flavor: scaleSet.Flavor,
OSArch: scaleSet.OSArch,
OSType: scaleSet.OSType,
Enabled: scaleSet.Enabled,
Instances: make([]params.Instance, len(scaleSet.Instances)),
RunnerBootstrapTimeout: scaleSet.RunnerBootstrapTimeout,
ExtraSpecs: json.RawMessage(scaleSet.ExtraSpecs),
GitHubRunnerGroup: scaleSet.GitHubRunnerGroup,
State: scaleSet.State,
ExtendedState: scaleSet.ExtendedState,
}
if scaleSet.RepoID != nil {
ret.RepoID = scaleSet.RepoID.String()
if scaleSet.Repository.Owner != "" && scaleSet.Repository.Name != "" {
ret.RepoName = fmt.Sprintf("%s/%s", scaleSet.Repository.Owner, scaleSet.Repository.Name)
}
}
if scaleSet.OrgID != nil && scaleSet.Organization.Name != "" {
ret.OrgID = scaleSet.OrgID.String()
ret.OrgName = scaleSet.Organization.Name
}
if scaleSet.EnterpriseID != nil && scaleSet.Enterprise.Name != "" {
ret.EnterpriseID = scaleSet.EnterpriseID.String()
ret.EnterpriseName = scaleSet.Enterprise.Name
}
var err error
for idx, inst := range scaleSet.Instances {
ret.Instances[idx], err = s.sqlToParamsInstance(inst)
if err != nil {
return params.ScaleSet{}, errors.Wrap(err, "converting instance")
}
}
return ret, nil
}
func (s *sqlDatabase) sqlToCommonTags(tag Tag) params.Tag {
return params.Tag{
ID: tag.ID.String(),
@ -452,6 +510,26 @@ func (s *sqlDatabase) getPoolByID(tx *gorm.DB, poolID string, preload ...string)
return pool, nil
}
func (s *sqlDatabase) getScaleSetByID(tx *gorm.DB, scaleSetID uint, preload ...string) (ScaleSet, error) {
var scaleSet ScaleSet
q := tx.Model(&ScaleSet{})
if len(preload) > 0 {
for _, item := range preload {
q = q.Preload(item)
}
}
q = q.Where("id = ?", scaleSetID).First(&scaleSet)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return ScaleSet{}, runnerErrors.ErrNotFound
}
return ScaleSet{}, errors.Wrap(q.Error, "fetching scale set from database")
}
return scaleSet, nil
}
func (s *sqlDatabase) hasGithubEntity(tx *gorm.DB, entityType params.GithubEntityType, entityID string) error {
u, err := uuid.Parse(entityID)
if err != nil {