feat(collector): add HTTP push for metrics to receiver
All checks were successful
ci / build (push) Successful in 46s
All checks were successful
ci / build (push) Successful in 46s
Add push client that sends run summary to a configurable HTTP endpoint on shutdown. Execution context is read from GitHub Actions style environment variables (with Gitea fallbacks). New flag: -push-endpoint <url> Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
c309bd810d
commit
cfe583fbc4
4 changed files with 304 additions and 7 deletions
|
|
@ -30,6 +30,7 @@ func main() {
|
|||
logLevel := flag.String("log-level", defaultLogLevel, "Log level: debug, info, warn, error")
|
||||
logFormat := flag.String("log-format", defaultLogFormat, "Output format: json, text")
|
||||
topN := flag.Int("top", defaultTopN, "Number of top processes to include")
|
||||
pushEndpoint := flag.String("push-endpoint", "", "HTTP endpoint to push metrics to (e.g., http://localhost:8080/api/v1/metrics)")
|
||||
flag.Parse()
|
||||
|
||||
// Setup structured logging for application logs
|
||||
|
|
@ -58,6 +59,21 @@ func main() {
|
|||
summaryWriter := summary.NewSummaryWriter(os.Stdout, *logFormat)
|
||||
c.SetSummaryWriter(summaryWriter)
|
||||
|
||||
// Setup push client if endpoint is configured
|
||||
if *pushEndpoint != "" {
|
||||
pushClient := summary.NewPushClient(*pushEndpoint)
|
||||
c.SetPushClient(pushClient)
|
||||
execCtx := pushClient.ExecutionContext()
|
||||
appLogger.Info("push client configured",
|
||||
slog.String("endpoint", *pushEndpoint),
|
||||
slog.String("organization", execCtx.Organization),
|
||||
slog.String("repository", execCtx.Repository),
|
||||
slog.String("workflow", execCtx.Workflow),
|
||||
slog.String("job", execCtx.Job),
|
||||
slog.String("run_id", execCtx.RunID),
|
||||
)
|
||||
}
|
||||
|
||||
// Setup signal handling for graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ type Collector struct {
|
|||
logger *slog.Logger
|
||||
accumulator *summary.Accumulator
|
||||
summaryWriter *summary.SummaryWriter
|
||||
pushClient *summary.PushClient
|
||||
}
|
||||
|
||||
// New creates a new collector
|
||||
|
|
@ -44,6 +45,11 @@ func (c *Collector) SetSummaryWriter(w *summary.SummaryWriter) {
|
|||
c.summaryWriter = w
|
||||
}
|
||||
|
||||
// SetPushClient attaches a push client for sending summaries to the receiver
|
||||
func (c *Collector) SetPushClient(p *summary.PushClient) {
|
||||
c.pushClient = p
|
||||
}
|
||||
|
||||
// Run starts the collector loop and blocks until context is cancelled
|
||||
func (c *Collector) Run(ctx context.Context) error {
|
||||
c.logger.Info("collector starting",
|
||||
|
|
@ -64,7 +70,7 @@ func (c *Collector) Run(ctx context.Context) error {
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
c.logger.Info("collector stopping")
|
||||
c.emitSummary()
|
||||
c.emitSummary(context.Background()) // Use fresh context for shutdown tasks
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
if err := c.collect(); err != nil {
|
||||
|
|
@ -91,11 +97,7 @@ func (c *Collector) collect() error {
|
|||
}
|
||||
|
||||
// emitSummary computes and writes the run summary if a writer is configured
|
||||
func (c *Collector) emitSummary() {
|
||||
if c.summaryWriter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Collector) emitSummary(ctx context.Context) {
|
||||
s := c.accumulator.Summarize()
|
||||
if s == nil {
|
||||
c.logger.Info("no samples collected, skipping run summary")
|
||||
|
|
@ -106,7 +108,18 @@ func (c *Collector) emitSummary() {
|
|||
slog.Int("sample_count", s.SampleCount),
|
||||
slog.Float64("duration_seconds", s.DurationSeconds),
|
||||
)
|
||||
c.summaryWriter.Write(s)
|
||||
|
||||
if c.summaryWriter != nil {
|
||||
c.summaryWriter.Write(s)
|
||||
}
|
||||
|
||||
if c.pushClient != nil {
|
||||
if err := c.pushClient.Push(ctx, s); err != nil {
|
||||
c.logger.Error("failed to push metrics", slog.String("error", err.Error()))
|
||||
} else {
|
||||
c.logger.Info("metrics pushed successfully")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CollectOnce performs a single collection and returns the metrics
|
||||
|
|
|
|||
106
internal/summary/push.go
Normal file
106
internal/summary/push.go
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
// ABOUTME: HTTP client for pushing run summaries to the metrics receiver.
|
||||
// ABOUTME: Reads execution context from GitHub Actions style environment variables.
|
||||
package summary
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ExecutionContext holds GitHub Actions style identifiers for a workflow run
|
||||
type ExecutionContext struct {
|
||||
Organization string `json:"organization"`
|
||||
Repository string `json:"repository"`
|
||||
Workflow string `json:"workflow"`
|
||||
Job string `json:"job"`
|
||||
RunID string `json:"run_id"`
|
||||
}
|
||||
|
||||
// MetricsPayload is the complete payload sent to the receiver
|
||||
type MetricsPayload struct {
|
||||
Execution ExecutionContext `json:"execution"`
|
||||
Summary RunSummary `json:"run_summary"`
|
||||
}
|
||||
|
||||
// PushClient sends metrics to the receiver service
|
||||
type PushClient struct {
|
||||
endpoint string
|
||||
client *http.Client
|
||||
ctx ExecutionContext
|
||||
}
|
||||
|
||||
// NewPushClient creates a new push client configured from environment variables
|
||||
func NewPushClient(endpoint string) *PushClient {
|
||||
return &PushClient{
|
||||
endpoint: endpoint,
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
ctx: ExecutionContextFromEnv(),
|
||||
}
|
||||
}
|
||||
|
||||
// ExecutionContextFromEnv reads execution context from GitHub Actions environment variables
|
||||
func ExecutionContextFromEnv() ExecutionContext {
|
||||
return ExecutionContext{
|
||||
Organization: getEnvWithFallback("GITHUB_REPOSITORY_OWNER", "GITEA_REPO_OWNER"),
|
||||
Repository: getEnvWithFallback("GITHUB_REPOSITORY", "GITEA_REPO"),
|
||||
Workflow: getEnvWithFallback("GITHUB_WORKFLOW", "GITEA_WORKFLOW"),
|
||||
Job: getEnvWithFallback("GITHUB_JOB", "GITEA_JOB"),
|
||||
RunID: getEnvWithFallback("GITHUB_RUN_ID", "GITEA_RUN_ID"),
|
||||
}
|
||||
}
|
||||
|
||||
func getEnvWithFallback(keys ...string) string {
|
||||
for _, key := range keys {
|
||||
if val := os.Getenv(key); val != "" {
|
||||
return val
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Push sends the run summary to the receiver
|
||||
func (p *PushClient) Push(ctx context.Context, summary *RunSummary) error {
|
||||
if summary == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
payload := MetricsPayload{
|
||||
Execution: p.ctx,
|
||||
Summary: *summary,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling payload: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := p.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending request: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecutionContext returns the current execution context
|
||||
func (p *PushClient) ExecutionContext() ExecutionContext {
|
||||
return p.ctx
|
||||
}
|
||||
162
internal/summary/push_test.go
Normal file
162
internal/summary/push_test.go
Normal file
|
|
@ -0,0 +1,162 @@
|
|||
package summary
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPushClient_Push(t *testing.T) {
|
||||
var received MetricsPayload
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("expected POST, got %s", r.Method)
|
||||
}
|
||||
if ct := r.Header.Get("Content-Type"); ct != "application/json" {
|
||||
t.Errorf("expected Content-Type application/json, got %s", ct)
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
|
||||
t.Errorf("failed to decode body: %v", err)
|
||||
}
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewPushClient(server.URL)
|
||||
client.ctx = ExecutionContext{
|
||||
Organization: "test-org",
|
||||
Repository: "test-repo",
|
||||
Workflow: "ci.yml",
|
||||
Job: "build",
|
||||
RunID: "12345",
|
||||
}
|
||||
|
||||
summary := &RunSummary{
|
||||
StartTime: time.Now().Add(-time.Minute),
|
||||
EndTime: time.Now(),
|
||||
DurationSeconds: 60.0,
|
||||
SampleCount: 10,
|
||||
CPUTotal: StatSummary{Peak: 80.0, Avg: 50.0, P95: 75.0},
|
||||
}
|
||||
|
||||
err := client.Push(context.Background(), summary)
|
||||
if err != nil {
|
||||
t.Fatalf("Push() error = %v", err)
|
||||
}
|
||||
|
||||
if received.Execution.Organization != "test-org" {
|
||||
t.Errorf("Organization = %q, want %q", received.Execution.Organization, "test-org")
|
||||
}
|
||||
if received.Execution.RunID != "12345" {
|
||||
t.Errorf("RunID = %q, want %q", received.Execution.RunID, "12345")
|
||||
}
|
||||
if received.Summary.SampleCount != 10 {
|
||||
t.Errorf("SampleCount = %d, want %d", received.Summary.SampleCount, 10)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushClient_Push_NilSummary(t *testing.T) {
|
||||
client := NewPushClient("http://localhost:9999")
|
||||
err := client.Push(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Errorf("Push(nil) error = %v, want nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushClient_Push_ServerError(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewPushClient(server.URL)
|
||||
client.ctx = ExecutionContext{RunID: "test"}
|
||||
|
||||
err := client.Push(context.Background(), &RunSummary{})
|
||||
if err == nil {
|
||||
t.Error("Push() expected error for 500 response, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushClient_Push_ConnectionError(t *testing.T) {
|
||||
client := NewPushClient("http://localhost:1") // Invalid port
|
||||
client.ctx = ExecutionContext{RunID: "test"}
|
||||
|
||||
err := client.Push(context.Background(), &RunSummary{})
|
||||
if err == nil {
|
||||
t.Error("Push() expected error for connection failure, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutionContextFromEnv(t *testing.T) {
|
||||
// Save and restore env
|
||||
origVars := map[string]string{
|
||||
"GITHUB_REPOSITORY_OWNER": "",
|
||||
"GITHUB_REPOSITORY": "",
|
||||
"GITHUB_WORKFLOW": "",
|
||||
"GITHUB_JOB": "",
|
||||
"GITHUB_RUN_ID": "",
|
||||
}
|
||||
for k := range origVars {
|
||||
origVars[k] = getEnvWithFallback(k)
|
||||
}
|
||||
defer func() {
|
||||
for k, v := range origVars {
|
||||
if v == "" {
|
||||
t.Setenv(k, "")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
t.Setenv("GITHUB_REPOSITORY_OWNER", "my-org")
|
||||
t.Setenv("GITHUB_REPOSITORY", "my-org/my-repo")
|
||||
t.Setenv("GITHUB_WORKFLOW", "CI")
|
||||
t.Setenv("GITHUB_JOB", "test")
|
||||
t.Setenv("GITHUB_RUN_ID", "999")
|
||||
|
||||
ctx := ExecutionContextFromEnv()
|
||||
|
||||
if ctx.Organization != "my-org" {
|
||||
t.Errorf("Organization = %q, want %q", ctx.Organization, "my-org")
|
||||
}
|
||||
if ctx.Repository != "my-org/my-repo" {
|
||||
t.Errorf("Repository = %q, want %q", ctx.Repository, "my-org/my-repo")
|
||||
}
|
||||
if ctx.Workflow != "CI" {
|
||||
t.Errorf("Workflow = %q, want %q", ctx.Workflow, "CI")
|
||||
}
|
||||
if ctx.Job != "test" {
|
||||
t.Errorf("Job = %q, want %q", ctx.Job, "test")
|
||||
}
|
||||
if ctx.RunID != "999" {
|
||||
t.Errorf("RunID = %q, want %q", ctx.RunID, "999")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutionContextFromEnv_GiteaFallback(t *testing.T) {
|
||||
t.Setenv("GITHUB_RUN_ID", "")
|
||||
t.Setenv("GITEA_RUN_ID", "gitea-123")
|
||||
|
||||
ctx := ExecutionContextFromEnv()
|
||||
|
||||
if ctx.RunID != "gitea-123" {
|
||||
t.Errorf("RunID = %q, want %q (Gitea fallback)", ctx.RunID, "gitea-123")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushClient_ExecutionContext(t *testing.T) {
|
||||
client := NewPushClient("http://example.com")
|
||||
client.ctx = ExecutionContext{
|
||||
Organization: "org",
|
||||
Repository: "repo",
|
||||
RunID: "run",
|
||||
}
|
||||
|
||||
ctx := client.ExecutionContext()
|
||||
if ctx.Organization != "org" {
|
||||
t.Errorf("Organization = %q, want %q", ctx.Organization, "org")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue