diff --git a/README.md b/README.md index b7c98d3..abd58e9 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Runs as a sidecar alongside CI workloads. On a configurable interval, it reads ` ./collector --interval=2s --top=10 --push-endpoint=http://receiver:8080/api/v1/metrics ``` -**Flags:** `--interval`, `--proc-path`, `--log-level`, `--log-format`, `--top`, `--push-endpoint` +**Flags:** `--interval`, `--proc-path`, `--log-level`, `--log-format`, `--top`, `--push-endpoint`, `--push-token` **Environment variables:** @@ -41,6 +41,7 @@ Runs as a sidecar alongside CI workloads. On a configurable interval, it reads ` | `GITHUB_WORKFLOW` | Workflow filename | `ci.yml` | | `GITHUB_JOB` | Job name | `build` | | `GITHUB_RUN_ID` | Unique run identifier | `run-123` | +| `COLLECTOR_PUSH_TOKEN` | Bearer token for push endpoint auth | — | | `CGROUP_PROCESS_MAP` | JSON: process name → container name | `{"node":"runner"}` | | `CGROUP_LIMITS` | JSON: per-container CPU/memory limits | See below | @@ -60,32 +61,52 @@ CPU supports Kubernetes notation (`"2"` = 2 cores, `"500m"` = 0.5 cores). Memory HTTP service that stores metric summaries in SQLite (via GORM) and exposes a query API. ```bash -./receiver --addr=:8080 --db=metrics.db --read-token=my-secret-token +./receiver --addr=:8080 --db=metrics.db --read-token=my-secret-token --hmac-key=my-hmac-key ``` **Flags:** -| Flag | Environment Variable | Description | Default | -| -------------- | --------------------- | ---------------------------------------------- | ------------ | -| `--addr` | — | HTTP listen address | `:8080` | -| `--db` | — | SQLite database path | `metrics.db` | -| `--read-token` | `RECEIVER_READ_TOKEN` | Pre-shared token for read endpoints (optional) | — | +| Flag | Environment Variable | Description | Default | +| -------------- | --------------------- | ----------------------------------------------------- | ------------ | +| `--addr` | — | HTTP listen address | `:8080` | +| `--db` | — | SQLite database path | `metrics.db` | +| `--read-token` | `RECEIVER_READ_TOKEN` | Pre-shared token for read/admin endpoints (required) | — | +| `--hmac-key` | `RECEIVER_HMAC_KEY` | Secret key for push token generation/validation (required) | — | **Endpoints:** -- `POST /api/v1/metrics` — receive and store a metric summary -- `GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}` — query stored metrics (protected if `--read-token` is set) +- `POST /api/v1/metrics` — receive and store a metric summary (requires scoped push token) +- `POST /api/v1/token` — generate a scoped push token (requires read token auth) +- `GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}` — query stored metrics (requires read token auth) **Authentication:** -When `--read-token` is configured, the GET endpoint requires a Bearer token: +All metrics endpoints require authentication via `--read-token`: + +- The GET endpoint requires a Bearer token matching the read token +- The POST metrics endpoint requires a scoped push token (generated via `POST /api/v1/token`) +- The token endpoint itself requires the read token + +**Token flow:** ```bash +# 1. Admin generates a scoped push token using the read token +curl -X POST http://localhost:8080/api/v1/token \ + -H "Authorization: Bearer my-secret-token" \ + -H "Content-Type: application/json" \ + -d '{"organization":"my-org","repository":"my-repo","workflow":"ci.yml","job":"build"}' +# → {"token":""} + +# 2. Collector uses the scoped token to push metrics +./collector --push-endpoint=http://localhost:8080/api/v1/metrics \ + --push-token= + +# 3. Query metrics with the read token curl -H "Authorization: Bearer my-secret-token" \ #gitleaks:allow - http://localhost:8080/api/v1/metrics/repo/org/repo/workflow/job + http://localhost:8080/api/v1/metrics/repo/my-org/my-repo/ci.yml/build ``` -If no token is configured, the endpoint remains open. +Push tokens are HMAC-SHA256 digests derived from `--hmac-key` and the scope (org/repo/workflow/job). They are stateless — no database storage is needed. The HMAC key is separate from the read token so that compromising a push token does not expose the admin credential. ## How Metrics Are Collected @@ -155,11 +176,28 @@ All memory values are in **bytes**. ### Docker Compose ```bash -docker compose -f test/docker/docker-compose-stress.yaml up -d -# Wait for collection, then trigger shutdown summary: +# Start the receiver (builds image if needed): +docker compose -f test/docker/docker-compose-stress.yaml up -d --build receiver + +# Generate a scoped push token for the collector: +PUSH_TOKEN=$(curl -s -X POST http://localhost:9080/api/v1/token \ + -H "Authorization: Bearer dummyreadtoken" \ + -H "Content-Type: application/json" \ + -d '{"organization":"test-org","repository":"test-org/stress-test","workflow":"stress-test-workflow","job":"heavy-workload"}' \ + | jq -r .token) + +# Start the collector and stress workloads with the push token: +COLLECTOR_PUSH_TOKEN=$PUSH_TOKEN \ + docker compose -f test/docker/docker-compose-stress.yaml up -d --build collector + +# ... Wait for data collection ... + +# Trigger shutdown summary: docker compose -f test/docker/docker-compose-stress.yaml stop collector -# Query results: -curl http://localhost:9080/api/v1/metrics/repo/test-org/test-org%2Fstress-test/stress-test-workflow/heavy-workload + +# Query results with the read token: +curl -H "Authorization: Bearer dummyreadtoken" \ + http://localhost:9080/api/v1/metrics/repo/test-org/test-org%2Fstress-test/stress-test-workflow/heavy-workload ``` ### Local @@ -168,8 +206,21 @@ curl http://localhost:9080/api/v1/metrics/repo/test-org/test-org%2Fstress-test/s go build -o collector ./cmd/collector go build -o receiver ./cmd/receiver -./receiver --addr=:8080 --db=metrics.db -./collector --interval=2s --top=10 --push-endpoint=http://localhost:8080/api/v1/metrics +# Start receiver with both keys: +./receiver --addr=:8080 --db=metrics.db \ + --read-token=my-secret-token --hmac-key=my-hmac-key + +# Generate a scoped push token: +PUSH_TOKEN=$(curl -s -X POST http://localhost:8080/api/v1/token \ + -H "Authorization: Bearer my-secret-token" \ + -H "Content-Type: application/json" \ + -d '{"organization":"my-org","repository":"my-repo","workflow":"ci.yml","job":"build"}' \ + | jq -r .token) + +# Run collector with the push token: +./collector --interval=2s --top=10 \ + --push-endpoint=http://localhost:8080/api/v1/metrics \ + --push-token=$PUSH_TOKEN ``` ## Internal Packages diff --git a/cmd/collector/main.go b/cmd/collector/main.go index ec06b9b..7a88a85 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -31,6 +31,7 @@ func main() { logFormat := flag.String("log-format", defaultLogFormat, "Output format: json, text") topN := flag.Int("top", defaultTopN, "Number of top processes to include") pushEndpoint := flag.String("push-endpoint", "", "HTTP endpoint to push metrics to (e.g., http://localhost:8080/api/v1/metrics)") + pushToken := flag.String("push-token", os.Getenv("COLLECTOR_PUSH_TOKEN"), "Bearer token for push endpoint authentication (or set COLLECTOR_PUSH_TOKEN)") flag.Parse() // Setup structured logging for application logs @@ -61,7 +62,7 @@ func main() { // Setup push client if endpoint is configured if *pushEndpoint != "" { - pushClient := summary.NewPushClient(*pushEndpoint) + pushClient := summary.NewPushClient(*pushEndpoint, *pushToken) c.SetPushClient(pushClient) execCtx := pushClient.ExecutionContext() appLogger.Info("push client configured", diff --git a/cmd/receiver/main.go b/cmd/receiver/main.go index 21a59d4..1379b53 100644 --- a/cmd/receiver/main.go +++ b/cmd/receiver/main.go @@ -23,6 +23,7 @@ func main() { addr := flag.String("addr", defaultAddr, "HTTP listen address") dbPath := flag.String("db", defaultDBPath, "SQLite database path") readToken := flag.String("read-token", os.Getenv("RECEIVER_READ_TOKEN"), "Pre-shared token for read endpoints (or set RECEIVER_READ_TOKEN)") + hmacKey := flag.String("hmac-key", os.Getenv("RECEIVER_HMAC_KEY"), "Secret key for push token generation/validation (or set RECEIVER_HMAC_KEY)") flag.Parse() logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ @@ -36,7 +37,7 @@ func main() { } defer func() { _ = store.Close() }() - handler := receiver.NewHandler(store, logger, *readToken) + handler := receiver.NewHandler(store, logger, *readToken, *hmacKey) mux := http.NewServeMux() handler.RegisterRoutes(mux) diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index c9dc9a2..685d2b6 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -18,7 +18,12 @@ import ( "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary" ) -// setupTestReceiver creates a test receiver with SQLite storage and HTTP server +const ( + testReadToken = "integration-test-token" + testHMACKey = "integration-hmac-key" +) + +// setupTestReceiver creates a test receiver with SQLite storage, auth, and HTTP server func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func()) { t.Helper() dbPath := filepath.Join(t.TempDir(), "test.db") @@ -27,7 +32,7 @@ func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func()) t.Fatalf("NewStore() error = %v", err) } - handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)), "") + handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)), testReadToken, testHMACKey) mux := http.NewServeMux() handler.RegisterRoutes(mux) @@ -41,6 +46,11 @@ func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func()) return store, server, cleanup } +// generatePushToken generates a scoped push token for an execution context +func generatePushToken(exec summary.ExecutionContext) string { + return receiver.GenerateScopedToken(testHMACKey, exec.Organization, exec.Repository, exec.Workflow, exec.Job) +} + func TestPushClientToReceiver(t *testing.T) { store, server, cleanup := setupTestReceiver(t) defer cleanup() @@ -85,10 +95,18 @@ func TestPushClientToReceiver(t *testing.T) { t.Fatalf("Marshal() error = %v", err) } - // Send via HTTP client - resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body)) + // Send via HTTP client with scoped push token + pushToken := generatePushToken(testCtx) + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/metrics", bytes.NewReader(body)) if err != nil { - t.Fatalf("Post() error = %v", err) + t.Fatalf("NewRequest() error = %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+pushToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Do() error = %v", err) } defer func() { _ = resp.Body.Close() }() @@ -148,8 +166,11 @@ func TestPushClientIntegration(t *testing.T) { t.Setenv("GITHUB_JOB", "push-job") t.Setenv("GITHUB_RUN_ID", "push-run-456") - // Create push client - it reads from env vars - pushClient := summary.NewPushClient(server.URL + "/api/v1/metrics") + // Generate scoped push token + pushToken := receiver.GenerateScopedToken(testHMACKey, "push-client-org", "push-client-repo", "push-test.yml", "push-job") + + // Create push client with token - it reads execution context from env vars + pushClient := summary.NewPushClient(server.URL+"/api/v1/metrics", pushToken) // Verify execution context was read from env ctx := pushClient.ExecutionContext() @@ -194,7 +215,6 @@ func TestMultiplePushes(t *testing.T) { defer cleanup() // Simulate multiple workflow runs pushing metrics via direct HTTP POST - // This avoids env var manipulation which could cause issues with parallel tests runs := []summary.ExecutionContext{ {Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-1"}, {Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-2"}, @@ -219,9 +239,17 @@ func TestMultiplePushes(t *testing.T) { t.Fatalf("Marshal() error = %v", err) } - resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body)) + pushToken := generatePushToken(execCtx) + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/metrics", bytes.NewReader(body)) if err != nil { - t.Fatalf("Post() error = %v for run %+v", err, execCtx) + t.Fatalf("NewRequest() error = %v for run %+v", err, execCtx) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+pushToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Do() error = %v for run %+v", err, execCtx) } _ = resp.Body.Close() @@ -247,3 +275,112 @@ func TestMultiplePushes(t *testing.T) { t.Errorf("got %d metrics for org-a/repo-1/ci.yml/test, want 1", len(metrics)) } } + +func TestPushClientWithTokenIntegration(t *testing.T) { + readToken := "integration-read-secret" + hmacKey := "integration-hmac-secret" + store, server, cleanup := setupTestReceiverWithToken(t, readToken, hmacKey) + defer cleanup() + + // Generate a scoped token via the API + tokenReqBody, _ := json.Marshal(map[string]string{ + "organization": "token-org", + "repository": "token-repo", + "workflow": "ci.yml", + "job": "build", + }) + tokenReq, _ := http.NewRequest(http.MethodPost, server.URL+"/api/v1/token", bytes.NewReader(tokenReqBody)) + tokenReq.Header.Set("Authorization", "Bearer "+readToken) + tokenReq.Header.Set("Content-Type", "application/json") + + tokenResp, err := http.DefaultClient.Do(tokenReq) + if err != nil { + t.Fatalf("token request error: %v", err) + } + defer func() { _ = tokenResp.Body.Close() }() + + if tokenResp.StatusCode != http.StatusOK { + t.Fatalf("token request status = %d, want %d", tokenResp.StatusCode, http.StatusOK) + } + + var tokenBody struct { + Token string `json:"token"` + } + if err := json.NewDecoder(tokenResp.Body).Decode(&tokenBody); err != nil { + t.Fatalf("decode token response: %v", err) + } + + // Use the scoped token to push metrics + t.Setenv("GITHUB_REPOSITORY_OWNER", "token-org") + t.Setenv("GITHUB_REPOSITORY", "token-repo") + t.Setenv("GITHUB_WORKFLOW", "ci.yml") + t.Setenv("GITHUB_JOB", "build") + t.Setenv("GITHUB_RUN_ID", "token-run-1") + + pushClient := summary.NewPushClient(server.URL+"/api/v1/metrics", tokenBody.Token) + + testSummary := &summary.RunSummary{ + StartTime: time.Now().Add(-10 * time.Second), + EndTime: time.Now(), + DurationSeconds: 10.0, + SampleCount: 2, + } + + if err := pushClient.Push(context.Background(), testSummary); err != nil { + t.Fatalf("Push() error = %v", err) + } + + // Verify stored + metrics, err := store.GetMetricsByWorkflowJob("token-org", "token-repo", "ci.yml", "build") + if err != nil { + t.Fatalf("GetMetricsByWorkflowJob() error = %v", err) + } + if len(metrics) != 1 { + t.Fatalf("got %d metrics, want 1", len(metrics)) + } + if metrics[0].RunID != "token-run-1" { + t.Errorf("RunID = %q, want %q", metrics[0].RunID, "token-run-1") + } +} + +func TestPushClientWithWrongTokenIntegration(t *testing.T) { + readToken := "integration-read-secret" + hmacKey := "integration-hmac-secret" + _, server, cleanup := setupTestReceiverWithToken(t, readToken, hmacKey) + defer cleanup() + + t.Setenv("GITHUB_REPOSITORY_OWNER", "token-org") + t.Setenv("GITHUB_REPOSITORY", "token-repo") + t.Setenv("GITHUB_WORKFLOW", "ci.yml") + t.Setenv("GITHUB_JOB", "build") + t.Setenv("GITHUB_RUN_ID", "token-run-2") + + pushClient := summary.NewPushClient(server.URL+"/api/v1/metrics", "wrong-token") + + err := pushClient.Push(context.Background(), &summary.RunSummary{SampleCount: 1}) + if err == nil { + t.Error("Push() with wrong token should fail") + } +} + +func setupTestReceiverWithToken(t *testing.T, readToken, hmacKey string) (*receiver.Store, *httptest.Server, func()) { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + store, err := receiver.NewStore(dbPath) + if err != nil { + t.Fatalf("NewStore() error = %v", err) + } + + handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)), readToken, hmacKey) + mux := http.NewServeMux() + handler.RegisterRoutes(mux) + + server := httptest.NewServer(mux) + + cleanup := func() { + server.Close() + _ = store.Close() + } + + return store, server, cleanup +} diff --git a/internal/receiver/handler.go b/internal/receiver/handler.go index ef4bfae..d847f62 100644 --- a/internal/receiver/handler.go +++ b/internal/receiver/handler.go @@ -14,28 +14,31 @@ import ( type Handler struct { store *Store logger *slog.Logger - readToken string // Pre-shared token for read endpoints (empty = no auth required) + readToken string // Pre-shared token for read endpoint authentication + hmacKey string // Separate key for HMAC-based push token generation/validation } // NewHandler creates a new HTTP handler with the given store. -// If readToken is non-empty, the GET metrics endpoint will require -// Bearer token authentication. -func NewHandler(store *Store, logger *slog.Logger, readToken string) *Handler { - return &Handler{store: store, logger: logger, readToken: readToken} +// readToken authenticates read endpoints and the token generation endpoint. +// hmacKey is the secret used to derive scoped push tokens. +func NewHandler(store *Store, logger *slog.Logger, readToken, hmacKey string) *Handler { + return &Handler{store: store, logger: logger, readToken: readToken, hmacKey: hmacKey} } // RegisterRoutes registers all HTTP routes on the given mux func (h *Handler) RegisterRoutes(mux *http.ServeMux) { mux.HandleFunc("POST /api/v1/metrics", h.handleReceiveMetrics) + mux.HandleFunc("POST /api/v1/token", h.handleGenerateToken) mux.HandleFunc("GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}", h.handleGetByWorkflowJob) mux.HandleFunc("GET /health", h.handleHealth) } // validateReadToken checks the Authorization header for a valid Bearer token. -// Returns true if authentication is disabled (empty readToken) or if the token matches. func (h *Handler) validateReadToken(w http.ResponseWriter, r *http.Request) bool { if h.readToken == "" { - return true + h.logger.Warn("no read-token configured, rejecting request", slog.String("path", r.URL.Path)) + http.Error(w, "authorization required", http.StatusUnauthorized) + return false } authHeader := r.Header.Get("Authorization") @@ -62,6 +65,65 @@ func (h *Handler) validateReadToken(w http.ResponseWriter, r *http.Request) bool return true } +func (h *Handler) handleGenerateToken(w http.ResponseWriter, r *http.Request) { + if h.hmacKey == "" { + http.Error(w, "token generation requires a configured HMAC key", http.StatusBadRequest) + return + } + + if !h.validateReadToken(w, r) { + return + } + + var req TokenRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid JSON body", http.StatusBadRequest) + return + } + + if req.Organization == "" || req.Repository == "" || req.Workflow == "" || req.Job == "" { + http.Error(w, "organization, repository, workflow, and job are required", http.StatusBadRequest) + return + } + + token := GenerateScopedToken(h.hmacKey, req.Organization, req.Repository, req.Workflow, req.Job) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(TokenResponse{Token: token}) +} + +// validatePushToken checks push authentication via scoped HMAC token. +func (h *Handler) validatePushToken(w http.ResponseWriter, r *http.Request, exec ExecutionContext) bool { + if h.hmacKey == "" { + h.logger.Warn("no HMAC key configured, rejecting push", slog.String("path", r.URL.Path)) + http.Error(w, "authorization required", http.StatusUnauthorized) + return false + } + + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + h.logger.Warn("missing push authorization", slog.String("path", r.URL.Path)) + http.Error(w, "authorization required", http.StatusUnauthorized) + return false + } + + const bearerPrefix = "Bearer " + if !strings.HasPrefix(authHeader, bearerPrefix) { + h.logger.Warn("invalid push authorization format", slog.String("path", r.URL.Path)) + http.Error(w, "invalid authorization format", http.StatusUnauthorized) + return false + } + + token := strings.TrimPrefix(authHeader, bearerPrefix) + if !ValidateScopedToken(h.hmacKey, token, exec.Organization, exec.Repository, exec.Workflow, exec.Job) { + h.logger.Warn("invalid push token", slog.String("path", r.URL.Path)) + http.Error(w, "invalid token", http.StatusUnauthorized) + return false + } + + return true +} + func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) { var payload MetricsPayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { @@ -75,6 +137,10 @@ func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) { return } + if !h.validatePushToken(w, r, payload.Execution) { + return + } + id, err := h.store.SaveMetric(&payload) if err != nil { h.logger.Error("failed to save metric", slog.String("error", err.Error())) diff --git a/internal/receiver/handler_test.go b/internal/receiver/handler_test.go index 373e1ae..cea58f0 100644 --- a/internal/receiver/handler_test.go +++ b/internal/receiver/handler_test.go @@ -14,17 +14,21 @@ import ( ) func TestHandler_ReceiveMetrics(t *testing.T) { - h, cleanup := newTestHandler(t) + const readToken = "test-token" + h, cleanup := newTestHandlerWithToken(t, readToken) defer cleanup() + exec := ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-123", + } + pushToken := GenerateScopedToken(readToken, exec.Organization, exec.Repository, exec.Workflow, exec.Job) + payload := MetricsPayload{ - Execution: ExecutionContext{ - Organization: "test-org", - Repository: "test-repo", - Workflow: "ci.yml", - Job: "build", - RunID: "run-123", - }, + Execution: exec, Summary: summary.RunSummary{ DurationSeconds: 60.0, SampleCount: 12, @@ -34,6 +38,7 @@ func TestHandler_ReceiveMetrics(t *testing.T) { body, _ := json.Marshal(payload) req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+pushToken) rec := httptest.NewRecorder() mux := http.NewServeMux() @@ -99,7 +104,8 @@ func TestHandler_ReceiveMetrics_MissingRunID(t *testing.T) { } func TestHandler_GetByWorkflowJob(t *testing.T) { - h, cleanup := newTestHandler(t) + const readToken = "test-token" + h, cleanup := newTestHandlerWithToken(t, readToken) defer cleanup() // Save metrics for different workflow/job combinations @@ -115,6 +121,7 @@ func TestHandler_GetByWorkflowJob(t *testing.T) { } req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org-x/repo-y/ci.yml/build", nil) + req.Header.Set("Authorization", "Bearer "+readToken) rec := httptest.NewRecorder() mux := http.NewServeMux() @@ -135,10 +142,12 @@ func TestHandler_GetByWorkflowJob(t *testing.T) { } func TestHandler_GetByWorkflowJob_NotFound(t *testing.T) { - h, cleanup := newTestHandler(t) + const readToken = "test-token" + h, cleanup := newTestHandlerWithToken(t, readToken) defer cleanup() req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org/repo/workflow/job", nil) + req.Header.Set("Authorization", "Bearer "+readToken) rec := httptest.NewRecorder() mux := http.NewServeMux() @@ -224,6 +233,212 @@ func TestHandler_Health(t *testing.T) { } } +func TestHandler_GenerateToken(t *testing.T) { + h, cleanup := newTestHandlerWithToken(t, "secret-token") + defer cleanup() + + body, _ := json.Marshal(TokenRequest{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + Job: "build", + }) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/token", bytes.NewReader(body)) + req.Header.Set("Authorization", "Bearer secret-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var resp TokenResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if resp.Token == "" { + t.Error("expected non-empty token") + } + if len(resp.Token) != 64 { + t.Errorf("token length = %d, want 64", len(resp.Token)) + } +} + +func TestHandler_GenerateToken_NoAuth(t *testing.T) { + h, cleanup := newTestHandlerWithToken(t, "secret-token") + defer cleanup() + + body, _ := json.Marshal(TokenRequest{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + Job: "build", + }) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/token", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnauthorized { + t.Errorf("status = %d, want %d", rec.Code, http.StatusUnauthorized) + } +} + +func TestHandler_GenerateToken_MissingFields(t *testing.T) { + h, cleanup := newTestHandlerWithToken(t, "secret-token") + defer cleanup() + + // Missing job field + body, _ := json.Marshal(TokenRequest{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + }) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/token", bytes.NewReader(body)) + req.Header.Set("Authorization", "Bearer secret-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestHandler_GenerateToken_NoReadToken(t *testing.T) { + h, cleanup := newTestHandler(t) // no readToken configured + defer cleanup() + + body, _ := json.Marshal(TokenRequest{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + Job: "build", + }) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/token", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestHandler_ReceiveMetrics_WithPushToken(t *testing.T) { + readToken := "secret-token" + h, cleanup := newTestHandlerWithToken(t, readToken) + defer cleanup() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + exec := ExecutionContext{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-1", + } + + validToken := GenerateScopedToken(readToken, exec.Organization, exec.Repository, exec.Workflow, exec.Job) + wrongScopeToken := GenerateScopedToken(readToken, "other-org", "repo", "ci.yml", "build") + + tests := []struct { + name string + authHeader string + wantCode int + }{ + {"no auth", "", http.StatusUnauthorized}, + {"wrong token", "Bearer wrong-token", http.StatusUnauthorized}, + {"wrong scope", "Bearer " + wrongScopeToken, http.StatusUnauthorized}, + {"valid token", "Bearer " + validToken, http.StatusCreated}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + payload := MetricsPayload{ + Execution: exec, + Summary: summary.RunSummary{SampleCount: 1}, + } + body, _ := json.Marshal(payload) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + if tt.authHeader != "" { + req.Header.Set("Authorization", tt.authHeader) + } + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != tt.wantCode { + t.Errorf("status = %d, want %d", rec.Code, tt.wantCode) + } + }) + } +} + +func TestHandler_ReceiveMetrics_RejectsWhenNoReadToken(t *testing.T) { + h, cleanup := newTestHandlerWithToken(t, "") // no readToken configured + defer cleanup() + + payload := MetricsPayload{ + Execution: ExecutionContext{ + Organization: "org", + Repository: "repo", + Workflow: "ci.yml", + Job: "build", + RunID: "run-1", + }, + Summary: summary.RunSummary{SampleCount: 1}, + } + body, _ := json.Marshal(payload) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/metrics", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnauthorized { + t.Errorf("status = %d, want %d", rec.Code, http.StatusUnauthorized) + } +} + +func TestHandler_GetByWorkflowJob_RejectsWhenNoReadToken(t *testing.T) { + h, cleanup := newTestHandlerWithToken(t, "") // no readToken configured + defer cleanup() + + req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org/repo/ci.yml/build", nil) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnauthorized { + t.Errorf("status = %d, want %d", rec.Code, http.StatusUnauthorized) + } +} + func newTestHandler(t *testing.T) (*Handler, func()) { t.Helper() dbPath := filepath.Join(t.TempDir(), "test.db") @@ -233,12 +448,17 @@ func newTestHandler(t *testing.T) (*Handler, func()) { } logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - handler := NewHandler(store, logger, "") // no auth token for basic tests + handler := NewHandler(store, logger, "", "") // no auth — endpoints will reject return handler, func() { _ = store.Close() } } -func newTestHandlerWithToken(t *testing.T, token string) (*Handler, func()) { +func newTestHandlerWithToken(t *testing.T, readToken string) (*Handler, func()) { + t.Helper() + return newTestHandlerWithKeys(t, readToken, readToken) +} + +func newTestHandlerWithKeys(t *testing.T, readToken, hmacKey string) (*Handler, func()) { t.Helper() dbPath := filepath.Join(t.TempDir(), "test.db") store, err := NewStore(dbPath) @@ -247,7 +467,7 @@ func newTestHandlerWithToken(t *testing.T, token string) (*Handler, func()) { } logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - handler := NewHandler(store, logger, token) + handler := NewHandler(store, logger, readToken, hmacKey) return handler, func() { _ = store.Close() } } diff --git a/internal/receiver/token.go b/internal/receiver/token.go new file mode 100644 index 0000000..087546c --- /dev/null +++ b/internal/receiver/token.go @@ -0,0 +1,25 @@ +// ABOUTME: HMAC-SHA256 token generation and validation for scoped push authentication. +// ABOUTME: Tokens are derived from a key + scope, enabling stateless validation without DB storage. +package receiver + +import ( + "crypto/hmac" + "crypto/sha256" + "crypto/subtle" + "encoding/hex" +) + +// GenerateScopedToken computes an HMAC-SHA256 token scoped to a specific org/repo/workflow/job. +// The canonical input is "v1\x00\x00\x00\x00". +func GenerateScopedToken(key, org, repo, workflow, job string) string { + mac := hmac.New(sha256.New, []byte(key)) + mac.Write([]byte("v1\x00" + org + "\x00" + repo + "\x00" + workflow + "\x00" + job)) + return hex.EncodeToString(mac.Sum(nil)) +} + +// ValidateScopedToken checks whether a token matches the expected HMAC for the given scope. +// Uses constant-time comparison to prevent timing attacks. +func ValidateScopedToken(key, token, org, repo, workflow, job string) bool { + expected := GenerateScopedToken(key, org, repo, workflow, job) + return subtle.ConstantTimeCompare([]byte(token), []byte(expected)) == 1 +} diff --git a/internal/receiver/token_test.go b/internal/receiver/token_test.go new file mode 100644 index 0000000..2140ecd --- /dev/null +++ b/internal/receiver/token_test.go @@ -0,0 +1,78 @@ +package receiver + +import ( + "encoding/hex" + "testing" +) + +func TestGenerateScopedToken_Deterministic(t *testing.T) { + token1 := GenerateScopedToken("key", "org", "repo", "wf", "job") + token2 := GenerateScopedToken("key", "org", "repo", "wf", "job") + if token1 != token2 { + t.Errorf("tokens differ: %q vs %q", token1, token2) + } +} + +func TestGenerateScopedToken_ScopePinning(t *testing.T) { + base := GenerateScopedToken("key", "org", "repo", "wf", "job") + + variants := []struct { + name string + org string + repo string + wf string + job string + }{ + {"different org", "other-org", "repo", "wf", "job"}, + {"different repo", "org", "other-repo", "wf", "job"}, + {"different workflow", "org", "repo", "other-wf", "job"}, + {"different job", "org", "repo", "wf", "other-job"}, + } + + for _, v := range variants { + t.Run(v.name, func(t *testing.T) { + token := GenerateScopedToken("key", v.org, v.repo, v.wf, v.job) + if token == base { + t.Errorf("token for %s should differ from base", v.name) + } + }) + } +} + +func TestGenerateScopedToken_DifferentKeys(t *testing.T) { + token1 := GenerateScopedToken("key-a", "org", "repo", "wf", "job") + token2 := GenerateScopedToken("key-b", "org", "repo", "wf", "job") + if token1 == token2 { + t.Error("different keys should produce different tokens") + } +} + +func TestGenerateScopedToken_ValidHex(t *testing.T) { + token := GenerateScopedToken("key", "org", "repo", "wf", "job") + if len(token) != 64 { + t.Errorf("token length = %d, want 64", len(token)) + } + if _, err := hex.DecodeString(token); err != nil { + t.Errorf("token is not valid hex: %v", err) + } +} + +func TestValidateScopedToken_Correct(t *testing.T) { + token := GenerateScopedToken("key", "org", "repo", "wf", "job") + if !ValidateScopedToken("key", token, "org", "repo", "wf", "job") { + t.Error("ValidateScopedToken should accept correct token") + } +} + +func TestValidateScopedToken_WrongToken(t *testing.T) { + if ValidateScopedToken("key", "deadbeef", "org", "repo", "wf", "job") { + t.Error("ValidateScopedToken should reject wrong token") + } +} + +func TestValidateScopedToken_WrongScope(t *testing.T) { + token := GenerateScopedToken("key", "org", "repo", "wf", "job") + if ValidateScopedToken("key", token, "org", "repo", "wf", "other-job") { + t.Error("ValidateScopedToken should reject token for different scope") + } +} diff --git a/internal/receiver/types.go b/internal/receiver/types.go index e11a98d..dbc56e0 100644 --- a/internal/receiver/types.go +++ b/internal/receiver/types.go @@ -30,3 +30,16 @@ type StoredMetric struct { ReceivedAt string Payload string // JSON-encoded RunSummary } + +// TokenRequest is the request body for POST /api/v1/token +type TokenRequest struct { + Organization string `json:"organization"` + Repository string `json:"repository"` + Workflow string `json:"workflow"` + Job string `json:"job"` +} + +// TokenResponse is the response body for POST /api/v1/token +type TokenResponse struct { + Token string `json:"token"` +} diff --git a/internal/summary/push.go b/internal/summary/push.go index d01fb5c..b6383db 100644 --- a/internal/summary/push.go +++ b/internal/summary/push.go @@ -30,14 +30,17 @@ type MetricsPayload struct { // PushClient sends metrics to the receiver service type PushClient struct { endpoint string + token string client *http.Client ctx ExecutionContext } -// NewPushClient creates a new push client configured from environment variables -func NewPushClient(endpoint string) *PushClient { +// NewPushClient creates a new push client configured from environment variables. +// If token is non-empty, it is sent as a Bearer token on each push request. +func NewPushClient(endpoint, token string) *PushClient { return &PushClient{ endpoint: endpoint, + token: token, client: &http.Client{ Timeout: 30 * time.Second, }, @@ -86,6 +89,9 @@ func (p *PushClient) Push(ctx context.Context, summary *RunSummary) error { return fmt.Errorf("creating request: %w", err) } req.Header.Set("Content-Type", "application/json") + if p.token != "" { + req.Header.Set("Authorization", "Bearer "+p.token) + } resp, err := p.client.Do(req) if err != nil { diff --git a/internal/summary/push_test.go b/internal/summary/push_test.go index 58aea9e..552ae68 100644 --- a/internal/summary/push_test.go +++ b/internal/summary/push_test.go @@ -25,7 +25,7 @@ func TestPushClient_Push(t *testing.T) { })) defer server.Close() - client := NewPushClient(server.URL) + client := NewPushClient(server.URL, "") client.ctx = ExecutionContext{ Organization: "test-org", Repository: "test-repo", @@ -59,7 +59,7 @@ func TestPushClient_Push(t *testing.T) { } func TestPushClient_Push_NilSummary(t *testing.T) { - client := NewPushClient("http://localhost:9999") + client := NewPushClient("http://localhost:9999", "") err := client.Push(context.Background(), nil) if err != nil { t.Errorf("Push(nil) error = %v, want nil", err) @@ -72,7 +72,7 @@ func TestPushClient_Push_ServerError(t *testing.T) { })) defer server.Close() - client := NewPushClient(server.URL) + client := NewPushClient(server.URL, "") client.ctx = ExecutionContext{RunID: "test"} err := client.Push(context.Background(), &RunSummary{}) @@ -82,7 +82,7 @@ func TestPushClient_Push_ServerError(t *testing.T) { } func TestPushClient_Push_ConnectionError(t *testing.T) { - client := NewPushClient("http://localhost:1") // Invalid port + client := NewPushClient("http://localhost:1", "") // Invalid port client.ctx = ExecutionContext{RunID: "test"} err := client.Push(context.Background(), &RunSummary{}) @@ -147,8 +147,48 @@ func TestExecutionContextFromEnv_GiteaFallback(t *testing.T) { } } +func TestPushClient_Push_WithToken(t *testing.T) { + var gotAuth string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + client := NewPushClient(server.URL, "my-token") + client.ctx = ExecutionContext{RunID: "test"} + + err := client.Push(context.Background(), &RunSummary{}) + if err != nil { + t.Fatalf("Push() error = %v", err) + } + if gotAuth != "Bearer my-token" { + t.Errorf("Authorization = %q, want %q", gotAuth, "Bearer my-token") + } +} + +func TestPushClient_Push_WithoutToken(t *testing.T) { + var gotAuth string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + client := NewPushClient(server.URL, "") + client.ctx = ExecutionContext{RunID: "test"} + + err := client.Push(context.Background(), &RunSummary{}) + if err != nil { + t.Fatalf("Push() error = %v", err) + } + if gotAuth != "" { + t.Errorf("Authorization = %q, want empty", gotAuth) + } +} + func TestPushClient_ExecutionContext(t *testing.T) { - client := NewPushClient("http://example.com") + client := NewPushClient("http://example.com", "") client.ctx = ExecutionContext{ Organization: "org", Repository: "repo", diff --git a/test/docker/docker-compose-stress.yaml b/test/docker/docker-compose-stress.yaml index 0d560a7..d4a0be0 100644 --- a/test/docker/docker-compose-stress.yaml +++ b/test/docker/docker-compose-stress.yaml @@ -1,10 +1,12 @@ # Docker Compose stress test with receiver -# Run with: docker compose -f test/docker/docker-compose-stress.yaml up +# See README.md "Docker Compose" section for the full token workflow. # # This test: -# 1. Starts the metrics receiver -# 2. Runs heavy CPU/memory workloads in multiple containers with shared PID namespace -# 3. Collector gathers metrics and pushes summary to receiver on shutdown +# 1. Starts the metrics receiver (with read-token and hmac-key) +# 2. You generate a scoped push token via POST /api/v1/token +# 3. Start the collector with COLLECTOR_PUSH_TOKEN set +# 4. Runs heavy CPU/memory workloads in multiple containers with shared PID namespace +# 5. Collector gathers metrics and pushes summary to receiver on shutdown # # To trigger the push, stop the collector gracefully: # docker compose -f test/docker/docker-compose-stress.yaml stop collector @@ -20,6 +22,8 @@ services: - "9080:8080" environment: - DB_PATH=/data/metrics.db + - RECEIVER_READ_TOKEN=dummyreadtoken + - RECEIVER_HMAC_KEY=dummyhmackey volumes: - receiver-data:/data healthcheck: @@ -98,6 +102,8 @@ services: - --log-format=json - --push-endpoint=http://receiver:8080/api/v1/metrics environment: + # Push token — pass via COLLECTOR_PUSH_TOKEN from host env + COLLECTOR_PUSH_TOKEN: "${COLLECTOR_PUSH_TOKEN}" # Execution context for the receiver GITHUB_REPOSITORY_OWNER: "test-org" GITHUB_REPOSITORY: "test-org/stress-test"