WiP
Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
parent 020210d6ad
commit 436fd7746f
7 changed files with 301 additions and 7 deletions
@@ -97,7 +97,7 @@ func (a *APIController) ListScaleSetInstancesHandler(w http.ResponseWriter, r *h
        }
        return
    }
-   id, err := strconv.ParseUint(scalesetID, 10, 64)
+   id, err := strconv.ParseUint(scalesetID, 10, 32)
    if err != nil {
        slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
        handleError(ctx, w, gErrors.ErrBadRequest)
@@ -79,7 +79,7 @@ func (a *APIController) GetScaleSetByIDHandler(w http.ResponseWriter, r *http.Re
        }
        return
    }
-   id, err := strconv.ParseUint(scaleSetID, 10, 64)
+   id, err := strconv.ParseUint(scaleSetID, 10, 32)
    if err != nil {
        slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
        handleError(ctx, w, gErrors.ErrBadRequest)
@@ -130,7 +130,7 @@ func (a *APIController) DeleteScaleSetByIDHandler(w http.ResponseWriter, r *http
        return
    }

-   id, err := strconv.ParseUint(scalesetID, 10, 64)
+   id, err := strconv.ParseUint(scalesetID, 10, 32)
    if err != nil {
        slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
        handleError(ctx, w, gErrors.ErrBadRequest)
@@ -183,7 +183,7 @@ func (a *APIController) UpdateScaleSetByIDHandler(w http.ResponseWriter, r *http
        return
    }

-   id, err := strconv.ParseUint(scalesetID, 10, 64)
+   id, err := strconv.ParseUint(scalesetID, 10, 32)
    if err != nil {
        slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
        handleError(ctx, w, gErrors.ErrBadRequest)
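The only functional change in the handler hunks above is the ParseUint bit size. Parsing with a bit size of 32 makes strconv reject values that will not fit in 32 bits up front, so the later conversion to uint cannot silently truncate on platforms where uint is 32 bits wide. A minimal, self-contained sketch of the pattern; the parseID helper is illustrative and not part of this commit:

package main

import (
    "fmt"
    "strconv"
)

// parseID mirrors the handler pattern above: ParseUint with bitSize 32
// guarantees the parsed value fits in 32 bits, so converting it to uint
// is safe even where uint is only 32 bits wide.
func parseID(s string) (uint, error) {
    id, err := strconv.ParseUint(s, 10, 32)
    if err != nil {
        return 0, fmt.Errorf("parsing id %q: %w", s, err)
    }
    return uint(id), nil
}

func main() {
    fmt.Println(parseID("42"))         // 42 <nil>
    fmt.Println(parseID("4294967296")) // 0, value out of range error
}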
@@ -86,6 +86,17 @@ type Pool struct {
    Priority uint `gorm:"index:idx_pool_priority"`
}

+type ScaleSetEvent struct {
+   gorm.Model
+
+   EventType  params.EventType
+   EventLevel params.EventLevel
+   Message    string `gorm:"type:text"`
+
+   ScaleSetID uint     `gorm:"index:idx_scale_set_event"`
+   ScaleSet   ScaleSet `gorm:"foreignKey:ScaleSetID"`
+}
+
// ScaleSet represents a github scale set. Scale sets are almost identical to pools with a few
// notable exceptions:
// - Labels are no longer relevant
@@ -135,7 +146,11 @@ type ScaleSet struct {
    EnterpriseID *uuid.UUID `gorm:"index"`
    Enterprise   Enterprise `gorm:"foreignKey:EnterpriseID"`

-   Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
+   Status       string
+   StatusReason string `gorm:"type:text"`
+
+   Instances []Instance      `gorm:"foreignKey:ScaleSetFkID"`
+   Events    []ScaleSetEvent `gorm:"foreignKey:ScaleSetID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
}

type RepositoryEvent struct {
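On the model side, ScaleSetEvent embeds gorm.Model, so the new table gets the standard primary key, timestamp and soft-delete columns for free and only declares its own payload plus the foreign key back to the scale set. For reference, gorm.Model (from gorm.io/gorm, reproduced here, not part of the commit) is:

// Reproduced from gorm.io/gorm for reference.
type Model struct {
    ID        uint `gorm:"primarykey"`
    CreatedAt time.Time
    UpdatedAt time.Time
    DeletedAt DeletedAt `gorm:"index"`
}

The DeletedAt column means deletes are soft by default, which is why the event-trimming code further below uses Unscoped() to remove old rows for good. The constraint tag on ScaleSet.Events additionally asks the migrator to create the foreign key with ON DELETE CASCADE, so dropping a scale set removes its events at the database level.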
@@ -432,6 +432,7 @@ func (s *sqlDatabase) migrateDB() error {
        &ControllerInfo{},
        &WorkflowJob{},
        &ScaleSet{},
+       &ScaleSetEvent{},
    ); err != nil {
        return errors.Wrap(err, "running auto migrate")
    }
@@ -634,6 +634,40 @@ func (s *sqlDatabase) GetGithubEntity(_ context.Context, entityType params.Githu
    return entity, nil
}

+func (s *sqlDatabase) AddScaleSetEvent(ctx context.Context, scaleSetID uint, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
+   scaleSet, err := s.GetScaleSetByID(ctx, scaleSetID)
+   if err != nil {
+       return errors.Wrap(err, "updating instance")
+   }
+
+   msg := InstanceStatusUpdate{
+       Message:    statusMessage,
+       EventType:  event,
+       EventLevel: eventLevel,
+   }
+
+   if err := s.conn.Model(&scaleSet).Association("Events").Append(&msg); err != nil {
+       return errors.Wrap(err, "adding status message")
+   }
+
+   if maxEvents > 0 {
+       var latestEvents []ScaleSetEvent
+       q := s.conn.Model(&ScaleSetEvent{}).
+           Limit(maxEvents).Order("id desc").
+           Where("scale_set_id = ?", scaleSetID).Find(&latestEvents)
+       if q.Error != nil {
+           return errors.Wrap(q.Error, "fetching latest events")
+       }
+       if len(latestEvents) == maxEvents {
+           lastInList := latestEvents[len(latestEvents)-1]
+           if err := s.conn.Where("scale_set_id = ? and id < ?", scaleSetID, lastInList.ID).Unscoped().Delete(&ScaleSetEvent{}).Error; err != nil {
+               return errors.Wrap(err, "deleting old events")
+           }
+       }
+   }
+   return nil
+}
+
func (s *sqlDatabase) addRepositoryEvent(ctx context.Context, repoID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
    repo, err := s.GetRepositoryByID(ctx, repoID)
    if err != nil {
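A self-contained sketch of the same event-capping technique used in AddScaleSetEvent above, written against stand-in Parent/Event models and an in-memory SQLite database (gorm.io/gorm and gorm.io/driver/sqlite assumed, not garm's store): rows are appended through the association, and once more than maxEvents exist, everything older than the oldest id among the newest maxEvents rows is hard-deleted.

package main

import (
    "fmt"

    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

// Stand-in types; garm's real models are ScaleSet and ScaleSetEvent.
type Event struct {
    gorm.Model
    Message  string
    ParentID uint `gorm:"index"`
}

type Parent struct {
    gorm.Model
    Name   string
    Events []Event `gorm:"foreignKey:ParentID"`
}

// trimEvents keeps only the newest maxEvents rows for a parent: fetch the
// newest N ids, then hard-delete everything older than the oldest id kept.
func trimEvents(db *gorm.DB, parentID uint, maxEvents int) error {
    var newest []Event
    q := db.Model(&Event{}).
        Limit(maxEvents).Order("id desc").
        Where("parent_id = ?", parentID).Find(&newest)
    if q.Error != nil {
        return q.Error
    }
    if len(newest) == maxEvents {
        oldestKept := newest[len(newest)-1]
        if err := db.Where("parent_id = ? and id < ?", parentID, oldestKept.ID).
            Unscoped().Delete(&Event{}).Error; err != nil {
            return err
        }
    }
    return nil
}

func main() {
    db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
    if err != nil {
        panic(err)
    }
    if err := db.AutoMigrate(&Parent{}, &Event{}); err != nil {
        panic(err)
    }

    p := Parent{Name: "demo"}
    db.Create(&p)

    // Append events through the association, like AddScaleSetEvent does.
    for i := 0; i < 10; i++ {
        ev := Event{Message: fmt.Sprintf("event %d", i)}
        if err := db.Model(&p).Association("Events").Append(&ev); err != nil {
            panic(err)
        }
    }

    if err := trimEvents(db, p.ID, 5); err != nil {
        panic(err)
    }

    var count int64
    db.Model(&Event{}).Where("parent_id = ?", p.ID).Count(&count)
    fmt.Println("events kept:", count) // 5
}

Note that in the hunk above the appended value is built as an InstanceStatusUpdate and the GetScaleSetByID error is wrapped as "updating instance"; both read like leftovers from the analogous instance-event helper in this work-in-progress commit.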
@@ -3,10 +3,12 @@ package provider
import (
    "context"
    "fmt"
+   "log/slog"
    "sync"

    dbCommon "github.com/cloudbase/garm/database/common"
+   "github.com/cloudbase/garm/database/watcher"
    "github.com/cloudbase/garm/params"
    "github.com/cloudbase/garm/runner/common"
)
@@ -32,12 +34,51 @@ type provider struct {
    store dbCommon.Store

    providers map[string]common.Provider
+   // A cache of all scale sets kept updated by the watcher.
+   // This helps us avoid a bunch of queries to the database.
+   scaleSets map[uint]params.ScaleSet
+   runners   map[string]params.Instance

    mux     sync.Mutex
    running bool
    quit    chan struct{}
}

+func (p *provider) loadAllScaleSets() error {
+   p.mux.Lock()
+   defer p.mux.Unlock()
+
+   scaleSets, err := p.store.ListAllScaleSets(p.ctx)
+   if err != nil {
+       return fmt.Errorf("fetching scale sets: %w", err)
+   }
+
+   for _, scaleSet := range scaleSets {
+       p.scaleSets[scaleSet.ID] = scaleSet
+   }
+
+   return nil
+}
+
+// loadAllRunners loads all runners from the database. At this stage we only
+// care about runners created by scale sets, but in the future, we will migrate
+// the pool manager to the same model.
+func (p *provider) loadAllRunners() error {
+   p.mux.Lock()
+   defer p.mux.Unlock()
+
+   runners, err := p.store.ListAllInstances(p.ctx)
+   if err != nil {
+       return fmt.Errorf("fetching runners: %w", err)
+   }
+
+   for _, runner := range runners {
+       p.runners[runner.Name] = runner
+   }
+
+   return nil
+}
+
func (p *provider) Start() error {
    p.mux.Lock()
    defer p.mux.Unlock()
@@ -46,6 +87,14 @@ func (p *provider) Start() error {
        return nil
    }

+   if err := p.loadAllScaleSets(); err != nil {
+       return fmt.Errorf("loading all scale sets: %w", err)
+   }
+
+   if err := p.loadAllRunners(); err != nil {
+       return fmt.Errorf("loading all runners: %w", err)
+   }
+
    consumer, err := watcher.RegisterConsumer(
        p.ctx, p.consumerID, composeProviderWatcher())
    if err != nil {
@@ -55,6 +104,8 @@ func (p *provider) Start() error {

    p.quit = make(chan struct{})
    p.running = true
+   go p.loop()
+
    return nil
}
@@ -71,3 +122,75 @@ func (p *provider) Stop() error {
    p.running = false
    return nil
}
+
+func (p *provider) loop() {
+   defer p.Stop()
+   for {
+       select {
+       case payload := <-p.consumer.Watch():
+           slog.InfoContext(p.ctx, "received payload", slog.Any("payload", payload))
+           go p.handleWatcherEvent(payload)
+       case <-p.ctx.Done():
+           return
+       case <-p.quit:
+           return
+       }
+   }
+}
+
+func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
+   switch payload.EntityType {
+   case dbCommon.ScaleSetEntityType:
+       p.handleScaleSetEvent(payload)
+   case dbCommon.InstanceEntityType:
+       p.handleInstanceEvent(payload)
+   default:
+       slog.ErrorContext(p.ctx, "invalid entity type", "entity_type", payload.EntityType)
+   }
+}
+
+func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
+   p.mux.Lock()
+   defer p.mux.Unlock()
+
+   scaleSet, ok := event.Payload.(params.ScaleSet)
+   if !ok {
+       slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
+       return
+   }
+
+   switch event.Operation {
+   case dbCommon.CreateOperation, dbCommon.UpdateOperation:
+       slog.DebugContext(p.ctx, "got create/update operation")
+       p.scaleSets[scaleSet.ID] = scaleSet
+   case dbCommon.DeleteOperation:
+       slog.DebugContext(p.ctx, "got delete operation")
+       delete(p.scaleSets, scaleSet.ID)
+   default:
+       slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
+       return
+   }
+}
+
+func (p *provider) handleInstanceEvent(event dbCommon.ChangePayload) {
+   p.mux.Lock()
+   defer p.mux.Unlock()
+
+   instance, ok := event.Payload.(params.Instance)
+   if !ok {
+       slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
+       return
+   }
+
+   switch event.Operation {
+   case dbCommon.CreateOperation, dbCommon.UpdateOperation:
+       slog.DebugContext(p.ctx, "got create/update operation")
+       p.runners[instance.Name] = instance
+   case dbCommon.DeleteOperation:
+       slog.DebugContext(p.ctx, "got delete operation")
+       delete(p.runners, instance.Name)
+   default:
+       slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
+       return
+   }
+}
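The provider worker added above boils down to a familiar shape: prime a mutex-guarded in-memory cache, then keep it current from a change-notification channel until the context is cancelled or Stop() closes the quit channel. A stripped-down, generic sketch of that loop; the ChangePayload type and events channel here are stand-ins, not garm's watcher API:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ChangePayload is a stand-in for a database change notification.
type ChangePayload struct {
    Operation string // "create", "update" or "delete"
    Name      string
}

type cacheWorker struct {
    ctx    context.Context
    events <-chan ChangePayload

    mux   sync.Mutex
    cache map[string]ChangePayload
    quit  chan struct{}
}

// loop consumes notifications until the context is done or quit is closed.
func (w *cacheWorker) loop() {
    for {
        select {
        case payload := <-w.events:
            w.handle(payload)
        case <-w.ctx.Done():
            return
        case <-w.quit:
            return
        }
    }
}

func (w *cacheWorker) handle(payload ChangePayload) {
    w.mux.Lock()
    defer w.mux.Unlock()

    switch payload.Operation {
    case "create", "update":
        w.cache[payload.Name] = payload
    case "delete":
        delete(w.cache, payload.Name)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    events := make(chan ChangePayload)
    w := &cacheWorker{ctx: ctx, events: events, cache: map[string]ChangePayload{}, quit: make(chan struct{})}
    go w.loop()

    events <- ChangePayload{Operation: "create", Name: "runner-1"}
    events <- ChangePayload{Operation: "delete", Name: "runner-1"}

    time.Sleep(50 * time.Millisecond) // let the loop drain
    w.mux.Lock()
    fmt.Println("cached entries:", len(w.cache)) // 0
    w.mux.Unlock()
    close(w.quit)
}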
@@ -99,7 +99,96 @@ func (w *Worker) Start() (err error) {
    }

+   for _, instance := range instances {
+       if instance.Status == commonParams.InstanceCreating {
+           // We're just starting up. We found an instance stuck in creating.
+           // When a provider creates an instance, it sets the db instance to
+           // creating and then issues an API call to the IaaS to create the
+           // instance using some userdata it needs to come up. But the instance
+           // will still need to call back home to fetch additional metadata and
+           // complete its setup. We should remove the instance as it is not
+           // possible to reliably determine the state of the instance (if it's in
+           // mid boot before it reached the phase where it runs the metadata, or
+           // if it already failed).
+           instanceState := commonParams.InstancePendingDelete
+           locking.Lock(instance.Name)
+           if instance.AgentID != 0 {
+               if err := w.scaleSetCli.RemoveRunner(w.ctx, instance.AgentID); err != nil {
+                   // scale sets use JIT runners. This means that we create the runner in github
+                   // before we create the actual instance that will use the credentials. We need
+                   // to remove the runner from github if it exists.
+                   if !errors.Is(err, runnerErrors.ErrNotFound) {
+                       if errors.Is(err, runnerErrors.ErrUnauthorized) {
+                           // we don't have access to remove the runner. This implies that our
+                           // credentials may have expired.
+                           //
+                           // TODO: we need to set the scale set as inactive and stop the listener (if any).
+                           slog.ErrorContext(w.ctx, "error removing runner", "runner_name", instance.Name, "error", err)
+                           w.runners[instance.ID] = instance
+                           locking.Unlock(instance.Name, false)
+                           continue
+                       }
+                       // The runner may have come up, registered and is currently running a
+                       // job, in which case, github will not allow us to remove it.
+                       runnerInstance, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
+                       if err != nil {
+                           if !errors.Is(err, runnerErrors.ErrNotFound) {
+                               // We could not get info about the runner and it wasn't a "not found" error.
+                               slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
+                               w.runners[instance.ID] = instance
+                               locking.Unlock(instance.Name, false)
+                               continue
+                           }
+                       }
+                       if runnerInstance.Status == string(params.RunnerIdle) ||
+                           runnerInstance.Status == string(params.RunnerActive) {
+                           // This is a highly unlikely scenario, but let's account for it anyway.
+                           //
+                           // The runner is running a job or is idle. Mark it as running, as
+                           // it appears that it finished booting and is now running.
+                           //
+                           // NOTE: if the instance was in creating and it managed to boot, there
+                           // is a high chance that we do not have a provider ID for the runner
+                           // inside our database. When removing the runner, the provider will attempt
+                           // to use the instance name instead of the provider ID, the same as when
+                           // creation of the instance fails and we try to clean up any lingering resources
+                           // in the provider.
+                           slog.DebugContext(w.ctx, "runner is running a job or is idle; not removing", "runner_name", instance.Name)
+                           instanceState = commonParams.InstanceRunning
+                       }
+                   }
+               }
+           }
+           runnerUpdateParams := params.UpdateInstanceParams{
+               Status: instanceState,
+           }
+           instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
+           if err != nil {
+               if !errors.Is(err, runnerErrors.ErrNotFound) {
+                   locking.Unlock(instance.Name, false)
+                   return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+               }
+           }
+           locking.Unlock(instance.Name, false)
+       } else if instance.Status == commonParams.InstanceDeleting {
+           // Set the instance in deleting. It is assumed that the runner was already
+           // removed from github either by github or by garm. Deleting status indicates
+           // that it was already being handled by the provider. There should be no entry on
+           // github for the runner if that was the case.
+           // Setting it in pending_delete will cause the provider to try again, an operation
+           // which is idempotent (if it's already deleted, the provider reports success).
+           runnerUpdateParams := params.UpdateInstanceParams{
+               Status: commonParams.InstancePendingDelete,
+           }
+           instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
+           if err != nil {
+               if !errors.Is(err, runnerErrors.ErrNotFound) {
+                   locking.Unlock(instance.Name, false)
+                   return fmt.Errorf("updating runner %s: %w", instance.Name, err)
+               }
+           }
+       }
+       w.runners[instance.ID] = instance
+       locking.Unlock(instance.Name, false)
+   }
+
    consumer, err := watcher.RegisterConsumer(
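The startup reconciliation above serializes per-instance work through garm's locking package (locking.Lock(name) / locking.Unlock(name, ...)). A minimal sketch of what a keyed lock registry of that shape can look like; this is a simplified stand-in, not garm's implementation, and the boolean argument garm's Unlock takes is not modelled here:

package main

import (
    "fmt"
    "sync"
)

// keyedLock serializes work per key (e.g. per runner name) while letting
// different keys proceed concurrently. Entries are never evicted; a real
// implementation would also handle cleanup.
type keyedLock struct {
    mu    sync.Mutex
    locks map[string]*sync.Mutex
}

func newKeyedLock() *keyedLock {
    return &keyedLock{locks: make(map[string]*sync.Mutex)}
}

func (k *keyedLock) Lock(name string) {
    k.mu.Lock()
    m, ok := k.locks[name]
    if !ok {
        m = &sync.Mutex{}
        k.locks[name] = m
    }
    k.mu.Unlock()
    m.Lock()
}

func (k *keyedLock) Unlock(name string) {
    k.mu.Lock()
    m, ok := k.locks[name]
    k.mu.Unlock()
    if ok {
        m.Unlock()
    }
}

func main() {
    locks := newKeyedLock()
    var wg sync.WaitGroup
    for _, name := range []string{"runner-1", "runner-1", "runner-2"} {
        wg.Add(1)
        go func(n string) {
            defer wg.Done()
            locks.Lock(n)
            defer locks.Unlock(n)
            fmt.Println("working on", n) // work on the same key is serialized
        }(name)
    }
    wg.Wait()
}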
@@ -212,11 +301,43 @@ func (w *Worker) handleInstanceEntityEvent(event dbCommon.ChangePayload) {
        return
    }
    switch event.Operation {
-   case dbCommon.UpdateOperation, dbCommon.CreateOperation:
-       slog.DebugContext(w.ctx, "got update operation")
+   case dbCommon.CreateOperation:
+       slog.DebugContext(w.ctx, "got create operation")
        w.mux.Lock()
        w.runners[instance.ID] = instance
        w.mux.Unlock()
+   case dbCommon.UpdateOperation:
+       slog.DebugContext(w.ctx, "got update operation")
+       w.mux.Lock()
+       oldInstance, ok := w.runners[instance.ID]
+       w.runners[instance.ID] = instance
+
+       if !ok {
+           slog.DebugContext(w.ctx, "instance not found in local cache; ignoring", "instance_id", instance.ID)
+           w.mux.Unlock()
+           return
+       }
+       if oldInstance.RunnerStatus != instance.RunnerStatus && instance.RunnerStatus == params.RunnerIdle {
+           serviceRunner, err := w.scaleSetCli.GetRunner(w.ctx, instance.AgentID)
+           if err != nil {
+               slog.ErrorContext(w.ctx, "error getting runner details", "error", err)
+               w.mux.Unlock()
+               return
+           }
+           status, ok := serviceRunner.Status.(string)
+           if !ok {
+               slog.ErrorContext(w.ctx, "error getting runner status", "runner_id", instance.AgentID)
+               w.mux.Unlock()
+               return
+           }
+           if status != string(params.RunnerIdle) && status != string(params.RunnerActive) {
+               // TODO: Wait for the status to change for a while (30 seconds?). Mark the instance as
+               // pending_delete if the runner never comes online.
+               w.mux.Unlock()
+               return
+           }
+       }
+       w.mux.Unlock()
    case dbCommon.DeleteOperation:
        slog.DebugContext(w.ctx, "got delete operation")
        w.mux.Lock()