196 lines
6.5 KiB
Go
196 lines
6.5 KiB
Go
// ABOUTME: HTTP handlers for the metrics receiver service.
|
|
// ABOUTME: Provides endpoints for receiving and querying metrics.
|
|
package receiver
|
|
|
|
import (
	"crypto/subtle"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"
	"strings"
)
|
|
|
|
// Handler handles HTTP requests for the metrics receiver
|
|
type Handler struct {
|
|
store *Store
|
|
logger *slog.Logger
|
|
readToken string // Pre-shared token for read endpoint authentication
|
|
hmacKey string // Separate key for HMAC-based push token generation/validation
|
|
}
|
|
|
|
// NewHandler creates a new HTTP handler with the given store.
|
|
// readToken authenticates read endpoints and the token generation endpoint.
|
|
// hmacKey is the secret used to derive scoped push tokens.
|
|
func NewHandler(store *Store, logger *slog.Logger, readToken, hmacKey string) *Handler {
|
|
return &Handler{store: store, logger: logger, readToken: readToken, hmacKey: hmacKey}
|
|
}
|
|
|
|
// RegisterRoutes registers all HTTP routes on the given mux
|
|
func (h *Handler) RegisterRoutes(mux *http.ServeMux) {
|
|
mux.HandleFunc("POST /api/v1/metrics", h.handleReceiveMetrics)
|
|
mux.HandleFunc("POST /api/v1/token", h.handleGenerateToken)
|
|
mux.HandleFunc("GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}", h.handleGetByWorkflowJob)
|
|
mux.HandleFunc("GET /health", h.handleHealth)
|
|
}
|
|
|
|
// validateReadToken checks the Authorization header for a valid Bearer token.
|
|
func (h *Handler) validateReadToken(w http.ResponseWriter, r *http.Request) bool {
|
|
if h.readToken == "" {
|
|
h.logger.Warn("no read-token configured, rejecting request", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
authHeader := r.Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
h.logger.Warn("missing authorization header", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
const bearerPrefix = "Bearer "
|
|
if !strings.HasPrefix(authHeader, bearerPrefix) {
|
|
h.logger.Warn("invalid authorization format", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid authorization format", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
token := strings.TrimPrefix(authHeader, bearerPrefix)
|
|
if subtle.ConstantTimeCompare([]byte(token), []byte(h.readToken)) != 1 {
|
|
h.logger.Warn("invalid token", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid token", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (h *Handler) handleGenerateToken(w http.ResponseWriter, r *http.Request) {
|
|
if h.hmacKey == "" {
|
|
http.Error(w, "token generation requires a configured HMAC key", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if !h.validateReadToken(w, r) {
|
|
return
|
|
}
|
|
|
|
var req TokenRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, "invalid JSON body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if req.Organization == "" || req.Repository == "" || req.Workflow == "" || req.Job == "" {
|
|
http.Error(w, "organization, repository, workflow, and job are required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
token := GenerateScopedToken(h.hmacKey, req.Organization, req.Repository, req.Workflow, req.Job)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(TokenResponse{Token: token})
|
|
}
|
|
|
|
// validatePushToken checks push authentication via scoped HMAC token.
|
|
func (h *Handler) validatePushToken(w http.ResponseWriter, r *http.Request, exec ExecutionContext) bool {
|
|
if h.hmacKey == "" {
|
|
h.logger.Warn("no HMAC key configured, rejecting push", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
authHeader := r.Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
h.logger.Warn("missing push authorization", slog.String("path", r.URL.Path))
|
|
http.Error(w, "authorization required", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
const bearerPrefix = "Bearer "
|
|
if !strings.HasPrefix(authHeader, bearerPrefix) {
|
|
h.logger.Warn("invalid push authorization format", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid authorization format", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
token := strings.TrimPrefix(authHeader, bearerPrefix)
|
|
if !ValidateScopedToken(h.hmacKey, token, exec.Organization, exec.Repository, exec.Workflow, exec.Job) {
|
|
h.logger.Warn("invalid push token", slog.String("path", r.URL.Path))
|
|
http.Error(w, "invalid token", http.StatusUnauthorized)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) {
|
|
var payload MetricsPayload
|
|
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
|
h.logger.Error("failed to decode payload", slog.String("error", err.Error()))
|
|
http.Error(w, "invalid JSON payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if payload.Execution.RunID == "" {
|
|
http.Error(w, "run_id is required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if !h.validatePushToken(w, r, payload.Execution) {
|
|
return
|
|
}
|
|
|
|
id, err := h.store.SaveMetric(&payload)
|
|
if err != nil {
|
|
h.logger.Error("failed to save metric", slog.String("error", err.Error()))
|
|
http.Error(w, "failed to save metric", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
h.logger.Info("metric saved",
|
|
slog.Uint64("id", uint64(id)),
|
|
slog.String("run_id", payload.Execution.RunID),
|
|
slog.String("repository", payload.Execution.Repository),
|
|
)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusCreated)
|
|
_ = json.NewEncoder(w).Encode(map[string]any{"id": id, "status": "created"})
|
|
}
|
|
|
|
func (h *Handler) handleGetByWorkflowJob(w http.ResponseWriter, r *http.Request) {
|
|
if !h.validateReadToken(w, r) {
|
|
return
|
|
}
|
|
|
|
org := r.PathValue("org")
|
|
repo := r.PathValue("repo")
|
|
workflow := r.PathValue("workflow")
|
|
job := r.PathValue("job")
|
|
if org == "" || repo == "" || workflow == "" || job == "" {
|
|
http.Error(w, "org, repo, workflow and job are required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
metrics, err := h.store.GetMetricsByWorkflowJob(org, repo, workflow, job)
|
|
if err != nil {
|
|
h.logger.Error("failed to get metrics", slog.String("error", err.Error()))
|
|
http.Error(w, "failed to get metrics", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Convert to response type with Payload as JSON object
|
|
response := make([]MetricResponse, len(metrics))
|
|
for i, m := range metrics {
|
|
response[i] = m.ToResponse()
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
|
|
}
|