Move some code around

Move the metrics code into its own package.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2023-01-27 12:35:51 +02:00
parent a3ddb5e63a
commit 8f56f51598
6 changed files with 179 additions and 67 deletions

View file

@ -20,10 +20,12 @@ import (
"log"
"net/http"
"strings"
"sync"
"garm/apiserver/params"
"garm/auth"
gErrors "garm/errors"
"garm/metrics"
runnerParams "garm/params"
"garm/runner"
"garm/util"
@ -31,28 +33,32 @@ import (
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
func NewAPIController(r *runner.Runner, auth *auth.Authenticator, hub *wsWriter.Hub, controllerInfo runnerParams.ControllerInfo) (*APIController, error) {
func NewAPIController(r *runner.Runner, authenticator *auth.Authenticator, hub *wsWriter.Hub) (*APIController, error) {
controllerInfo, err := r.GetControllerInfo(auth.GetAdminContext())
if err != nil {
return nil, errors.Wrap(err, "getting controller info")
}
return &APIController{
r: r,
auth: auth,
auth: authenticator,
hub: hub,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 16384,
},
controllerInfo: controllerInfo,
cachedControllerInfo: controllerInfo,
}, nil
}
type APIController struct {
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
controllerInfo runnerParams.ControllerInfo
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
cachedControllerInfo runnerParams.ControllerInfo
mux sync.Mutex
}
func handleError(w http.ResponseWriter, err error) {
@ -89,18 +95,30 @@ func handleError(w http.ResponseWriter, err error) {
}
}
// metric to count total webhooks received
// at this point the webhook is not yet authenticated and
// we don't know if it's meant for us or not
var webhooksReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "garm_webhooks_received",
Help: "The total number of webhooks received",
}, []string{"valid", "reason", "hostname", "controller_id"})
func init() {
err := prometheus.Register(webhooksReceived)
// controllerInfo calls into runner.GetControllerInfo(), but instead of erroring out, will
// fall back on a cached version of that info. If successful, the cached version is updated.
func (a *APIController) controllerInfo() runnerParams.ControllerInfo {
// Attempt to fetch controller info. We do this on every call, in case the hostname
// changes while garm is running. The ControllerID will never change, once initialized.
info, err := a.r.GetControllerInfo(auth.GetAdminContext())
if err != nil {
log.Printf("error registering prometheus metric: %q", err)
// The call may fail, but we shouldn't lose metrics just because something went
// terribly wrong while fetching the hostname.
log.Printf("failed to get new controller info; falling back on cached version: %s", err)
return a.cachedControllerInfo
}
// Set new controller info and return it.
a.mux.Lock()
defer a.mux.Unlock()
a.cachedControllerInfo = info
return a.cachedControllerInfo
}
func (a *APIController) webhookMetricLabelValues(valid, reason string) []string {
controllerInfo := a.controllerInfo()
return []string{
valid, reason,
controllerInfo.Hostname, controllerInfo.ControllerID.String(),
}
}
@ -115,23 +133,31 @@ func (a *APIController) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Re
signature := r.Header.Get("X-Hub-Signature-256")
hookType := r.Header.Get("X-Github-Hook-Installation-Target-Type")
controllerInfo := a.r.GetControllerInfo(r.Context())
var labelValues []string
defer func() {
if len(labelValues) == 0 {
return
}
if err := metrics.RecordWebhookWithLabels(labelValues...); err != nil {
log.Printf("failed to record metric: %s", err)
}
}()
if err := a.r.DispatchWorkflowJob(hookType, signature, body); err != nil {
if errors.Is(err, gErrors.ErrNotFound) {
webhooksReceived.WithLabelValues("false", "owner_unknown", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "owner_unknown")
log.Printf("got not found error from DispatchWorkflowJob. webhook not meant for us?: %q", err)
return
} else if strings.Contains(err.Error(), "signature") { // TODO: check error type
webhooksReceived.WithLabelValues("false", "signature_invalid", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "signature_invalid")
} else {
webhooksReceived.WithLabelValues("false", "unknown", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "unknown")
}
handleError(w, err)
return
}
webhooksReceived.WithLabelValues("true", "", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("true", "")
}
func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {

View file

@ -23,24 +23,28 @@ import (
"garm/apiserver/controllers"
"garm/auth"
"garm/config"
"garm/util"
)
func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, cfg *config.Config, authMiddleware, initMiddleware, instanceMiddleware, metricsMiddlerware auth.Middleware) *mux.Router {
func WithMetricsRouter(parentRouter *mux.Router, disableAuth bool, metricsMiddlerware auth.Middleware) *mux.Router {
if parentRouter == nil {
return nil
}
metricsRouter := parentRouter.PathPrefix("/metrics").Subrouter()
if !disableAuth {
metricsRouter.Use(metricsMiddlerware.Middleware)
}
metricsRouter.Handle("/", promhttp.Handler()).Methods("GET", "OPTIONS")
metricsRouter.Handle("", promhttp.Handler()).Methods("GET", "OPTIONS")
return parentRouter
}
func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddleware, initMiddleware, instanceMiddleware auth.Middleware) *mux.Router {
router := mux.NewRouter()
logMiddleware := util.NewLoggingMiddleware(logWriter)
router.Use(logMiddleware)
if cfg.Metrics.Enable {
metricsRouter := router.PathPrefix("/metrics").Subrouter()
if !cfg.Metrics.DisableAuth {
metricsRouter.Use(metricsMiddlerware.Middleware)
}
metricsRouter.Handle("/", promhttp.Handler()).Methods("GET", "OPTIONS")
metricsRouter.Handle("", promhttp.Handler()).Methods("GET", "OPTIONS")
}
// Handles github webhooks
webhookRouter := router.PathPrefix("/webhooks").Subrouter()
webhookRouter.PathPrefix("/").Handler(http.HandlerFunc(han.CatchAll))

View file

@ -32,6 +32,7 @@ import (
"garm/config"
"garm/database"
"garm/database/common"
"garm/metrics"
"garm/runner"
"garm/util"
"garm/websocket"
@ -39,7 +40,6 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
var (
@ -111,19 +111,13 @@ func main() {
log.Fatalf("failed to create controller: %+v", err)
}
controllerInfo, err := db.ControllerInfo()
if err != nil {
log.Fatal(err)
}
// If there are many repos/pools, this may take a long time.
// TODO: start pool managers in the background and log errors.
if err := runner.Start(); err != nil {
log.Fatal(err)
}
authenticator := auth.NewAuthenticator(cfg.JWTAuth, db)
controller, err := controllers.NewAPIController(runner, authenticator, hub, controllerInfo)
controller, err := controllers.NewAPIController(runner, authenticator, hub)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
}
@ -147,12 +141,18 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = prometheus.Register(controllers.NewGarmCollector(runner))
if err != nil {
log.Println("failed to register garm collector in prometheus", err)
router := routers.NewAPIRouter(controller, multiWriter, jwtMiddleware, initMiddleware, instanceMiddleware)
if cfg.Metrics.Enable {
log.Printf("registering prometheus metrics collectors")
if err := metrics.RegisterCollectors(runner); err != nil {
log.Fatal(err)
}
log.Printf("setting up metric routes")
router = routers.WithMetricsRouter(router, cfg.Metrics.DisableAuth, metricsMiddleware)
}
router := routers.NewAPIRouter(controller, multiWriter, cfg, jwtMiddleware, initMiddleware, instanceMiddleware, metricsMiddleware)
corsMw := mux.CORSMethodMiddleware(router)
router.Use(corsMw)

View file

@ -1,21 +1,70 @@
package controllers
package metrics
import (
"log"
"sync"
"garm/auth"
"garm/params"
"garm/runner"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
type GarmCollector struct {
healthMetric *prometheus.Desc
instanceMetric *prometheus.Desc
runner *runner.Runner
var webhooksReceived *prometheus.CounterVec = nil
// RecordWebhookWithLabels will increment a webhook metric identified by specific
// values. If metrics are disabled, this function is a noop.
func RecordWebhookWithLabels(lvs ...string) error {
if webhooksReceived == nil {
// not registered. Noop
return nil
}
counter, err := webhooksReceived.GetMetricWithLabelValues(lvs...)
if err != nil {
return errors.Wrap(err, "recording metric")
}
counter.Inc()
return nil
}
func NewGarmCollector(r *runner.Runner) *GarmCollector {
func RegisterCollectors(runner *runner.Runner) error {
if webhooksReceived != nil {
// Already registered.
return nil
}
garmCollector, err := NewGarmCollector(runner)
if err != nil {
return errors.Wrap(err, "getting collector")
}
if err := prometheus.Register(garmCollector); err != nil {
return errors.Wrap(err, "registering collector")
}
// metric to count total webhooks received
// at this point the webhook is not yet authenticated and
// we don't know if it's meant for us or not
webhooksReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "garm_webhooks_received",
Help: "The total number of webhooks received",
}, []string{"valid", "reason", "hostname", "controller_id"})
err = prometheus.Register(webhooksReceived)
if err != nil {
return errors.Wrap(err, "registering webhooks recv counter")
}
return nil
}
func NewGarmCollector(r *runner.Runner) (*GarmCollector, error) {
controllerInfo, err := r.GetControllerInfo(auth.GetAdminContext())
if err != nil {
return nil, errors.Wrap(err, "fetching controller info")
}
return &GarmCollector{
runner: r,
instanceMetric: prometheus.NewDesc(
@ -28,7 +77,28 @@ func NewGarmCollector(r *runner.Runner) *GarmCollector {
"Health of the runner",
[]string{"hostname", "controller_id"}, nil,
),
cachedControllerInfo: controllerInfo,
}, nil
}
type GarmCollector struct {
healthMetric *prometheus.Desc
instanceMetric *prometheus.Desc
runner *runner.Runner
cachedControllerInfo params.ControllerInfo
mux sync.Mutex
}
func (c *GarmCollector) controllerInfo() params.ControllerInfo {
controllerInfo, err := c.runner.GetControllerInfo(auth.GetAdminContext())
if err != nil {
log.Printf("error getting controller info: %s", err)
return c.cachedControllerInfo
}
c.mux.Lock()
defer c.mux.Unlock()
c.cachedControllerInfo = controllerInfo
return c.cachedControllerInfo
}
func (c *GarmCollector) Describe(ch chan<- *prometheus.Desc) {
@ -37,8 +107,7 @@ func (c *GarmCollector) Describe(ch chan<- *prometheus.Desc) {
}
func (c *GarmCollector) Collect(ch chan<- prometheus.Metric) {
controllerInfo := c.runner.GetControllerInfo(auth.GetAdminContext())
controllerInfo := c.controllerInfo()
c.CollectInstanceMetric(ch, controllerInfo.Hostname, controllerInfo.ControllerID.String())
c.CollectHealthMetric(ch, controllerInfo.Hostname, controllerInfo.ControllerID.String())
}
@ -61,7 +130,6 @@ func (c *GarmCollector) CollectHealthMetric(ch chan<- prometheus.Metric, hostnam
// CollectInstanceMetric collects the metrics for the runner instances
// reflecting the statuses and the pool they belong to.
func (c *GarmCollector) CollectInstanceMetric(ch chan<- prometheus.Metric, hostname string, controllerID string) {
ctx := auth.GetAdminContext()
instances, err := c.runner.ListAllInstances(ctx)

View file

@ -42,6 +42,7 @@ import (
"garm/util"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
)
func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
@ -66,10 +67,6 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
creds[ghcreds.Name] = ghcreds
}
controllerInfo := params.ControllerInfo{
ControllerID: ctrlId.ControllerID,
}
poolManagerCtrl := &poolManagerCtrl{
controllerID: ctrlId.ControllerID.String(),
config: cfg,
@ -85,7 +82,7 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) {
poolManagerCtrl: poolManagerCtrl,
providers: providers,
credentials: creds,
controllerInfo: controllerInfo,
controllerID: ctrlId.ControllerID,
}
if err := runner.loadReposOrgsAndEnterprises(); err != nil {
@ -271,19 +268,25 @@ type Runner struct {
credentials map[string]config.Github
controllerInfo params.ControllerInfo
controllerID uuid.UUID
}
// GetControllerInfo returns the controller id and the hostname.
// This data might be used in metrics and logging.
func (r *Runner) GetControllerInfo(ctx context.Context) params.ControllerInfo {
func (r *Runner) GetControllerInfo(ctx context.Context) (params.ControllerInfo, error) {
if !auth.IsAdmin(ctx) {
return params.ControllerInfo{}, runnerErrors.ErrUnauthorized
}
// hostname could change
hostname, err := os.Hostname()
if err != nil {
log.Printf("error getting hostname: %v", err)
//not much choice but to continue
return params.ControllerInfo{}, errors.Wrap(err, "fetching hostname")
}
r.controllerInfo.Hostname = hostname
return r.controllerInfo
return params.ControllerInfo{
ControllerID: r.controllerID,
Hostname: hostname,
}, nil
}
func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {

13
testdata/config.toml vendored
View file

@ -21,6 +21,16 @@ config_dir = "/etc/garm"
# Enable streaming logs via web sockets. Use garm-cli debug-log.
enable_log_streamer = false
[metrics]
# Toggle metrics. If set to false, the API endpoint for metrics collection will
# be disabled.
enable = true
# Toggle to disable authentication (not recommended) on the metrics endpoint.
# If you do disable authentication, I encourage you to put a reverse proxy in front
# of garm and limit which systems can access that particular endpoint. Ideally, you
# would enable some kind of authentication using the reverse proxy.
disable_auth = false
[jwt_auth]
# A JWT token secret used to sign tokens.
# Obviously, this needs to be changed :).
@ -29,7 +39,8 @@ secret = ")9gk_4A6KrXz9D2u`0@MPea*sd6W`%@5MAWpWWJ3P3EqW~qB!!(Vd$FhNc*eU4vG"
# Time to live for tokens. Both the instances and you will use JWT tokens to
# authenticate against the API. However, this TTL is applied only to tokens you
# get when logging into the API. The tokens issued to the instances we manage,
# have a hardcoded TTL of 15 minutes. The minimum TTL for this token is 24h.
# have a TTL based on the runner bootstrap timeout set on each pool. The minimum
# TTL for this token is 24h.
time_to_live = "8760h"
[apiserver]