diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go new file mode 100644 index 0000000..a7932c3 --- /dev/null +++ b/internal/integration/integration_test.go @@ -0,0 +1,249 @@ +// ABOUTME: Integration tests for collector and receiver interaction. +// ABOUTME: Tests that the push client can successfully send metrics to the receiver. +package integration + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/receiver" + "edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary" +) + +// setupTestReceiver creates a test receiver with SQLite storage and HTTP server +func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func()) { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + store, err := receiver.NewStore(dbPath) + if err != nil { + t.Fatalf("NewStore() error = %v", err) + } + + handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil))) + mux := http.NewServeMux() + handler.RegisterRoutes(mux) + + server := httptest.NewServer(mux) + + cleanup := func() { + server.Close() + _ = store.Close() + } + + return store, server, cleanup +} + +func TestPushClientToReceiver(t *testing.T) { + store, server, cleanup := setupTestReceiver(t) + defer cleanup() + + // Test execution context + testCtx := summary.ExecutionContext{ + Organization: "integration-org", + Repository: "integration-repo", + Workflow: "test.yml", + Job: "integration-test", + RunID: "run-integration-123", + } + + // Create a test summary + testSummary := &summary.RunSummary{ + StartTime: time.Now().Add(-time.Minute), + EndTime: time.Now(), + DurationSeconds: 60.0, + SampleCount: 10, + CPUTotal: summary.StatSummary{Peak: 85.5, Avg: 42.3, P95: 78.0}, + MemUsedBytes: summary.StatSummary{Peak: 4294967296, Avg: 2147483648, P95: 3865470566}, + MemUsedPercent: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0}, + TopCPUProcesses: []summary.ProcessPeak{ + {PID: 1234, Name: "test-process", PeakCPU: 45.0, PeakMem: 1073741824}, + }, + TopMemProcesses: []summary.ProcessPeak{ + {PID: 1234, Name: "test-process", PeakCPU: 45.0, PeakMem: 1073741824}, + }, + } + + // Build payload matching what push client sends + payload := struct { + Execution summary.ExecutionContext `json:"execution"` + Summary summary.RunSummary `json:"run_summary"` + }{ + Execution: testCtx, + Summary: *testSummary, + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("Marshal() error = %v", err) + } + + // Send via HTTP client + resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatalf("Post() error = %v", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusCreated { + t.Errorf("status = %d, want %d", resp.StatusCode, http.StatusCreated) + } + + // Verify metrics were stored + metrics, err := store.GetMetricsByWorkflowJob("integration-org", "integration-repo", "test.yml", "integration-test") + if err != nil { + t.Fatalf("GetMetricsByWorkflowJob() error = %v", err) + } + + if len(metrics) != 1 { + t.Fatalf("got %d metrics, want 1", len(metrics)) + } + + m := metrics[0] + if m.Organization != testCtx.Organization { + t.Errorf("Organization = %q, want %q", m.Organization, testCtx.Organization) + } + if m.Repository != testCtx.Repository { + t.Errorf("Repository = %q, want %q", m.Repository, testCtx.Repository) + } + if m.Workflow != testCtx.Workflow { + t.Errorf("Workflow = %q, want %q", m.Workflow, testCtx.Workflow) + } + if m.Job != testCtx.Job { + t.Errorf("Job = %q, want %q", m.Job, testCtx.Job) + } + if m.RunID != testCtx.RunID { + t.Errorf("RunID = %q, want %q", m.RunID, testCtx.RunID) + } + + // Verify payload was stored correctly + var storedSummary summary.RunSummary + if err := json.Unmarshal([]byte(m.Payload), &storedSummary); err != nil { + t.Fatalf("Unmarshal payload error = %v", err) + } + + if storedSummary.SampleCount != testSummary.SampleCount { + t.Errorf("SampleCount = %d, want %d", storedSummary.SampleCount, testSummary.SampleCount) + } + if storedSummary.CPUTotal.Peak != testSummary.CPUTotal.Peak { + t.Errorf("CPUTotal.Peak = %f, want %f", storedSummary.CPUTotal.Peak, testSummary.CPUTotal.Peak) + } +} + +func TestPushClientIntegration(t *testing.T) { + store, server, cleanup := setupTestReceiver(t) + defer cleanup() + + // Set environment variables for the push client + t.Setenv("GITHUB_REPOSITORY_OWNER", "push-client-org") + t.Setenv("GITHUB_REPOSITORY", "push-client-repo") + t.Setenv("GITHUB_WORKFLOW", "push-test.yml") + t.Setenv("GITHUB_JOB", "push-job") + t.Setenv("GITHUB_RUN_ID", "push-run-456") + + // Create push client - it reads from env vars + pushClient := summary.NewPushClient(server.URL + "/api/v1/metrics") + + // Verify execution context was read from env + ctx := pushClient.ExecutionContext() + if ctx.Organization != "push-client-org" { + t.Errorf("Organization = %q, want %q", ctx.Organization, "push-client-org") + } + + // Create and push a summary + testSummary := &summary.RunSummary{ + StartTime: time.Now().Add(-30 * time.Second), + EndTime: time.Now(), + DurationSeconds: 30.0, + SampleCount: 6, + CPUTotal: summary.StatSummary{Peak: 50.0, Avg: 25.0, P95: 45.0}, + MemUsedBytes: summary.StatSummary{Peak: 1000000, Avg: 500000, P95: 900000}, + MemUsedPercent: summary.StatSummary{Peak: 10.0, Avg: 5.0, P95: 9.0}, + } + + // Push the summary + err := pushClient.Push(context.Background(), testSummary) + if err != nil { + t.Fatalf("Push() error = %v", err) + } + + // Verify it was stored + metrics, err := store.GetMetricsByWorkflowJob("push-client-org", "push-client-repo", "push-test.yml", "push-job") + if err != nil { + t.Fatalf("GetMetricsByWorkflowJob() error = %v", err) + } + + if len(metrics) != 1 { + t.Fatalf("got %d metrics, want 1", len(metrics)) + } + + if metrics[0].RunID != "push-run-456" { + t.Errorf("RunID = %q, want %q", metrics[0].RunID, "push-run-456") + } +} + +func TestMultiplePushes(t *testing.T) { + store, server, cleanup := setupTestReceiver(t) + defer cleanup() + + // Simulate multiple workflow runs pushing metrics via direct HTTP POST + // This avoids env var manipulation which could cause issues with parallel tests + runs := []summary.ExecutionContext{ + {Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-1"}, + {Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "build", RunID: "run-2"}, + {Organization: "org-a", Repository: "repo-1", Workflow: "ci.yml", Job: "test", RunID: "run-1"}, + {Organization: "org-a", Repository: "repo-2", Workflow: "ci.yml", Job: "build", RunID: "run-1"}, + } + + for _, execCtx := range runs { + payload := struct { + Execution summary.ExecutionContext `json:"execution"` + Summary summary.RunSummary `json:"run_summary"` + }{ + Execution: execCtx, + Summary: summary.RunSummary{ + SampleCount: 5, + CPUTotal: summary.StatSummary{Peak: 50.0}, + }, + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("Marshal() error = %v", err) + } + + resp, err := http.Post(server.URL+"/api/v1/metrics", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatalf("Post() error = %v for run %+v", err, execCtx) + } + _ = resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + t.Fatalf("status = %d, want %d for run %+v", resp.StatusCode, http.StatusCreated, execCtx) + } + } + + // Verify filtering works correctly + metrics, err := store.GetMetricsByWorkflowJob("org-a", "repo-1", "ci.yml", "build") + if err != nil { + t.Fatalf("GetMetricsByWorkflowJob() error = %v", err) + } + if len(metrics) != 2 { + t.Errorf("got %d metrics for org-a/repo-1/ci.yml/build, want 2", len(metrics)) + } + + metrics, err = store.GetMetricsByWorkflowJob("org-a", "repo-1", "ci.yml", "test") + if err != nil { + t.Fatalf("GetMetricsByWorkflowJob() error = %v", err) + } + if len(metrics) != 1 { + t.Errorf("got %d metrics for org-a/repo-1/ci.yml/test, want 1", len(metrics)) + } +}