All checks were successful
ci / ci (push) Successful in 2m2s
Replace net/http handlers with Fuego framework for automatic OpenAPI 3.0 spec generation. Add generated Go client package, OpenAPI extraction script, and update Makefile with separate build/run targets for both binaries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
303 lines
11 KiB
Go
303 lines
11 KiB
Go
// ABOUTME: HTTP handlers for the metrics receiver service using Fuego framework.
|
|
// ABOUTME: Provides endpoints for receiving and querying metrics with automatic OpenAPI generation.
|
|
package receiver
|
|
|
|
import (
|
|
"crypto/subtle"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-fuego/fuego"
|
|
)
|
|
|
|
// Handler handles HTTP requests for the metrics receiver
|
|
type Handler struct {
|
|
store *Store
|
|
logger *slog.Logger
|
|
readToken string // Pre-shared token for read endpoint authentication
|
|
hmacKey string // Separate key for HMAC-based push token generation/validation
|
|
tokenTTL time.Duration
|
|
}
|
|
|
|
// NewHandler creates a new HTTP handler with the given store.
|
|
// readToken authenticates read endpoints and the token generation endpoint.
|
|
// hmacKey is the secret used to derive scoped push tokens.
|
|
// tokenTTL specifies how long push tokens are valid (0 uses DefaultTokenTTL).
|
|
func NewHandler(store *Store, logger *slog.Logger, readToken, hmacKey string, tokenTTL time.Duration) *Handler {
|
|
if tokenTTL == 0 {
|
|
tokenTTL = DefaultTokenTTL
|
|
}
|
|
return &Handler{store: store, logger: logger, readToken: readToken, hmacKey: hmacKey, tokenTTL: tokenTTL}
|
|
}
|
|
|
|
// Common errors
|
|
var (
|
|
ErrUnauthorized = errors.New("authorization required")
|
|
ErrInvalidToken = errors.New("invalid token")
|
|
ErrInvalidFormat = errors.New("invalid authorization format")
|
|
ErrMissingHMACKey = errors.New("token generation requires a configured HMAC key")
|
|
ErrMissingFields = errors.New("organization, repository, workflow, and job are required")
|
|
ErrMissingRunID = errors.New("run_id is required")
|
|
ErrInvalidParams = errors.New("org, repo, workflow and job are required")
|
|
ErrNoMetrics = errors.New("no metrics found for this workflow/job")
|
|
ErrInvalidPercent = errors.New("invalid cpu_percentile: must be one of peak, p99, p95, p75, p50, avg")
|
|
)
|
|
|
|
// HealthResponse is the response for the health endpoint
|
|
type HealthResponse struct {
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
// MetricCreatedResponse is the response when a metric is successfully created
|
|
type MetricCreatedResponse struct {
|
|
ID uint `json:"id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
// GetMetricsRequest contains path parameters for getting metrics
|
|
type GetMetricsRequest struct {
|
|
Org string `path:"org"`
|
|
Repo string `path:"repo"`
|
|
Workflow string `path:"workflow"`
|
|
Job string `path:"job"`
|
|
}
|
|
|
|
// GetSizingRequest contains path and query parameters for sizing endpoint
|
|
type GetSizingRequest struct {
|
|
Org string `path:"org"`
|
|
Repo string `path:"repo"`
|
|
Workflow string `path:"workflow"`
|
|
Job string `path:"job"`
|
|
Runs int `query:"runs" default:"5" validate:"min=1,max=100" description:"Number of recent runs to analyze"`
|
|
Buffer int `query:"buffer" default:"20" validate:"min=0,max=100" description:"Buffer percentage to add"`
|
|
CPUPercentile string `query:"cpu_percentile" default:"p95" description:"CPU percentile to use (peak, p99, p95, p75, p50, avg)"`
|
|
}
|
|
|
|
// RegisterRoutes registers all HTTP routes on the Fuego server
|
|
func (h *Handler) RegisterRoutes(s *fuego.Server) {
|
|
// Health endpoint (no auth)
|
|
fuego.Get(s, "/health", h.Health)
|
|
|
|
// API group with authentication
|
|
api := fuego.Group(s, "/api/v1")
|
|
|
|
// Token generation (requires read token)
|
|
fuego.Post(api, "/token", h.GenerateToken, fuego.OptionMiddleware(h.requireReadToken))
|
|
|
|
// Metrics endpoints
|
|
fuego.Post(api, "/metrics", h.ReceiveMetrics) // Uses push token validated in handler
|
|
fuego.Get(api, "/metrics/repo/{org}/{repo}/{workflow}/{job}", h.GetMetricsByWorkflowJob, fuego.OptionMiddleware(h.requireReadToken))
|
|
|
|
// Sizing endpoint
|
|
fuego.Get(api, "/sizing/repo/{org}/{repo}/{workflow}/{job}", h.GetSizing, fuego.OptionMiddleware(h.requireReadToken))
|
|
}
|
|
|
|
// requireReadToken is middleware that validates the read token
|
|
func (h *Handler) requireReadToken(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if h.readToken == "" {
|
|
h.logger.Warn("no read-token configured, rejecting request", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
authHeader := r.Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
h.logger.Warn("missing authorization header", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
const bearerPrefix = "Bearer "
|
|
if !strings.HasPrefix(authHeader, bearerPrefix) {
|
|
h.logger.Warn("invalid authorization format", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid authorization format", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
token := strings.TrimPrefix(authHeader, bearerPrefix)
|
|
if subtle.ConstantTimeCompare([]byte(token), []byte(h.readToken)) != 1 {
|
|
h.logger.Warn("invalid token", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid token", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
next.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
|
|
// validatePushToken checks push authentication via scoped HMAC token
|
|
func (h *Handler) validatePushToken(r *http.Request, exec ExecutionContext) error {
|
|
if h.hmacKey == "" {
|
|
h.logger.Warn("no HMAC key configured, rejecting push", slog.String("path", r.URL.Path))
|
|
return ErrUnauthorized
|
|
}
|
|
|
|
authHeader := r.Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
h.logger.Warn("missing push authorization", slog.String("path", r.URL.Path))
|
|
return ErrUnauthorized
|
|
}
|
|
|
|
const bearerPrefix = "Bearer "
|
|
if !strings.HasPrefix(authHeader, bearerPrefix) {
|
|
h.logger.Warn("invalid push authorization format", slog.String("path", r.URL.Path))
|
|
return ErrInvalidFormat
|
|
}
|
|
|
|
token := strings.TrimPrefix(authHeader, bearerPrefix)
|
|
if !ValidateToken(h.hmacKey, token, exec.Organization, exec.Repository, exec.Workflow, exec.Job, h.tokenTTL) {
|
|
h.logger.Warn("invalid push token", slog.String("path", r.URL.Path))
|
|
return ErrInvalidToken
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Health returns the service health status
|
|
func (h *Handler) Health(c fuego.ContextNoBody) (HealthResponse, error) {
|
|
return HealthResponse{Status: "ok"}, nil
|
|
}
|
|
|
|
// GenerateToken generates a scoped HMAC push token for a workflow/job
|
|
func (h *Handler) GenerateToken(c fuego.ContextWithBody[TokenRequest]) (TokenResponse, error) {
|
|
if h.hmacKey == "" {
|
|
return TokenResponse{}, fuego.BadRequestError{Detail: ErrMissingHMACKey.Error()}
|
|
}
|
|
|
|
req, err := c.Body()
|
|
if err != nil {
|
|
return TokenResponse{}, fuego.BadRequestError{Detail: "invalid JSON body"}
|
|
}
|
|
|
|
if req.Organization == "" || req.Repository == "" || req.Workflow == "" || req.Job == "" {
|
|
return TokenResponse{}, fuego.BadRequestError{Detail: ErrMissingFields.Error()}
|
|
}
|
|
|
|
token := GenerateToken(h.hmacKey, req.Organization, req.Repository, req.Workflow, req.Job)
|
|
return TokenResponse{Token: token}, nil
|
|
}
|
|
|
|
// ReceiveMetrics receives and stores metrics from a collector
|
|
func (h *Handler) ReceiveMetrics(c fuego.ContextNoBody) (MetricCreatedResponse, error) {
|
|
var payload MetricsPayload
|
|
if err := json.NewDecoder(c.Request().Body).Decode(&payload); err != nil {
|
|
h.logger.Error("failed to decode payload", slog.String("error", err.Error()))
|
|
return MetricCreatedResponse{}, fuego.BadRequestError{Detail: "invalid JSON payload"}
|
|
}
|
|
|
|
if payload.Execution.RunID == "" {
|
|
return MetricCreatedResponse{}, fuego.BadRequestError{Detail: ErrMissingRunID.Error()}
|
|
}
|
|
|
|
// Validate push token
|
|
if err := h.validatePushToken(c.Request(), payload.Execution); err != nil {
|
|
return MetricCreatedResponse{}, fuego.UnauthorizedError{Detail: err.Error()}
|
|
}
|
|
|
|
id, err := h.store.SaveMetric(&payload)
|
|
if err != nil {
|
|
h.logger.Error("failed to save metric", slog.String("error", err.Error()))
|
|
return MetricCreatedResponse{}, fuego.InternalServerError{Detail: "failed to save metric"}
|
|
}
|
|
|
|
h.logger.Info("metric saved",
|
|
slog.Uint64("id", uint64(id)),
|
|
slog.String("run_id", payload.Execution.RunID),
|
|
slog.String("repository", payload.Execution.Repository),
|
|
)
|
|
|
|
c.SetStatus(http.StatusCreated)
|
|
return MetricCreatedResponse{ID: id, Status: "created"}, nil
|
|
}
|
|
|
|
// GetMetricsByWorkflowJob retrieves all metrics for a specific workflow/job
|
|
func (h *Handler) GetMetricsByWorkflowJob(c fuego.ContextNoBody) ([]MetricResponse, error) {
|
|
org := c.PathParam("org")
|
|
repo := c.PathParam("repo")
|
|
workflow := c.PathParam("workflow")
|
|
job := c.PathParam("job")
|
|
|
|
if org == "" || repo == "" || workflow == "" || job == "" {
|
|
return nil, fuego.BadRequestError{Detail: ErrInvalidParams.Error()}
|
|
}
|
|
|
|
metrics, err := h.store.GetMetricsByWorkflowJob(org, repo, workflow, job)
|
|
if err != nil {
|
|
h.logger.Error("failed to get metrics", slog.String("error", err.Error()))
|
|
return nil, fuego.InternalServerError{Detail: "failed to get metrics"}
|
|
}
|
|
|
|
// Convert to response type with Payload as JSON object
|
|
response := make([]MetricResponse, len(metrics))
|
|
for i, m := range metrics {
|
|
response[i] = m.ToResponse()
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// GetSizing computes Kubernetes resource sizing recommendations
|
|
func (h *Handler) GetSizing(c fuego.ContextNoBody) (SizingResponse, error) {
|
|
org := c.PathParam("org")
|
|
repo := c.PathParam("repo")
|
|
workflow := c.PathParam("workflow")
|
|
job := c.PathParam("job")
|
|
|
|
if org == "" || repo == "" || workflow == "" || job == "" {
|
|
return SizingResponse{}, fuego.BadRequestError{Detail: ErrInvalidParams.Error()}
|
|
}
|
|
|
|
// Parse query parameters with defaults
|
|
runs := parseIntQueryParamFromContext(c, "runs", 5, 1, 100)
|
|
buffer := parseIntQueryParamFromContext(c, "buffer", 20, 0, 100)
|
|
cpuPercentile := c.QueryParam("cpu_percentile")
|
|
if cpuPercentile == "" {
|
|
cpuPercentile = "p95"
|
|
}
|
|
if !IsValidPercentile(cpuPercentile) {
|
|
return SizingResponse{}, fuego.BadRequestError{Detail: ErrInvalidPercent.Error()}
|
|
}
|
|
|
|
metrics, err := h.store.GetRecentMetricsByWorkflowJob(org, repo, workflow, job, runs)
|
|
if err != nil {
|
|
h.logger.Error("failed to get metrics", slog.String("error", err.Error()))
|
|
return SizingResponse{}, fuego.InternalServerError{Detail: "failed to get metrics"}
|
|
}
|
|
|
|
if len(metrics) == 0 {
|
|
return SizingResponse{}, fuego.NotFoundError{Detail: ErrNoMetrics.Error()}
|
|
}
|
|
|
|
response, err := computeSizing(metrics, buffer, cpuPercentile)
|
|
if err != nil {
|
|
h.logger.Error("failed to compute sizing", slog.String("error", err.Error()))
|
|
return SizingResponse{}, fuego.InternalServerError{Detail: "failed to compute sizing"}
|
|
}
|
|
|
|
return *response, nil
|
|
}
|
|
|
|
// parseIntQueryParamFromContext parses an integer query parameter with default, min, and max values
|
|
func parseIntQueryParamFromContext(c fuego.ContextNoBody, name string, defaultVal, minVal, maxVal int) int {
|
|
strVal := c.QueryParam(name)
|
|
if strVal == "" {
|
|
return defaultVal
|
|
}
|
|
var val int
|
|
if _, err := fmt.Sscanf(strVal, "%d", &val); err != nil {
|
|
return defaultVal
|
|
}
|
|
if val < minVal {
|
|
return minVal
|
|
}
|
|
if val > maxVal {
|
|
return maxVal
|
|
}
|
|
return val
|
|
}
|