Use watcher and get rid of RefreshState()

This change uses the database watcher to watch for changes to the
github entities, credentials and controller info.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2024-06-20 15:28:56 +00:00
parent 38127af747
commit daaca0bd8f
23 changed files with 452 additions and 462 deletions

View file

@ -35,6 +35,7 @@ import (
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
@ -61,16 +62,9 @@ const (
maxCreateAttempts = 5
)
type urls struct {
callbackURL string
metadataURL string
webhookURL string
controllerWebhookURL string
}
func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, instanceTokenGetter auth.InstanceTokenGetter, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ctx = garmUtil.WithContext(ctx, slog.Any("pool_mgr", entity.String()), slog.Any("pool_type", entity.EntityType))
ghc, err := garmUtil.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails)
ghc, err := garmUtil.GithubClient(ctx, entity, entity.Credentials)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
@ -79,38 +73,47 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, cfgIn
return nil, errors.New("webhook secret is empty")
}
controllerInfo, err := store.ControllerInfo()
if err != nil {
return nil, errors.Wrap(err, "getting controller info")
}
consumerID := fmt.Sprintf("pool-manager-%s", entity.String())
consumer, err := watcher.RegisterConsumer(
ctx, consumerID,
composeWatcherFilters(entity),
)
if err != nil {
return nil, errors.Wrap(err, "registering consumer")
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
repo := &basePoolManager{
ctx: ctx,
cfgInternal: cfgInternal,
entity: entity,
ghcli: ghc,
ctx: ctx,
entity: entity,
ghcli: ghc,
controllerInfo: controllerInfo,
instanceTokenGetter: instanceTokenGetter,
store: store,
providers: providers,
controllerID: cfgInternal.ControllerID,
urls: urls{
webhookURL: cfgInternal.BaseWebhookURL,
callbackURL: cfgInternal.InstanceCallbackURL,
metadataURL: cfgInternal.InstanceMetadataURL,
controllerWebhookURL: cfgInternal.ControllerWebhookURL,
},
quit: make(chan struct{}),
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
store: store,
providers: providers,
quit: make(chan struct{}),
wg: wg,
keyMux: keyMuxes,
consumer: consumer,
}
return repo, nil
}
type basePoolManager struct {
ctx context.Context
controllerID string
entity params.GithubEntity
ghcli common.GithubClient
cfgInternal params.Internal
ctx context.Context
entity params.GithubEntity
ghcli common.GithubClient
controllerInfo params.ControllerInfo
instanceTokenGetter auth.InstanceTokenGetter
consumer dbCommon.Consumer
store dbCommon.Store
@ -118,13 +121,9 @@ type basePoolManager struct {
tools []commonParams.RunnerApplicationDownload
quit chan struct{}
credsDetails params.GithubCredentials
managerIsRunning bool
managerErrorReason string
urls urls
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
@ -353,9 +352,9 @@ func (r *basePoolManager) updateTools() error {
tools, err := r.FetchTools()
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update tools for repo")
r.ctx, "failed to update tools for entity", "entity", r.entity.String())
r.setPoolRunningState(false, err.Error())
return fmt.Errorf("failed to update tools for repo %s: %w", r.entity.String(), err)
return fmt.Errorf("failed to update tools for entity %s: %w", r.entity.String(), err)
}
r.mux.Lock()
r.tools = tools
@ -381,7 +380,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
runnerNames := map[string]bool{}
for _, run := range runners {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
@ -457,7 +456,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
runnersByName := map[string]*github.Runner{}
for _, run := range runners {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
if !isManagedRunner(labelsFromRunner(run), r.controllerInfo.ControllerID.String()) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
@ -515,7 +514,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !isManagedRunner(labelsFromRunner(runner), r.controllerID) {
if !isManagedRunner(labelsFromRunner(runner), r.controllerInfo.ControllerID.String()) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", runner.GetName())
@ -741,8 +740,8 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona
RunnerStatus: params.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.urls.callbackURL,
MetadataURL: r.urls.metadataURL,
CallbackURL: r.controllerInfo.CallbackURL,
MetadataURL: r.controllerInfo.MetadataURL,
CreateAttempt: 1,
GitHubRunnerGroup: pool.GitHubRunnerGroup,
AditionalLabels: aditionalLabels,
@ -832,7 +831,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
jwtValidity := pool.RunnerTimeout()
entity := r.entity.String()
jwtToken, err := auth.NewInstanceJWTToken(instance, r.cfgInternal.JWTSecret, entity, pool.PoolType(), jwtValidity)
jwtToken, err := r.instanceTokenGetter.NewInstanceJWTToken(instance, entity, pool.PoolType(), jwtValidity)
if err != nil {
return errors.Wrap(err, "fetching instance jwt token")
}
@ -852,7 +851,7 @@ func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error
Image: pool.Image,
ExtraSpecs: pool.ExtraSpecs,
PoolID: instance.PoolID,
CACertBundle: r.credsDetails.CABundle,
CACertBundle: r.entity.Credentials.CABundle,
GitHubRunnerGroup: instance.GitHubRunnerGroup,
JitConfigEnabled: hasJITConfig,
}
@ -954,7 +953,7 @@ func (r *basePoolManager) poolLabel(poolID string) string {
}
func (r *basePoolManager) controllerLabel() string {
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerID)
return fmt.Sprintf("%s%s", controllerLabelPrefix, r.controllerInfo.ControllerID.String())
}
func (r *basePoolManager) updateArgsFromProviderInstance(providerInstance commonParams.ProviderInstance) params.UpdateInstanceParams {
@ -1525,6 +1524,7 @@ func (r *basePoolManager) Start() error {
initialToolUpdate <- struct{}{}
}()
go r.runWatcher()
go func() {
select {
case <-r.quit:
@ -1552,37 +1552,6 @@ func (r *basePoolManager) Stop() error {
return nil
}
func (r *basePoolManager) RefreshState(param params.UpdatePoolStateParams) error {
r.mux.Lock()
if param.WebhookSecret != "" {
r.entity.WebhookSecret = param.WebhookSecret
}
if param.InternalConfig != nil {
r.cfgInternal = *param.InternalConfig
r.urls = urls{
webhookURL: r.cfgInternal.BaseWebhookURL,
callbackURL: r.cfgInternal.InstanceCallbackURL,
metadataURL: r.cfgInternal.InstanceMetadataURL,
controllerWebhookURL: r.cfgInternal.ControllerWebhookURL,
}
}
ghc, err := garmUtil.GithubClient(r.ctx, r.entity, r.cfgInternal.GithubCredentialsDetails)
if err != nil {
return errors.Wrap(err, "getting github client")
}
r.ghcli = ghc
r.mux.Unlock()
// Update the tools as soon as state is updated. This should revive a stopped pool manager
// or stop one if the supplied credentials are not okay.
if err := r.updateTools(); err != nil {
return fmt.Errorf("failed to update tools: %w", err)
}
return nil
}
func (r *basePoolManager) WebhookSecret() string {
return r.entity.WebhookSecret
}
@ -1688,7 +1657,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
}
poolsCache := poolsForTags{
poolCacheType: r.PoolBalancerType(),
poolCacheType: r.entity.GetPoolBalancerType(),
}
slog.DebugContext(
@ -1812,7 +1781,7 @@ func (r *basePoolManager) consumeQueuedJobs() error {
}
func (r *basePoolManager) UninstallWebhook(ctx context.Context) error {
if r.urls.controllerWebhookURL == "" {
if r.controllerInfo.ControllerWebhookURL == "" {
return errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
}
@ -1823,8 +1792,8 @@ func (r *basePoolManager) UninstallWebhook(ctx context.Context) error {
var controllerHookID int64
var baseHook string
trimmedBase := strings.TrimRight(r.urls.webhookURL, "/")
trimmedController := strings.TrimRight(r.urls.controllerWebhookURL, "/")
trimmedBase := strings.TrimRight(r.controllerInfo.WebhookURL, "/")
trimmedController := strings.TrimRight(r.controllerInfo.ControllerWebhookURL, "/")
for _, hook := range allHooks {
hookInfo := hookToParamsHookInfo(hook)
@ -1859,7 +1828,7 @@ func (r *basePoolManager) InstallHook(ctx context.Context, req *github.Hook) (pa
return params.HookInfo{}, errors.Wrap(err, "listing hooks")
}
if err := validateHookRequest(r.cfgInternal.ControllerID, r.cfgInternal.BaseWebhookURL, allHooks, req); err != nil {
if err := validateHookRequest(r.controllerInfo.ControllerID.String(), r.controllerInfo.WebhookURL, allHooks, req); err != nil {
return params.HookInfo{}, errors.Wrap(err, "validating hook request")
}
@ -1879,7 +1848,7 @@ func (r *basePoolManager) InstallHook(ctx context.Context, req *github.Hook) (pa
}
func (r *basePoolManager) InstallWebhook(ctx context.Context, param params.InstallWebhookParams) (params.HookInfo, error) {
if r.urls.controllerWebhookURL == "" {
if r.controllerInfo.ControllerWebhookURL == "" {
return params.HookInfo{}, errors.Wrap(runnerErrors.ErrBadRequest, "controller webhook url is empty")
}
@ -1890,7 +1859,7 @@ func (r *basePoolManager) InstallWebhook(ctx context.Context, param params.Insta
req := &github.Hook{
Active: github.Bool(true),
Config: map[string]interface{}{
"url": r.urls.controllerWebhookURL,
"url": r.controllerInfo.ControllerWebhookURL,
"content_type": "json",
"insecure_ssl": insecureSSL,
"secret": r.WebhookSecret(),
@ -1978,21 +1947,14 @@ func (r *basePoolManager) GetGithubRunners() ([]*github.Runner, error) {
return allRunners, nil
}
func (r *basePoolManager) PoolBalancerType() params.PoolBalancerType {
if r.cfgInternal.PoolBalancerType == "" {
return params.PoolBalancerTypeRoundRobin
}
return r.cfgInternal.PoolBalancerType
}
func (r *basePoolManager) GithubURL() string {
switch r.entity.EntityType {
case params.GithubEntityTypeRepository:
return fmt.Sprintf("%s/%s/%s", r.cfgInternal.GithubCredentialsDetails.BaseURL, r.entity.Owner, r.entity.Name)
return fmt.Sprintf("%s/%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner, r.entity.Name)
case params.GithubEntityTypeOrganization:
return fmt.Sprintf("%s/%s", r.cfgInternal.GithubCredentialsDetails.BaseURL, r.entity.Owner)
return fmt.Sprintf("%s/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
case params.GithubEntityTypeEnterprise:
return fmt.Sprintf("%s/enterprises/%s", r.cfgInternal.GithubCredentialsDetails.BaseURL, r.entity.Owner)
return fmt.Sprintf("%s/enterprises/%s", r.entity.Credentials.BaseURL, r.entity.Owner)
}
return ""
}
@ -2002,8 +1964,8 @@ func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo,
if err != nil {
return params.HookInfo{}, errors.Wrap(err, "listing hooks")
}
trimmedBase := strings.TrimRight(r.urls.webhookURL, "/")
trimmedController := strings.TrimRight(r.urls.controllerWebhookURL, "/")
trimmedBase := strings.TrimRight(r.controllerInfo.WebhookURL, "/")
trimmedController := strings.TrimRight(r.controllerInfo.ControllerWebhookURL, "/")
var controllerHookInfo *params.HookInfo
var baseHookInfo *params.HookInfo
@ -2034,5 +1996,5 @@ func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo,
}
func (r *basePoolManager) RootCABundle() (params.CertificateBundle, error) {
return r.credsDetails.RootCertificateBundle()
return r.entity.Credentials.RootCertificateBundle()
}