feat(receiver): add HTTP metrics receiver with SQLite storage
All checks were successful
ci / build (push) Successful in 2m33s

Add a new receiver application under cmd/receiver that accepts metrics
via HTTP POST and stores them in SQLite using GORM. The receiver expects
GitHub Actions style execution context (org, repo, workflow, job, run_id).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Manuel Ganter 2026-02-06 11:40:03 +01:00
parent c5c872a373
commit c309bd810d
No known key found for this signature in database
8 changed files with 786 additions and 0 deletions

View file

@ -0,0 +1,101 @@
// ABOUTME: HTTP handlers for the metrics receiver service.
// ABOUTME: Provides endpoints for receiving and querying metrics.
package receiver
import (
"encoding/json"
"log/slog"
"net/http"
)
// Handler handles HTTP requests for the metrics receiver
type Handler struct {
store *Store
logger *slog.Logger
}
// NewHandler creates a new HTTP handler with the given store
func NewHandler(store *Store, logger *slog.Logger) *Handler {
return &Handler{store: store, logger: logger}
}
// RegisterRoutes registers all HTTP routes on the given mux
func (h *Handler) RegisterRoutes(mux *http.ServeMux) {
mux.HandleFunc("POST /api/v1/metrics", h.handleReceiveMetrics)
mux.HandleFunc("GET /api/v1/metrics/run/{runID}", h.handleGetByRunID)
mux.HandleFunc("GET /api/v1/metrics/repo/{org}/{repo}", h.handleGetByRepository)
mux.HandleFunc("GET /health", h.handleHealth)
}
func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) {
var payload MetricsPayload
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
h.logger.Error("failed to decode payload", slog.String("error", err.Error()))
http.Error(w, "invalid JSON payload", http.StatusBadRequest)
return
}
if payload.Execution.RunID == "" {
http.Error(w, "run_id is required", http.StatusBadRequest)
return
}
id, err := h.store.SaveMetric(&payload)
if err != nil {
h.logger.Error("failed to save metric", slog.String("error", err.Error()))
http.Error(w, "failed to save metric", http.StatusInternalServerError)
return
}
h.logger.Info("metric saved",
slog.Uint64("id", uint64(id)),
slog.String("run_id", payload.Execution.RunID),
slog.String("repository", payload.Execution.Repository),
)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
_ = json.NewEncoder(w).Encode(map[string]any{"id": id, "status": "created"})
}
func (h *Handler) handleGetByRunID(w http.ResponseWriter, r *http.Request) {
runID := r.PathValue("runID")
if runID == "" {
http.Error(w, "run_id is required", http.StatusBadRequest)
return
}
metrics, err := h.store.GetMetricsByRunID(runID)
if err != nil {
h.logger.Error("failed to get metrics", slog.String("error", err.Error()))
http.Error(w, "failed to get metrics", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(metrics)
}
func (h *Handler) handleGetByRepository(w http.ResponseWriter, r *http.Request) {
org := r.PathValue("org")
repo := r.PathValue("repo")
if org == "" || repo == "" {
http.Error(w, "org and repo are required", http.StatusBadRequest)
return
}
metrics, err := h.store.GetMetricsByRepository(org, repo)
if err != nil {
h.logger.Error("failed to get metrics", slog.String("error", err.Error()))
http.Error(w, "failed to get metrics", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(metrics)
}
func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}

View file

@ -0,0 +1,239 @@
package receiver
import (
"bytes"
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
)
func TestHandler_ReceiveMetrics(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
payload := MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
Workflow: "ci.yml",
Job: "build",
RunID: "run-123",
},
Summary: summary.RunSummary{
DurationSeconds: 60.0,
SampleCount: 12,
},
}
body, _ := json.Marshal(payload)
req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusCreated {
t.Errorf("status = %d, want %d", rec.Code, http.StatusCreated)
}
var resp map[string]any
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if resp["status"] != "created" {
t.Errorf("response status = %v, want %q", resp["status"], "created")
}
if resp["id"] == nil || resp["id"].(float64) == 0 {
t.Error("response id is missing or zero")
}
}
func TestHandler_ReceiveMetrics_InvalidJSON(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader([]byte("not json")))
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusBadRequest {
t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest)
}
}
func TestHandler_ReceiveMetrics_MissingRunID(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
payload := MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
// RunID is missing
},
Summary: summary.RunSummary{},
}
body, _ := json.Marshal(payload)
req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body))
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusBadRequest {
t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest)
}
}
func TestHandler_GetByRunID(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
// First, save a metric
payload := &MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
Workflow: "ci.yml",
Job: "build",
RunID: "run-get-test",
},
Summary: summary.RunSummary{SampleCount: 5},
}
if _, err := h.store.SaveMetric(payload); err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/run/run-get-test", nil)
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("status = %d, want %d", rec.Code, http.StatusOK)
}
var metrics []Metric
if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if len(metrics) != 1 {
t.Errorf("got %d metrics, want 1", len(metrics))
}
if metrics[0].RunID != "run-get-test" {
t.Errorf("RunID = %q, want %q", metrics[0].RunID, "run-get-test")
}
}
func TestHandler_GetByRunID_NotFound(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/run/nonexistent", nil)
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("status = %d, want %d", rec.Code, http.StatusOK)
}
var metrics []Metric
if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if len(metrics) != 0 {
t.Errorf("got %d metrics, want 0", len(metrics))
}
}
func TestHandler_GetByRepository(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
// Save metrics for different repos
payloads := []*MetricsPayload{
{Execution: ExecutionContext{Organization: "org-x", Repository: "repo-y", RunID: "r1"}},
{Execution: ExecutionContext{Organization: "org-x", Repository: "repo-y", RunID: "r2"}},
{Execution: ExecutionContext{Organization: "org-x", Repository: "repo-z", RunID: "r3"}},
}
for _, p := range payloads {
if _, err := h.store.SaveMetric(p); err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
}
req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org-x/repo-y", nil)
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("status = %d, want %d", rec.Code, http.StatusOK)
}
var metrics []Metric
if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if len(metrics) != 2 {
t.Errorf("got %d metrics, want 2", len(metrics))
}
}
func TestHandler_Health(t *testing.T) {
h, cleanup := newTestHandler(t)
defer cleanup()
req := httptest.NewRequest(http.MethodGet, "/health", nil)
rec := httptest.NewRecorder()
mux := http.NewServeMux()
h.RegisterRoutes(mux)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("status = %d, want %d", rec.Code, http.StatusOK)
}
var resp map[string]string
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if resp["status"] != "ok" {
t.Errorf("status = %q, want %q", resp["status"], "ok")
}
}
func newTestHandler(t *testing.T) (*Handler, func()) {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := NewStore(dbPath)
if err != nil {
t.Fatalf("NewStore() error = %v", err)
}
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
handler := NewHandler(store, logger)
return handler, func() { _ = store.Close() }
}

