test: add integration tests for collector-receiver interaction
All checks were successful
ci / build (push) Successful in 38s

Add integration tests that verify the push client can successfully
send metrics to the receiver and they are stored correctly in SQLite.

Tests:
- TestPushClientToReceiver: Direct HTTP POST verification
- TestPushClientIntegration: Full PushClient with env vars
- TestMultiplePushes: Multiple pushes and filtering (parallel-safe)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Manuel Ganter 2026-02-06 12:12:36 +01:00
parent 7da7dc138f
commit 0bf7dfee38
No known key found for this signature in database

View file

@ -0,0 +1,249 @@
// ABOUTME: Integration tests for collector and receiver interaction.
// ABOUTME: Tests that the push client can successfully send metrics to the receiver.
package integration
import (
"bytes"
"context"
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/receiver"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
)
// setupTestReceiver creates a test receiver with SQLite storage and HTTP server
func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func()) {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := receiver.NewStore(dbPath)
if err != nil {
t.Fatalf("NewStore() error = %v", err)
}
handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)))
mux := http.NewServeMux()
handler.RegisterRoutes(mux)
server := httptest.NewServer(mux)
cleanup := func() {
server.Close()
_ = store.Close()
}
return store, server, cleanup
}
func TestPushClientToReceiver(t *testing.T) {
store, server, cleanup := setupTestReceiver(t)
defer cleanup()
// Test execution context
testCtx := summary.ExecutionContext{
Organization: "integration-org",
Repository: "integration-repo",
Workflow: "test.yml",
Job: "integration-test",
RunID: "run-integration-123",
}
// Create a test summary
testSummary := &summary.RunSummary{
StartTime: time.Now().Add(-time.Minute),
EndTime: time.Now(),
DurationSeconds: 60.0,
SampleCount: 10,
CPUTotal: summary.StatSummary{Peak: 85.5, Avg: 42.3, P95: 78.0},
MemUsedBytes: summary.StatSummary{Peak: 4294967296, Avg: 2147483648, P95: 3865470566},
MemUsedPercent: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0},
TopCPUProcesses: []summary.ProcessPeak{
{PID: 1234, Name: "test-process", PeakCPU: 45.0, PeakMem: 1073741824},
},
TopMemProcesses: []summary.ProcessPeak{
{PID: 1234, Name: "test-process", PeakCPU: 45.0, PeakMem: 1073741824},
},
}
// Build payload matching what push client sends
payload := struct {
Execution summary.ExecutionContext `json:"execution"`
Summary summary.RunSummary `json:"run_summary"`
}{
Execution: testCtx,
Summary: *testSummary,
}
body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("Marshal() error = %v", err)
}
// Send via HTTP client
resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body))
if err != nil {
t.Fatalf("Post() error = %v", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusCreated {
t.Errorf("status = %d, want %d", resp.StatusCode, http.StatusCreated)
}
// Verify metrics were stored
metrics, err := store.GetMetricsByWorkflowJob("integration-org", "integration-repo", "test.yml", "integration-test")
if err != nil {
t.Fatalf("GetMetricsByWorkflowJob() error = %v", err)
}
if len(metrics) != 1 {
t.Fatalf("got %d metrics, want 1", len(metrics))
}
m := metrics[0]
if m.Organization != testCtx.Organization {
t.Errorf("Organization = %q, want %q", m.Organization, testCtx.Organization)
}
if m.Repository != testCtx.Repository {
t.Errorf("Repository = %q, want %q", m.Repository, testCtx.Repository)
}
if m.Workflow != testCtx.Workflow {
t.Errorf("Workflow = %q, want %q", m.Workflow, testCtx.Workflow)
}
if m.Job != testCtx.Job {
t.Errorf("Job = %q, want %q", m.Job, testCtx.Job)
}
if m.RunID != testCtx.RunID {
t.Errorf("RunID = %q, want %q", m.RunID, testCtx.RunID)
}
// Verify payload was stored correctly
var storedSummary summary.RunSummary
if err := json.Unmarshal([]byte(m.Payload), &storedSummary); err != nil {
t.Fatalf("Unmarshal payload error = %v", err)
}
if storedSummary.SampleCount != testSummary.SampleCount {
t.Errorf("SampleCount = %d, want %d", storedSummary.SampleCount, testSummary.SampleCount)
}
if storedSummary.CPUTotal.Peak != testSummary.CPUTotal.Peak {
t.Errorf("CPUTotal.Peak = %f, want %f", storedSummary.CPUTotal.Peak, testSummary.CPUTotal.Peak)
}
}
func TestPushClientIntegration(t *testing.T) {
store, server, cleanup := setupTestReceiver(t)
defer cleanup()
// Set environment variables for the push client
t.Setenv("GITHUB_REPOSITORY_OWNER", "push-client-org")
t.Setenv("GITHUB_REPOSITORY", "push-client-repo")
t.Setenv("GITHUB_WORKFLOW", "push-test.yml")
t.Setenv("GITHUB_JOB", "push-job")
t.Setenv("GITHUB_RUN_ID", "push-run-456")
// Create push client - it reads from env vars
pushClient := summary.NewPushClient(server.URL + "/api/v1/metrics")
// Verify execution context was read from env
ctx := pushClient.ExecutionContext()
if ctx.Organization != "push-client-org" {
t.Errorf("Organization = %q, want %q", ctx.Organization, "push-client-org")
}
// Create and push a summary
testSummary := &summary.RunSummary{
StartTime: time.Now().Add(-30 * time.Second),
EndTime: time.Now(),
DurationSeconds: 30.0,
SampleCount: 6,
CPUTotal: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0},
MemUsedBytes: summary.StatSummary{Peak: 1000000, Avg: 500000, P95: 900000},
MemUsedPercent: summary.StatSummary{Peak: 10.0, Avg: 5.0, P95: 9.0},
}
// Push the summary
err := pushClient.Push(context.Background(), testSummary)
if err != nil {
t.Fatalf("Push() error = %v", err)
}
// Verify it was stored
metrics, err := store.GetMetricsByWorkflowJob("push-client-org", "push-client-repo", "push-test.yml", "push-job")
if err != nil {
t.Fatalf("GetMetricsByWorkflowJob() error = %v", err)
}
if len(metrics) != 1 {
t.Fatalf("got %d metrics, want 1", len(metrics))
}
if metrics[0].RunID != "push-run-456" {
t.Errorf("RunID = %q, want %q", metrics[0].RunID, "push-run-456")
}
}
func TestMultiplePushes(t *testing.T) {
store, server, cleanup := setupTestReceiver(t)
defer cleanup()
// Simulate multiple workflow runs pushing metrics via direct HTTP POST
// This avoids env var manipulation which could cause issues with parallel tests
runs := []summary.ExecutionContext{
{Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-1"},
{Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-2"},
{Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "test", RunID: "run-1"},
{Organization: "org-a", Repository: "repo-2", Workflow: "ci.yml", Job: "build", RunID: "run-1"},
}
for _, execCtx := range runs {
payload := struct {
Execution summary.ExecutionContext `json:"execution"`
Summary summary.RunSummary `json:"run_summary"`
}{
Execution: execCtx,
Summary: summary.RunSummary{
SampleCount: 5,
CPUTotal: summary.StatSummary{Peak: 50.0},
},
}
body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("Marshal() error = %v", err)
}
resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body))
if err != nil {
t.Fatalf("Post() error = %v for run %+v", err, execCtx)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
t.Fatalf("status = %d, want %d for run %+v", resp.StatusCode, http.StatusCreated, execCtx)
}
}
// Verify filtering works correctly
metrics, err := store.GetMetricsByWorkflowJob("org-a", "repo-1", "ci.yml", "build")
if err != nil {
t.Fatalf("GetMetricsByWorkflowJob() error = %v", err)
}
if len(metrics) != 2 {
t.Errorf("got %d metrics for org-a/repo-1/ci.yml/build, want 2", len(metrics))
}
metrics, err = store.GetMetricsByWorkflowJob("org-a", "repo-1", "ci.yml", "test")
if err != nil {
t.Fatalf("GetMetricsByWorkflowJob() error = %v", err)
}
if len(metrics) != 1 {
t.Errorf("got %d metrics for org-a/repo-1/ci.yml/test, want 1", len(metrics))
}
}