From c309bd810d04493c34290fedddc6e54ecbd1fa84 Mon Sep 17 00:00:00 2001 From: Manuel Ganter Date: Fri, 6 Feb 2026 11:40:03 +0100 Subject: [PATCH] feat(receiver): add HTTP metrics receiver with SQLite storage Add a new receiver application under cmd/receiver that accepts metrics via HTTP POST and stores them in SQLite using GORM. The receiver expects GitHub Actions style execution context (org, repo, workflow, job, run_id). Co-Authored-By: Claude Opus 4.5 --- cmd/receiver/main.go | 77 ++++++++++ go.mod | 12 ++ go.sum | 12 ++ internal/receiver/handler.go | 101 +++++++++++++ internal/receiver/handler_test.go | 239 ++++++++++++++++++++++++++++++ internal/receiver/store.go | 94 ++++++++++++ internal/receiver/store_test.go | 219 +++++++++++++++++++++++++++ internal/receiver/types.go | 32 ++++ 8 files changed, 786 insertions(+) create mode 100644 cmd/receiver/main.go create mode 100644 go.sum create mode 100644 internal/receiver/handler.go create mode 100644 internal/receiver/handler_test.go create mode 100644 internal/receiver/store.go create mode 100644 internal/receiver/store_test.go create mode 100644 internal/receiver/types.go diff --git a/cmd/receiver/main.go b/cmd/receiver/main.go new file mode 100644 index 0000000..a5fda16 --- /dev/null +++ b/cmd/receiver/main.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/receiver" +) + +const ( + defaultAddr = ":8080" + defaultDBPath = "metrics.db" +) + +func main() { + addr := flag.String("addr", defaultAddr, "HTTP listen address") + dbPath := flag.String("db", defaultDBPath, "SQLite database path") + flag.Parse() + + logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + store, err := receiver.NewStore(*dbPath) + if err != nil { + logger.Error("failed to open database", slog.String("error", err.Error())) + os.Exit(1) + } + defer func() { _ = store.Close() }() + + handler := receiver.NewHandler(store, logger) + mux := http.NewServeMux() + handler.RegisterRoutes(mux) + + server := &http.Server{ + Addr: *addr, + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Info("received signal, shutting down", slog.String("signal", sig.String())) + cancel() + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + _ = server.Shutdown(shutdownCtx) + }() + + logger.Info("starting metrics receiver", + slog.String("addr", *addr), + slog.String("db", *dbPath), + ) + + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + <-ctx.Done() + logger.Info("receiver stopped gracefully") +} diff --git a/go.mod b/go.mod index 3b4c218..7ef0cd0 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,15 @@ module edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector go 1.25.6 + +require ( + gorm.io/driver/sqlite v1.6.0 + gorm.io/gorm v1.31.1 +) + +require ( + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/mattn/go-sqlite3 v1.14.22 // indirect + golang.org/x/text v0.20.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..330dd09 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ= +gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/internal/receiver/handler.go b/internal/receiver/handler.go new file mode 100644 index 0000000..ff4edd0 --- /dev/null +++ b/internal/receiver/handler.go @@ -0,0 +1,101 @@ +// ABOUTME: HTTP handlers for the metrics receiver service. +// ABOUTME: Provides endpoints for receiving and querying metrics. +package receiver + +import ( + "encoding/json" + "log/slog" + "net/http" +) + +// Handler handles HTTP requests for the metrics receiver +type Handler struct { + store *Store + logger *slog.Logger +} + +// NewHandler creates a new HTTP handler with the given store +func NewHandler(store *Store, logger *slog.Logger) *Handler { + return &Handler{store: store, logger: logger} +} + +// RegisterRoutes registers all HTTP routes on the given mux +func (h *Handler) RegisterRoutes(mux *http.ServeMux) { + mux.HandleFunc("POST /api/v1/metrics", h.handleReceiveMetrics) + mux.HandleFunc("GET /api/v1/metrics/run/{runID}", h.handleGetByRunID) + mux.HandleFunc("GET /api/v1/metrics/repo/{org}/{repo}", h.handleGetByRepository) + mux.HandleFunc("GET /health", h.handleHealth) +} + +func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) { + var payload MetricsPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + h.logger.Error("failed to decode payload", slog.String("error", err.Error())) + http.Error(w, "invalid JSON payload", http.StatusBadRequest) + return + } + + if payload.Execution.RunID == "" { + http.Error(w, "run_id is required", http.StatusBadRequest) + return + } + + id, err := h.store.SaveMetric(&payload) + if err != nil { + h.logger.Error("failed to save metric", slog.String("error", err.Error())) + http.Error(w, "failed to save metric", http.StatusInternalServerError) + return + } + + h.logger.Info("metric saved", + slog.Uint64("id", uint64(id)), + slog.String("run_id", payload.Execution.RunID), + slog.String("repository", payload.Execution.Repository), + ) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + _ = json.NewEncoder(w).Encode(map[string]any{"id": id, "status": "created"}) +} + +func (h *Handler) handleGetByRunID(w http.ResponseWriter, r *http.Request) { + runID := r.PathValue("runID") + if runID == "" { + http.Error(w, "run_id is required", http.StatusBadRequest) + return + } + + metrics, err := h.store.GetMetricsByRunID(runID) + if err != nil { + h.logger.Error("failed to get metrics", slog.String("error", err.Error())) + http.Error(w, "failed to get metrics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(metrics) +} + +func (h *Handler) handleGetByRepository(w http.ResponseWriter, r *http.Request) { + org := r.PathValue("org") + repo := r.PathValue("repo") + if org == "" || repo == "" { + http.Error(w, "org and repo are required", http.StatusBadRequest) + return + } + + metrics, err := h.store.GetMetricsByRepository(org, repo) + if err != nil { + h.logger.Error("failed to get metrics", slog.String("error", err.Error())) + http.Error(w, "failed to get metrics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(metrics) +} + +func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} diff --git a/internal/receiver/handler_test.go b/internal/receiver/handler_test.go new file mode 100644 index 0000000..a1845d2 --- /dev/null +++ b/internal/receiver/handler_test.go @@ -0,0 +1,239 @@ +package receiver + +import ( + "bytes" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary" +) + +func TestHandler_ReceiveMetrics(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + payload := MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-123", + }, + Summary: summary.RunSummary{ + DurationSeconds: 60.0, + SampleCount: 12, + }, + } + + body, _ := json.Marshal(payload) + req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusCreated { + t.Errorf("status = %d, want %d", rec.Code, http.StatusCreated) + } + + var resp map[string]any + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if resp["status"] != "created" { + t.Errorf("response status = %v, want %q", resp["status"], "created") + } + if resp["id"] == nil || resp["id"].(float64) == 0 { + t.Error("response id is missing or zero") + } +} + +func TestHandler_ReceiveMetrics_InvalidJSON(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader([]byte("not json"))) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestHandler_ReceiveMetrics_MissingRunID(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + payload := MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + // RunID is missing + }, + Summary: summary.RunSummary{}, + } + + body, _ := json.Marshal(payload) + req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestHandler_GetByRunID(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + // First, save a metric + payload := &MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-get-test", + }, + Summary: summary.RunSummary{SampleCount: 5}, + } + if _, err := h.store.SaveMetric(payload); err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + + req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/run/run-get-test", nil) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var metrics []Metric + if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(metrics) != 1 { + t.Errorf("got %d metrics, want 1", len(metrics)) + } + if metrics[0].RunID != "run-get-test" { + t.Errorf("RunID = %q, want %q", metrics[0].RunID, "run-get-test") + } +} + +func TestHandler_GetByRunID_NotFound(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/run/nonexistent", nil) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var metrics []Metric + if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(metrics) != 0 { + t.Errorf("got %d metrics, want 0", len(metrics)) + } +} + +func TestHandler_GetByRepository(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + // Save metrics for different repos + payloads := []*MetricsPayload{ + {Execution: ExecutionContext{Organization: "org-x", Repository: "repo-y", RunID: "r1"}}, + {Execution: ExecutionContext{Organization: "org-x", Repository: "repo-y", RunID: "r2"}}, + {Execution: ExecutionContext{Organization: "org-x", Repository: "repo-z", RunID: "r3"}}, + } + for _, p := range payloads { + if _, err := h.store.SaveMetric(p); err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + } + + req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org-x/repo-y", nil) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var metrics []Metric + if err := json.NewDecoder(rec.Body).Decode(&metrics); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(metrics) != 2 { + t.Errorf("got %d metrics, want 2", len(metrics)) + } +} + +func TestHandler_Health(t *testing.T) { + h, cleanup := newTestHandler(t) + defer cleanup() + + req := httptest.NewRequest(http.MethodGet, "/health", nil) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var resp map[string]string + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if resp["status"] != "ok" { + t.Errorf("status = %q, want %q", resp["status"], "ok") + } +} + +func newTestHandler(t *testing.T) (*Handler, func()) { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + store, err := NewStore(dbPath) + if err != nil { + t.Fatalf("NewStore() error = %v", err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + handler := NewHandler(store, logger) + + return handler, func() { _ = store.Close() } +} diff --git a/internal/receiver/store.go b/internal/receiver/store.go new file mode 100644 index 0000000..85112a5 --- /dev/null +++ b/internal/receiver/store.go @@ -0,0 +1,94 @@ +// ABOUTME: SQLite storage layer for metrics receiver using GORM. +// ABOUTME: Handles database initialization and metric storage/retrieval. +package receiver + +import ( + "encoding/json" + "fmt" + "time" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// Metric represents a stored metric record in the database +type Metric struct { + ID uint `gorm:"primaryKey"` + Organization string `gorm:"index:idx_org_repo;not null"` + Repository string `gorm:"index:idx_org_repo;not null"` + Workflow string `gorm:"not null"` + Job string `gorm:"not null"` + RunID string `gorm:"index;not null"` + ReceivedAt time.Time `gorm:"index;not null"` + Payload string `gorm:"type:text;not null"` // JSON-encoded RunSummary +} + +// Store handles SQLite storage for metrics using GORM +type Store struct { + db *gorm.DB +} + +// NewStore creates a new SQLite store at the given path +func NewStore(dbPath string) (*Store, error) { + db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + return nil, fmt.Errorf("opening database: %w", err) + } + + if err := db.AutoMigrate(&Metric{}); err != nil { + return nil, fmt.Errorf("migrating schema: %w", err) + } + + return &Store{db: db}, nil +} + +// SaveMetric stores a metrics payload in the database +func (s *Store) SaveMetric(payload *MetricsPayload) (uint, error) { + summaryJSON, err := json.Marshal(payload.Summary) + if err != nil { + return 0, fmt.Errorf("marshaling summary: %w", err) + } + + metric := Metric{ + Organization: payload.Execution.Organization, + Repository: payload.Execution.Repository, + Workflow: payload.Execution.Workflow, + Job: payload.Execution.Job, + RunID: payload.Execution.RunID, + ReceivedAt: time.Now().UTC(), + Payload: string(summaryJSON), + } + + result := s.db.Create(&metric) + if result.Error != nil { + return 0, fmt.Errorf("inserting metric: %w", result.Error) + } + + return metric.ID, nil +} + +// GetMetricsByRunID retrieves all metrics for a specific run +func (s *Store) GetMetricsByRunID(runID string) ([]Metric, error) { + var metrics []Metric + result := s.db.Where("run_id = ?", runID).Order("received_at DESC").Find(&metrics) + return metrics, result.Error +} + +// GetMetricsByRepository retrieves all metrics for a specific repository +func (s *Store) GetMetricsByRepository(org, repo string) ([]Metric, error) { + var metrics []Metric + result := s.db.Where("organization = ? AND repository = ?", org, repo).Order("received_at DESC").Find(&metrics) + return metrics, result.Error +} + +// Close closes the database connection +func (s *Store) Close() error { + sqlDB, err := s.db.DB() + if err != nil { + return err + } + return sqlDB.Close() +} diff --git a/internal/receiver/store_test.go b/internal/receiver/store_test.go new file mode 100644 index 0000000..8302535 --- /dev/null +++ b/internal/receiver/store_test.go @@ -0,0 +1,219 @@ +package receiver + +import ( + "os" + "path/filepath" + "testing" + "time" + + "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary" +) + +func TestNewStore(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "test.db") + + store, err := NewStore(dbPath) + if err != nil { + t.Fatalf("NewStore() error = %v", err) + } + defer func() { _ = store.Close() }() + + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + t.Error("database file was not created") + } +} + +func TestStore_SaveMetric(t *testing.T) { + store := newTestStore(t) + defer func() { _ = store.Close() }() + + payload := &MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-123", + }, + Summary: summary.RunSummary{ + StartTime: time.Now().Add(-time.Minute), + EndTime: time.Now(), + DurationSeconds: 60.0, + SampleCount: 12, + CPUTotal: summary.StatSummary{Peak: 80.5, Avg: 45.2, P95: 75.0}, + MemUsedBytes: summary.StatSummary{Peak: 1024000, Avg: 512000, P95: 900000}, + MemUsedPercent: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0}, + }, + } + + id, err := store.SaveMetric(payload) + if err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + if id == 0 { + t.Error("SaveMetric() returned id = 0, want non-zero") + } +} + +func TestStore_GetMetricsByRunID(t *testing.T) { + store := newTestStore(t) + defer func() { _ = store.Close() }() + + // Save two metrics with same run ID + for i := 0; i < 2; i++ { + payload := &MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-456", + }, + Summary: summary.RunSummary{SampleCount: i + 1}, + } + if _, err := store.SaveMetric(payload); err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + } + + // Save one with different run ID + otherPayload := &MetricsPayload{ + Execution: ExecutionContext{RunID: "run-789"}, + Summary: summary.RunSummary{}, + } + if _, err := store.SaveMetric(otherPayload); err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + + metrics, err := store.GetMetricsByRunID("run-456") + if err != nil { + t.Fatalf("GetMetricsByRunID() error = %v", err) + } + if len(metrics) != 2 { + t.Errorf("GetMetricsByRunID() returned %d metrics, want 2", len(metrics)) + } + + for _, m := range metrics { + if m.RunID != "run-456" { + t.Errorf("GetMetricsByRunID() returned metric with RunID = %q, want %q", m.RunID, "run-456") + } + } +} + +func TestStore_GetMetricsByRunID_NotFound(t *testing.T) { + store := newTestStore(t) + defer func() { _ = store.Close() }() + + metrics, err := store.GetMetricsByRunID("nonexistent") + if err != nil { + t.Fatalf("GetMetricsByRunID() error = %v", err) + } + if len(metrics) != 0 { + t.Errorf("GetMetricsByRunID() returned %d metrics, want 0", len(metrics)) + } +} + +func TestStore_GetMetricsByRepository(t *testing.T) { + store := newTestStore(t) + defer func() { _ = store.Close() }() + + // Save metrics for different repos + repos := []struct { + org string + repo string + }{ + {"org-a", "repo-1"}, + {"org-a", "repo-1"}, + {"org-a", "repo-2"}, + {"org-b", "repo-1"}, + } + + for i, r := range repos { + payload := &MetricsPayload{ + Execution: ExecutionContext{ + Organization: r.org, + Repository: r.repo, + RunID: "run-" + string(rune('a'+i)), + }, + Summary: summary.RunSummary{}, + } + if _, err := store.SaveMetric(payload); err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + } + + metrics, err := store.GetMetricsByRepository("org-a", "repo-1") + if err != nil { + t.Fatalf("GetMetricsByRepository() error = %v", err) + } + if len(metrics) != 2 { + t.Errorf("GetMetricsByRepository() returned %d metrics, want 2", len(metrics)) + } + + for _, m := range metrics { + if m.Organization != "org-a" || m.Repository != "repo-1" { + t.Errorf("GetMetricsByRepository() returned metric with org=%q repo=%q, want org-a/repo-1", + m.Organization, m.Repository) + } + } +} + +func TestStore_SaveMetric_PreservesPayload(t *testing.T) { + store := newTestStore(t) + defer func() { _ = store.Close() }() + + original := &MetricsPayload{ + Execution: ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "build.yml", + Job: "test", + RunID: "run-preserve", + }, + Summary: summary.RunSummary{ + DurationSeconds: 123.45, + SampleCount: 50, + CPUTotal: summary.StatSummary{Peak: 99.9, Avg: 55.5, P95: 88.8}, + }, + } + + _, err := store.SaveMetric(original) + if err != nil { + t.Fatalf("SaveMetric() error = %v", err) + } + + metrics, err := store.GetMetricsByRunID("run-preserve") + if err != nil { + t.Fatalf("GetMetricsByRunID() error = %v", err) + } + if len(metrics) != 1 { + t.Fatalf("GetMetricsByRunID() returned %d metrics, want 1", len(metrics)) + } + + m := metrics[0] + if m.Organization != original.Execution.Organization { + t.Errorf("Organization = %q, want %q", m.Organization, original.Execution.Organization) + } + if m.Repository != original.Execution.Repository { + t.Errorf("Repository = %q, want %q", m.Repository, original.Execution.Repository) + } + if m.Workflow != original.Execution.Workflow { + t.Errorf("Workflow = %q, want %q", m.Workflow, original.Execution.Workflow) + } + if m.Job != original.Execution.Job { + t.Errorf("Job = %q, want %q", m.Job, original.Execution.Job) + } + if m.Payload == "" { + t.Error("Payload is empty") + } +} + +func newTestStore(t *testing.T) *Store { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + store, err := NewStore(dbPath) + if err != nil { + t.Fatalf("NewStore() error = %v", err) + } + return store +} diff --git a/internal/receiver/types.go b/internal/receiver/types.go new file mode 100644 index 0000000..e11a98d --- /dev/null +++ b/internal/receiver/types.go @@ -0,0 +1,32 @@ +// ABOUTME: Data types for the metrics receiver service. +// ABOUTME: Defines MetricsPayload combining execution metadata with run summary. +package receiver + +import "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary" + +// ExecutionContext holds GitHub Actions style identifiers for a workflow run +type ExecutionContext struct { + Organization string `json:"organization"` + Repository string `json:"repository"` + Workflow string `json:"workflow"` + Job string `json:"job"` + RunID string `json:"run_id"` +} + +// MetricsPayload is the complete payload sent to the receiver +type MetricsPayload struct { + Execution ExecutionContext `json:"execution"` + Summary summary.RunSummary `json:"run_summary"` +} + +// StoredMetric represents a metric record as stored in the database +type StoredMetric struct { + ID int64 + Organization string + Repository string + Workflow string + Job string + RunID string + ReceivedAt string + Payload string // JSON-encoded RunSummary +}