View file

@ -0,0 +1,94 @@
// ABOUTME: SQLite storage layer for metrics receiver using GORM.
// ABOUTME: Handles database initialization and metric storage/retrieval.
package receiver
import (
"encoding/json"
"fmt"
"time"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// Metric represents a stored metric record in the database
type Metric struct {
ID uint `gorm:"primaryKey"`
Organization string `gorm:"index:idx_org_repo;not null"`
Repository string `gorm:"index:idx_org_repo;not null"`
Workflow string `gorm:"not null"`
Job string `gorm:"not null"`
RunID string `gorm:"index;not null"`
ReceivedAt time.Time `gorm:"index;not null"`
Payload string `gorm:"type:text;not null"` // JSON-encoded RunSummary
}
// Store handles SQLite storage for metrics using GORM
type Store struct {
db *gorm.DB
}
// NewStore creates a new SQLite store at the given path
func NewStore(dbPath string) (*Store, error) {
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, fmt.Errorf("opening database: %w", err)
}
if err := db.AutoMigrate(&Metric{}); err != nil {
return nil, fmt.Errorf("migrating schema: %w", err)
}
return &Store{db: db}, nil
}
// SaveMetric stores a metrics payload in the database
func (s *Store) SaveMetric(payload *MetricsPayload) (uint, error) {
summaryJSON, err := json.Marshal(payload.Summary)
if err != nil {
return 0, fmt.Errorf("marshaling summary: %w", err)
}
metric := Metric{
Organization: payload.Execution.Organization,
Repository: payload.Execution.Repository,
Workflow: payload.Execution.Workflow,
Job: payload.Execution.Job,
RunID: payload.Execution.RunID,
ReceivedAt: time.Now().UTC(),
Payload: string(summaryJSON),
}
result := s.db.Create(&metric)
if result.Error != nil {
return 0, fmt.Errorf("inserting metric: %w", result.Error)
}
return metric.ID, nil
}
// GetMetricsByRunID retrieves all metrics for a specific run
func (s *Store) GetMetricsByRunID(runID string) ([]Metric, error) {
var metrics []Metric
result := s.db.Where("run_id = ?", runID).Order("received_at DESC").Find(&metrics)
return metrics, result.Error
}
// GetMetricsByRepository retrieves all metrics for a specific repository
func (s *Store) GetMetricsByRepository(org, repo string) ([]Metric, error) {
var metrics []Metric
result := s.db.Where("organization = ? AND repository = ?", org, repo).Order("received_at DESC").Find(&metrics)
return metrics, result.Error
}
// Close closes the database connection
func (s *Store) Close() error {
sqlDB, err := s.db.DB()
if err != nil {
return err
}
return sqlDB.Close()
}

