diff --git a/cmd/collector/main.go b/cmd/collector/main.go index b34561b..ec06b9b 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -30,6 +30,7 @@ func main() { logLevel := flag.String("log-level", defaultLogLevel, "Log level: debug, info, warn, error") logFormat := flag.String("log-format", defaultLogFormat, "Output format: json, text") topN := flag.Int("top", defaultTopN, "Number of top processes to include") + pushEndpoint := flag.String("push-endpoint", "", "HTTP endpoint to push metrics to (e.g., http://localhost:8080/api/v1/metrics)") flag.Parse() // Setup structured logging for application logs @@ -58,6 +59,21 @@ func main() { summaryWriter := summary.NewSummaryWriter(os.Stdout, *logFormat) c.SetSummaryWriter(summaryWriter) + // Setup push client if endpoint is configured + if *pushEndpoint != "" { + pushClient := summary.NewPushClient(*pushEndpoint) + c.SetPushClient(pushClient) + execCtx := pushClient.ExecutionContext() + appLogger.Info("push client configured", + slog.String("endpoint", *pushEndpoint), + slog.String("organization", execCtx.Organization), + slog.String("repository", execCtx.Repository), + slog.String("workflow", execCtx.Workflow), + slog.String("job", execCtx.Job), + slog.String("run_id", execCtx.RunID), + ) + } + // Setup signal handling for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 134ab3a..393f1ae 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -26,6 +26,7 @@ type Collector struct { logger *slog.Logger accumulator *summary.Accumulator summaryWriter *summary.SummaryWriter + pushClient *summary.PushClient } // New creates a new collector @@ -44,6 +45,11 @@ func (c *Collector) SetSummaryWriter(w *summary.SummaryWriter) { c.summaryWriter = w } +// SetPushClient attaches a push client for sending summaries to the receiver +func (c *Collector) SetPushClient(p *summary.PushClient) { + c.pushClient = p +} + // Run starts the collector loop and blocks until context is cancelled func (c *Collector) Run(ctx context.Context) error { c.logger.Info("collector starting", @@ -64,7 +70,7 @@ func (c *Collector) Run(ctx context.Context) error { select { case <-ctx.Done(): c.logger.Info("collector stopping") - c.emitSummary() + c.emitSummary(context.Background()) // Use fresh context for shutdown tasks return ctx.Err() case <-ticker.C: if err := c.collect(); err != nil { @@ -91,11 +97,7 @@ func (c *Collector) collect() error { } // emitSummary computes and writes the run summary if a writer is configured -func (c *Collector) emitSummary() { - if c.summaryWriter == nil { - return - } - +func (c *Collector) emitSummary(ctx context.Context) { s := c.accumulator.Summarize() if s == nil { c.logger.Info("no samples collected, skipping run summary") @@ -106,7 +108,18 @@ func (c *Collector) emitSummary() { slog.Int("sample_count", s.SampleCount), slog.Float64("duration_seconds", s.DurationSeconds), ) - c.summaryWriter.Write(s) + + if c.summaryWriter != nil { + c.summaryWriter.Write(s) + } + + if c.pushClient != nil { + if err := c.pushClient.Push(ctx, s); err != nil { + c.logger.Error("failed to push metrics", slog.String("error", err.Error())) + } else { + c.logger.Info("metrics pushed successfully") + } + } } // CollectOnce performs a single collection and returns the metrics diff --git a/internal/summary/push.go b/internal/summary/push.go new file mode 100644 index 0000000..d01fb5c --- /dev/null +++ b/internal/summary/push.go @@ -0,0 +1,106 @@ +// ABOUTME: HTTP client for pushing run summaries to the metrics receiver. +// ABOUTME: Reads execution context from GitHub Actions style environment variables. +package summary + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "time" +) + +// ExecutionContext holds GitHub Actions style identifiers for a workflow run +type ExecutionContext struct { + Organization string `json:"organization"` + Repository string `json:"repository"` + Workflow string `json:"workflow"` + Job string `json:"job"` + RunID string `json:"run_id"` +} + +// MetricsPayload is the complete payload sent to the receiver +type MetricsPayload struct { + Execution ExecutionContext `json:"execution"` + Summary RunSummary `json:"run_summary"` +} + +// PushClient sends metrics to the receiver service +type PushClient struct { + endpoint string + client *http.Client + ctx ExecutionContext +} + +// NewPushClient creates a new push client configured from environment variables +func NewPushClient(endpoint string) *PushClient { + return &PushClient{ + endpoint: endpoint, + client: &http.Client{ + Timeout: 30 * time.Second, + }, + ctx: ExecutionContextFromEnv(), + } +} + +// ExecutionContextFromEnv reads execution context from GitHub Actions environment variables +func ExecutionContextFromEnv() ExecutionContext { + return ExecutionContext{ + Organization: getEnvWithFallback("GITHUB_REPOSITORY_OWNER", "GITEA_REPO_OWNER"), + Repository: getEnvWithFallback("GITHUB_REPOSITORY", "GITEA_REPO"), + Workflow: getEnvWithFallback("GITHUB_WORKFLOW", "GITEA_WORKFLOW"), + Job: getEnvWithFallback("GITHUB_JOB", "GITEA_JOB"), + RunID: getEnvWithFallback("GITHUB_RUN_ID", "GITEA_RUN_ID"), + } +} + +func getEnvWithFallback(keys ...string) string { + for _, key := range keys { + if val := os.Getenv(key); val != "" { + return val + } + } + return "" +} + +// Push sends the run summary to the receiver +func (p *PushClient) Push(ctx context.Context, summary *RunSummary) error { + if summary == nil { + return nil + } + + payload := MetricsPayload{ + Execution: p.ctx, + Summary: *summary, + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.client.Do(req) + if err != nil { + return fmt.Errorf("sending request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil +} + +// ExecutionContext returns the current execution context +func (p *PushClient) ExecutionContext() ExecutionContext { + return p.ctx +} diff --git a/internal/summary/push_test.go b/internal/summary/push_test.go new file mode 100644 index 0000000..58aea9e --- /dev/null +++ b/internal/summary/push_test.go @@ -0,0 +1,162 @@ +package summary + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestPushClient_Push(t *testing.T) { + var received MetricsPayload + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if ct := r.Header.Get("Content-Type"); ct != "application/json" { + t.Errorf("expected Content-Type application/json, got %s", ct) + } + if err := json.NewDecoder(r.Body).Decode(&received); err != nil { + t.Errorf("failed to decode body: %v", err) + } + w.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + client := NewPushClient(server.URL) + client.ctx = ExecutionContext{ + Organization: "test-org", + Repository: "test-repo", + Workflow: "ci.yml", + Job: "build", + RunID: "12345", + } + + summary := &RunSummary{ + StartTime: time.Now().Add(-time.Minute), + EndTime: time.Now(), + DurationSeconds: 60.0, + SampleCount: 10, + CPUTotal: StatSummary{Peak: 80.0, Avg: 50.0, P95: 75.0}, + } + + err := client.Push(context.Background(), summary) + if err != nil { + t.Fatalf("Push() error = %v", err) + } + + if received.Execution.Organization != "test-org" { + t.Errorf("Organization = %q, want %q", received.Execution.Organization, "test-org") + } + if received.Execution.RunID != "12345" { + t.Errorf("RunID = %q, want %q", received.Execution.RunID, "12345") + } + if received.Summary.SampleCount != 10 { + t.Errorf("SampleCount = %d, want %d", received.Summary.SampleCount, 10) + } +} + +func TestPushClient_Push_NilSummary(t *testing.T) { + client := NewPushClient("http://localhost:9999") + err := client.Push(context.Background(), nil) + if err != nil { + t.Errorf("Push(nil) error = %v, want nil", err) + } +} + +func TestPushClient_Push_ServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := NewPushClient(server.URL) + client.ctx = ExecutionContext{RunID: "test"} + + err := client.Push(context.Background(), &RunSummary{}) + if err == nil { + t.Error("Push() expected error for 500 response, got nil") + } +} + +func TestPushClient_Push_ConnectionError(t *testing.T) { + client := NewPushClient("http://localhost:1") // Invalid port + client.ctx = ExecutionContext{RunID: "test"} + + err := client.Push(context.Background(), &RunSummary{}) + if err == nil { + t.Error("Push() expected error for connection failure, got nil") + } +} + +func TestExecutionContextFromEnv(t *testing.T) { + // Save and restore env + origVars := map[string]string{ + "GITHUB_REPOSITORY_OWNER": "", + "GITHUB_REPOSITORY": "", + "GITHUB_WORKFLOW": "", + "GITHUB_JOB": "", + "GITHUB_RUN_ID": "", + } + for k := range origVars { + origVars[k] = getEnvWithFallback(k) + } + defer func() { + for k, v := range origVars { + if v == "" { + t.Setenv(k, "") + } + } + }() + + t.Setenv("GITHUB_REPOSITORY_OWNER", "my-org") + t.Setenv("GITHUB_REPOSITORY", "my-org/my-repo") + t.Setenv("GITHUB_WORKFLOW", "CI") + t.Setenv("GITHUB_JOB", "test") + t.Setenv("GITHUB_RUN_ID", "999") + + ctx := ExecutionContextFromEnv() + + if ctx.Organization != "my-org" { + t.Errorf("Organization = %q, want %q", ctx.Organization, "my-org") + } + if ctx.Repository != "my-org/my-repo" { + t.Errorf("Repository = %q, want %q", ctx.Repository, "my-org/my-repo") + } + if ctx.Workflow != "CI" { + t.Errorf("Workflow = %q, want %q", ctx.Workflow, "CI") + } + if ctx.Job != "test" { + t.Errorf("Job = %q, want %q", ctx.Job, "test") + } + if ctx.RunID != "999" { + t.Errorf("RunID = %q, want %q", ctx.RunID, "999") + } +} + +func TestExecutionContextFromEnv_GiteaFallback(t *testing.T) { + t.Setenv("GITHUB_RUN_ID", "") + t.Setenv("GITEA_RUN_ID", "gitea-123") + + ctx := ExecutionContextFromEnv() + + if ctx.RunID != "gitea-123" { + t.Errorf("RunID = %q, want %q (Gitea fallback)", ctx.RunID, "gitea-123") + } +} + +func TestPushClient_ExecutionContext(t *testing.T) { + client := NewPushClient("http://example.com") + client.ctx = ExecutionContext{ + Organization: "org", + Repository: "repo", + RunID: "run", + } + + ctx := client.ExecutionContext() + if ctx.Organization != "org" { + t.Errorf("Organization = %q, want %q", ctx.Organization, "org") + } +}