View file

@ -0,0 +1,219 @@
package receiver
import (
"os"
"path/filepath"
"testing"
"time"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
)
func TestNewStore(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := NewStore(dbPath)
if err != nil {
t.Fatalf("NewStore() error = %v", err)
}
defer func() { _ = store.Close() }()
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
t.Error("database file was not created")
}
}
func TestStore_SaveMetric(t *testing.T) {
store := newTestStore(t)
defer func() { _ = store.Close() }()
payload := &MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
Workflow: "ci.yml",
Job: "build",
RunID: "run-123",
},
Summary: summary.RunSummary{
StartTime: time.Now().Add(-time.Minute),
EndTime: time.Now(),
DurationSeconds: 60.0,
SampleCount: 12,
CPUTotal: summary.StatSummary{Peak: 80.5, Avg: 45.2, P95: 75.0},
MemUsedBytes: summary.StatSummary{Peak: 1024000, Avg: 512000, P95: 900000},
MemUsedPercent: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0},
},
}
id, err := store.SaveMetric(payload)
if err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
if id == 0 {
t.Error("SaveMetric() returned id = 0, want non-zero")
}
}
func TestStore_GetMetricsByRunID(t *testing.T) {
store := newTestStore(t)
defer func() { _ = store.Close() }()
// Save two metrics with same run ID
for i := 0; i < 2; i++ {
payload := &MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
Workflow: "ci.yml",
Job: "build",
RunID: "run-456",
},
Summary: summary.RunSummary{SampleCount: i + 1},
}
if _, err := store.SaveMetric(payload); err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
}
// Save one with different run ID
otherPayload := &MetricsPayload{
Execution: ExecutionContext{RunID: "run-789"},
Summary: summary.RunSummary{},
}
if _, err := store.SaveMetric(otherPayload); err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
metrics, err := store.GetMetricsByRunID("run-456")
if err != nil {
t.Fatalf("GetMetricsByRunID() error = %v", err)
}
if len(metrics) != 2 {
t.Errorf("GetMetricsByRunID() returned %d metrics, want 2", len(metrics))
}
for _, m := range metrics {
if m.RunID != "run-456" {
t.Errorf("GetMetricsByRunID() returned metric with RunID = %q, want %q", m.RunID, "run-456")
}
}
}
func TestStore_GetMetricsByRunID_NotFound(t *testing.T) {
store := newTestStore(t)
defer func() { _ = store.Close() }()
metrics, err := store.GetMetricsByRunID("nonexistent")
if err != nil {
t.Fatalf("GetMetricsByRunID() error = %v", err)
}
if len(metrics) != 0 {
t.Errorf("GetMetricsByRunID() returned %d metrics, want 0", len(metrics))
}
}
func TestStore_GetMetricsByRepository(t *testing.T) {
store := newTestStore(t)
defer func() { _ = store.Close() }()
// Save metrics for different repos
repos := []struct {
org string
repo string
}{
{"org-a", "repo-1"},
{"org-a", "repo-1"},
{"org-a", "repo-2"},
{"org-b", "repo-1"},
}
for i, r := range repos {
payload := &MetricsPayload{
Execution: ExecutionContext{
Organization: r.org,
Repository: r.repo,
RunID: "run-" + string(rune('a'+i)),
},
Summary: summary.RunSummary{},
}
if _, err := store.SaveMetric(payload); err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
}
metrics, err := store.GetMetricsByRepository("org-a", "repo-1")
if err != nil {
t.Fatalf("GetMetricsByRepository() error = %v", err)
}
if len(metrics) != 2 {
t.Errorf("GetMetricsByRepository() returned %d metrics, want 2", len(metrics))
}
for _, m := range metrics {
if m.Organization != "org-a" || m.Repository != "repo-1" {
t.Errorf("GetMetricsByRepository() returned metric with org=%q repo=%q, want org-a/repo-1",
m.Organization, m.Repository)
}
}
}
func TestStore_SaveMetric_PreservesPayload(t *testing.T) {
store := newTestStore(t)
defer func() { _ = store.Close() }()
original := &MetricsPayload{
Execution: ExecutionContext{
Organization: "test-org",
Repository: "test-repo",
Workflow: "build.yml",
Job: "test",
RunID: "run-preserve",
},
Summary: summary.RunSummary{
DurationSeconds: 123.45,
SampleCount: 50,
CPUTotal: summary.StatSummary{Peak: 99.9, Avg: 55.5, P95: 88.8},
},
}
_, err := store.SaveMetric(original)
if err != nil {
t.Fatalf("SaveMetric() error = %v", err)
}
metrics, err := store.GetMetricsByRunID("run-preserve")
if err != nil {
t.Fatalf("GetMetricsByRunID() error = %v", err)
}
if len(metrics) != 1 {
t.Fatalf("GetMetricsByRunID() returned %d metrics, want 1", len(metrics))
}
m := metrics[0]
if m.Organization != original.Execution.Organization {
t.Errorf("Organization = %q, want %q", m.Organization, original.Execution.Organization)
}
if m.Repository != original.Execution.Repository {
t.Errorf("Repository = %q, want %q", m.Repository, original.Execution.Repository)
}
if m.Workflow != original.Execution.Workflow {
t.Errorf("Workflow = %q, want %q", m.Workflow, original.Execution.Workflow)
}
if m.Job != original.Execution.Job {
t.Errorf("Job = %q, want %q", m.Job, original.Execution.Job)
}
if m.Payload == "" {
t.Error("Payload is empty")
}
}
func newTestStore(t *testing.T) *Store {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := NewStore(dbPath)
if err != nil {
t.Fatalf("NewStore() error = %v", err)
}
return store
}

View file

@ -0,0 +1,32 @@
// ABOUTME: Data types for the metrics receiver service.
// ABOUTME: Defines MetricsPayload combining execution metadata with run summary.
package receiver
import "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
// ExecutionContext holds GitHub Actions style identifiers for a workflow run
type ExecutionContext struct {
Organization string `json:"organization"`
Repository string `json:"repository"`
Workflow string `json:"workflow"`
Job string `json:"job"`
RunID string `json:"run_id"`
}
// MetricsPayload is the complete payload sent to the receiver
type MetricsPayload struct {
Execution ExecutionContext `json:"execution"`
Summary summary.RunSummary `json:"run_summary"`
}
// StoredMetric represents a metric record as stored in the database
type StoredMetric struct {
ID int64
Organization string
Repository string
Workflow string
Job string
RunID string
ReceivedAt string
Payload string // JSON-encoded RunSummary
}