diff --git a/cmd/collector/main.go b/cmd/collector/main.go
index 8009586..b34561b 100644
--- a/cmd/collector/main.go
+++ b/cmd/collector/main.go
@@ -12,6 +12,7 @@ import (
 
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/collector"
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
 )
 
 const (
@@ -53,6 +54,10 @@ func main() {
 		TopN:     *topN,
 	}, metricsWriter, appLogger)
 
+	// Attach summary writer to emit run summary on shutdown
+	summaryWriter := summary.NewSummaryWriter(os.Stdout, *logFormat)
+	c.SetSummaryWriter(summaryWriter)
+
 	// Setup signal handling for graceful shutdown
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/internal/collector/collector.go b/internal/collector/collector.go
index cbc5683..134ab3a 100644
--- a/internal/collector/collector.go
+++ b/internal/collector/collector.go
@@ -8,6 +8,7 @@ import (
 
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
 )
 
 // Config holds the collector configuration
@@ -19,22 +20,30 @@ type Config struct {
 
 // Collector orchestrates metric collection
 type Collector struct {
-	config     Config
-	aggregator *metrics.Aggregator
-	writer     output.Writer
-	logger     *slog.Logger
+	config        Config
+	aggregator    *metrics.Aggregator
+	writer        output.Writer
+	logger        *slog.Logger
+	accumulator   *summary.Accumulator
+	summaryWriter *summary.SummaryWriter
 }
 
 // New creates a new collector
 func New(cfg Config, writer output.Writer, logger *slog.Logger) *Collector {
 	return &Collector{
-		config:     cfg,
-		aggregator: metrics.NewAggregator(cfg.ProcPath, cfg.TopN),
-		writer:     writer,
-		logger:     logger,
+		config:      cfg,
+		aggregator:  metrics.NewAggregator(cfg.ProcPath, cfg.TopN),
+		writer:      writer,
+		logger:      logger,
+		accumulator: summary.NewAccumulator(cfg.TopN),
 	}
 }
 
+// SetSummaryWriter attaches a summary writer for emitting run summaries on shutdown
+func (c *Collector) SetSummaryWriter(w *summary.SummaryWriter) {
+	c.summaryWriter = w
+}
+
 // Run starts the collector loop and blocks until context is cancelled
 func (c *Collector) Run(ctx context.Context) error {
 	c.logger.Info("collector starting",
@@ -55,6 +64,7 @@ func (c *Collector) Run(ctx context.Context) error {
 	select {
 	case <-ctx.Done():
 		c.logger.Info("collector stopping")
+		c.emitSummary()
 		return ctx.Err()
 	case <-ticker.C:
 		if err := c.collect(); err != nil {
@@ -75,9 +85,30 @@ func (c *Collector) collect() error {
 		return fmt.Errorf("writing metrics: %w", err)
 	}
 
+	c.accumulator.Add(m)
+
 	return nil
 }
 
+// emitSummary computes and writes the run summary if a writer is configured
+func (c *Collector) emitSummary() {
+	if c.summaryWriter == nil {
+		return
+	}
+
+	s := c.accumulator.Summarize()
+	if s == nil {
+		c.logger.Info("no samples collected, skipping run summary")
+		return
+	}
+
+	c.logger.Info("emitting run summary",
+		slog.Int("sample_count", s.SampleCount),
+		slog.Float64("duration_seconds", s.DurationSeconds),
+	)
+	c.summaryWriter.Write(s)
+}
+
 // CollectOnce performs a single collection and returns the metrics
 func (c *Collector) CollectOnce() (*metrics.SystemMetrics, error) {
 	return c.aggregator.Collect()
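Aside (not part of the patch): a minimal wiring sketch for orientation, assembled from this diff plus the output package as the tests below use it. The signal.NotifyContext shutdown wiring and the hard-coded config values are illustrative assumptions; only New, SetSummaryWriter, and Run come from this change.

	package main

	import (
		"context"
		"errors"
		"log/slog"
		"os"
		"os/signal"
		"syscall"
		"time"

		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/collector"
		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
	)

	func main() {
		appLogger := slog.New(slog.NewTextHandler(os.Stderr, nil))

		// Per-interval metrics go to stdout as JSON (values are illustrative).
		metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
			Output: os.Stdout,
			Format: output.LogFormatJSON,
			Level:  slog.LevelInfo,
		})

		c := collector.New(collector.Config{
			ProcPath: "/proc",
			Interval: 5 * time.Second,
			TopN:     5,
		}, metricsWriter, appLogger)
		c.SetSummaryWriter(summary.NewSummaryWriter(os.Stdout, "json"))

		// SIGINT/SIGTERM cancel the context; Run emits the run summary on the
		// way out and returns context.Canceled.
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
		defer stop()

		if err := c.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
			appLogger.Error("collector failed", slog.Any("error", err))
			os.Exit(1)
		}
	}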
diff --git a/internal/collector/collector_test.go b/internal/collector/collector_test.go
new file mode 100644
index 0000000..9251a51
--- /dev/null
+++ b/internal/collector/collector_test.go
@@ -0,0 +1,98 @@
+// ABOUTME: Tests for the collector's summary integration.
+// ABOUTME: Validates that run summaries are emitted on shutdown and that a missing writer is handled gracefully.
+package collector
+
+import (
+	"bytes"
+	"context"
+	"log/slog"
+	"strings"
+	"testing"
+	"time"
+
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
+)
+
+func TestCollector_EmitsSummaryOnShutdown(t *testing.T) {
+	// Use testdata/proc as the proc filesystem
+	procPath := "testdata/proc"
+
+	// Metrics output (regular collection output)
+	var metricsOut bytes.Buffer
+	metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
+		Output: &metricsOut,
+		Format: output.LogFormatJSON,
+		Level:  slog.LevelInfo,
+	})
+
+	// Summary output
+	var summaryOut bytes.Buffer
+	sw := summary.NewSummaryWriter(&summaryOut, "json")
+
+	// Silence app logs
+	appLogger := slog.New(slog.NewTextHandler(&bytes.Buffer{}, nil))
+
+	c := New(Config{
+		ProcPath: procPath,
+		Interval: 50 * time.Millisecond,
+		TopN:     5,
+	}, metricsWriter, appLogger)
+
+	c.SetSummaryWriter(sw)
+
+	// Run collector briefly then cancel
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		// Let at least 2 collection cycles run
+		time.Sleep(150 * time.Millisecond)
+		cancel()
+	}()
+
+	_ = c.Run(ctx)
+
+	// Verify summary was emitted
+	summaryOutput := summaryOut.String()
+	if !strings.Contains(summaryOutput, "run_summary") {
+		t.Errorf("expected 'run_summary' in output, got: %s", summaryOutput)
+	}
+	if !strings.Contains(summaryOutput, "duration_seconds") {
+		t.Errorf("expected 'duration_seconds' in output, got: %s", summaryOutput)
+	}
+	if !strings.Contains(summaryOutput, "sample_count") {
+		t.Errorf("expected 'sample_count' in output, got: %s", summaryOutput)
+	}
+}
+
+func TestCollector_NoSummaryWithoutWriter(t *testing.T) {
+	procPath := "testdata/proc"
+
+	var metricsOut bytes.Buffer
+	metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
+		Output: &metricsOut,
+		Format: output.LogFormatJSON,
+		Level:  slog.LevelInfo,
+	})
+
+	appLogger := slog.New(slog.NewTextHandler(&bytes.Buffer{}, nil))
+
+	c := New(Config{
+		ProcPath: procPath,
+		Interval: 50 * time.Millisecond,
+		TopN:     5,
+	}, metricsWriter, appLogger)
+
+	// Deliberately do NOT set a summary writer
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		cancel()
+	}()
+
+	// Should not panic
+	err := c.Run(ctx)
+	if err != context.Canceled {
+		t.Errorf("expected context.Canceled, got: %v", err)
+	}
+}
diff --git a/internal/collector/testdata/proc/1/stat b/internal/collector/testdata/proc/1/stat
new file mode 100644
index 0000000..01d6595
--- /dev/null
+++ b/internal/collector/testdata/proc/1/stat
@@ -0,0 +1 @@
+1 (init) S 0 1 1 0 -1 4194560 1000 0 0 0 100 50 0 0 20 0 1 0 1 10000000 500 18446744073709551615 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
diff --git a/internal/collector/testdata/proc/1/status b/internal/collector/testdata/proc/1/status
new file mode 100644
index 0000000..0b4d9e3
--- /dev/null
+++ b/internal/collector/testdata/proc/1/status
@@ -0,0 +1,14 @@
+Name:	init
+Uid:	0	0	0	0
+Gid:	0	0	0	0
+VmPeak:	10000 kB
+VmSize:	10000 kB
+VmRSS:	5000 kB
+VmData:	3000 kB
+VmStk:	200 kB
+VmExe:	100 kB
+VmLib:	1000 kB
+RssAnon:	3000 kB
+RssFile:	1500 kB
+RssShmem:	500 kB
+Threads:	1
diff --git a/internal/collector/testdata/proc/cpuinfo b/internal/collector/testdata/proc/cpuinfo
new file mode 100644
index 0000000..9703a4e
--- /dev/null
+++ b/internal/collector/testdata/proc/cpuinfo
@@ -0,0 +1 @@
+processor : 0
diff --git a/internal/collector/testdata/proc/meminfo b/internal/collector/testdata/proc/meminfo
new file mode 100644
index 0000000..2993ff4
--- /dev/null
+++ b/internal/collector/testdata/proc/meminfo
@@ -0,0 +1,5 @@
+MemTotal:	16348500 kB
+MemFree:	8000000 kB
+MemAvailable:	12000000 kB
+Buffers:	500000 kB
+Cached:	3000000 kB
diff --git a/internal/collector/testdata/proc/stat b/internal/collector/testdata/proc/stat
new file mode 100644
index 0000000..513d56a
--- /dev/null
+++ b/internal/collector/testdata/proc/stat
@@ -0,0 +1 @@
+cpu 10000 500 3000 80000 200 50 30 0 0 0
diff --git a/internal/summary/accumulator.go b/internal/summary/accumulator.go
new file mode 100644
index 0000000..bc3b1de
--- /dev/null
+++ b/internal/summary/accumulator.go
@@ -0,0 +1,138 @@
+// ABOUTME: Accumulates system metrics samples across a collection run.
+// ABOUTME: Computes peak, average, and P95 statistics for CPU and memory on demand.
+package summary
+
+import (
+	"fmt"
+	"sort"
+	"time"
+
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
+)
+
+// Accumulator collects metric samples and computes run-level statistics
+type Accumulator struct {
+	topN           int
+	cpuValues      []float64
+	memBytesValues []float64
+	memPctValues   []float64
+	processPeaks   map[string]*ProcessPeak
+	startTime      time.Time
+	endTime        time.Time
+	sampleCount    int
+}
+
+// NewAccumulator creates an accumulator that tracks the top N processes
+func NewAccumulator(topN int) *Accumulator {
+	return &Accumulator{
+		topN:         topN,
+		processPeaks: make(map[string]*ProcessPeak),
+	}
+}
+
+// Add records a single metrics sample
+func (a *Accumulator) Add(m *metrics.SystemMetrics) {
+	a.sampleCount++
+	if a.sampleCount == 1 {
+		a.startTime = m.Timestamp
+	}
+	a.endTime = m.Timestamp
+
+	a.cpuValues = append(a.cpuValues, m.CPU.TotalPercent)
+	a.memBytesValues = append(a.memBytesValues, float64(m.Memory.UsedBytes))
+	a.memPctValues = append(a.memPctValues, m.Memory.UsedPercent)
+
+	for _, p := range m.TopCPU {
+		a.updateProcessPeak(p)
+	}
+	for _, p := range m.TopMemory {
+		a.updateProcessPeak(p)
+	}
+}
+
+// Summarize computes and returns the run summary, or nil if no samples were added
+func (a *Accumulator) Summarize() *RunSummary {
+	if a.sampleCount == 0 {
+		return nil
+	}
+
+	return &RunSummary{
+		StartTime:       a.startTime,
+		EndTime:         a.endTime,
+		DurationSeconds: a.endTime.Sub(a.startTime).Seconds(),
+		SampleCount:     a.sampleCount,
+		CPUTotal:        computeStats(a.cpuValues),
+		MemUsedBytes:    computeStats(a.memBytesValues),
+		MemUsedPercent:  computeStats(a.memPctValues),
+		TopCPUProcesses: a.topProcesses(func(p *ProcessPeak) float64 { return p.PeakCPU }),
+		TopMemProcesses: a.topProcesses(func(p *ProcessPeak) float64 { return float64(p.PeakMem) }),
+	}
+}
+
+// SampleCount returns the number of samples added
+func (a *Accumulator) SampleCount() int {
+	return a.sampleCount
+}
+
+// computeStats calculates peak, average, and P95 from a sorted copy of the values
+func computeStats(values []float64) StatSummary {
+	n := len(values)
+	if n == 0 {
+		return StatSummary{}
+	}
+
+	sorted := make([]float64, n)
+	copy(sorted, values)
+	sort.Float64s(sorted)
+
+	var sum float64
+	for _, v := range sorted {
+		sum += v
+	}
+
+	p95Index := int(float64(n-1) * 0.95)
+
+	return StatSummary{
+		Peak: sorted[n-1],
+		Avg:  sum / float64(n),
+		P95:  sorted[p95Index],
+	}
+}
+
+// updateProcessPeak merges a process observation into the peak tracking map
+func (a *Accumulator) updateProcessPeak(p metrics.ProcessMetrics) {
+	key := fmt.Sprintf("%d:%s", p.PID, p.Name)
+	existing, ok := a.processPeaks[key]
+	if !ok {
+		a.processPeaks[key] = &ProcessPeak{
+			PID:     p.PID,
+			Name:    p.Name,
+			PeakCPU: p.CPUPercent,
+			PeakMem: p.MemRSS,
+		}
+		return
+	}
+	if p.CPUPercent > existing.PeakCPU {
+		existing.PeakCPU = p.CPUPercent
+	}
+	if p.MemRSS > existing.PeakMem {
+		existing.PeakMem = p.MemRSS
+	}
+}
+
+// topProcesses returns the top N processes sorted by the given key function (descending)
+func (a *Accumulator) topProcesses(keyFn func(*ProcessPeak) float64) []ProcessPeak {
+	all := make([]ProcessPeak, 0, len(a.processPeaks))
+	for _, p := range a.processPeaks {
+		all = append(all, *p)
+	}
+
+	sort.Slice(all, func(i, j int) bool {
+		return keyFn(&all[i]) > keyFn(&all[j])
+	})
+
+	if len(all) > a.topN {
+		all = all[:a.topN]
+	}
+	return all
+}
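Aside (not part of the patch): the P95 in computeStats is a nearest-rank style estimate taken at the zero-based index int(float64(n-1) * 0.95), with no interpolation. Worked through, this is the arithmetic the tests below rely on: for n = 5 sorted values [10, 20, 30, 40, 50] the index is int(4 * 0.95) = int(3.8) = 3, so P95 = 40; for n = 20 values 1..20 it is int(19 * 0.95) = int(18.05) = 18, so P95 = sorted[18] = 19, slightly below the interpolated value of 19.05.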
diff --git a/internal/summary/accumulator_test.go b/internal/summary/accumulator_test.go
new file mode 100644
index 0000000..76f6d47
--- /dev/null
+++ b/internal/summary/accumulator_test.go
@@ -0,0 +1,335 @@
+// ABOUTME: Tests for the summary accumulator that tracks metrics across a run.
+// ABOUTME: Validates stats computation (peak/avg/P95), process peak tracking, and edge cases.
+package summary
+
+import (
+	"testing"
+	"time"
+
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
+)
+
+func TestAccumulator_NoSamples(t *testing.T) {
+	acc := NewAccumulator(5)
+	result := acc.Summarize()
+	if result != nil {
+		t.Errorf("expected nil summary for no samples, got %+v", result)
+	}
+}
+
+func TestAccumulator_SingleSample(t *testing.T) {
+	acc := NewAccumulator(5)
+	acc.Add(&metrics.SystemMetrics{
+		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
+		CPU:       metrics.CPUMetrics{TotalPercent: 42.5},
+		Memory: metrics.MemoryMetrics{
+			UsedBytes:   1000,
+			UsedPercent: 50.0,
+		},
+	})
+
+	s := acc.Summarize()
+	if s == nil {
+		t.Fatal("expected non-nil summary")
+	}
+
+	// With a single sample, peak=avg=p95
+	if s.CPUTotal.Peak != 42.5 {
+		t.Errorf("CPU peak: got %f, want 42.5", s.CPUTotal.Peak)
+	}
+	if s.CPUTotal.Avg != 42.5 {
+		t.Errorf("CPU avg: got %f, want 42.5", s.CPUTotal.Avg)
+	}
+	if s.CPUTotal.P95 != 42.5 {
+		t.Errorf("CPU p95: got %f, want 42.5", s.CPUTotal.P95)
+	}
+	if s.MemUsedBytes.Peak != 1000 {
+		t.Errorf("MemUsedBytes peak: got %f, want 1000", s.MemUsedBytes.Peak)
+	}
+	if s.MemUsedPercent.Peak != 50.0 {
+		t.Errorf("MemUsedPercent peak: got %f, want 50.0", s.MemUsedPercent.Peak)
+	}
+}
+
+func TestAccumulator_Stats(t *testing.T) {
+	acc := NewAccumulator(5)
+	cpuValues := []float64{10, 20, 30, 40, 50}
+	for i, v := range cpuValues {
+		acc.Add(&metrics.SystemMetrics{
+			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
+			CPU:       metrics.CPUMetrics{TotalPercent: v},
+			Memory: metrics.MemoryMetrics{
+				UsedBytes:   uint64(v * 100),
+				UsedPercent: v,
+			},
+		})
+	}
+
+	s := acc.Summarize()
+	if s == nil {
+		t.Fatal("expected non-nil summary")
+	}
+
+	// Peak = max = 50
+	if s.CPUTotal.Peak != 50 {
+		t.Errorf("CPU peak: got %f, want 50", s.CPUTotal.Peak)
+	}
+	// Avg = (10+20+30+40+50)/5 = 30
+	if s.CPUTotal.Avg != 30 {
+		t.Errorf("CPU avg: got %f, want 30", s.CPUTotal.Avg)
+	}
+	// P95: sorted=[10,20,30,40,50], index=int(4*0.95)=int(3.8)=3, value=40
+	if s.CPUTotal.P95 != 40 {
+		t.Errorf("CPU p95: got %f, want 40", s.CPUTotal.P95)
+	}
+}
+
+func TestAccumulator_P95_LargerDataset(t *testing.T) {
+	acc := NewAccumulator(5)
+	// 20 values: 1, 2, 3, ..., 20
+	for i := 1; i <= 20; i++ {
+		acc.Add(&metrics.SystemMetrics{
+			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
+			CPU:       metrics.CPUMetrics{TotalPercent: float64(i)},
+			Memory:    metrics.MemoryMetrics{},
+		})
+	}
+
+	s := acc.Summarize()
+	if s == nil {
+		t.Fatal("expected non-nil summary")
+	}
+
+	// P95: sorted=[1..20], index=int(19*0.95)=int(18.05)=18, value=19
+	if s.CPUTotal.P95 != 19 {
+		t.Errorf("CPU p95: got %f, want 19", s.CPUTotal.P95)
+	}
+	// Avg = (1+2+...+20)/20 = 210/20 = 10.5
+	if s.CPUTotal.Avg != 10.5 {
+		t.Errorf("CPU avg: got %f, want 10.5", s.CPUTotal.Avg)
+	}
+}
+
+func TestAccumulator_MemoryStats(t *testing.T) {
+	acc := NewAccumulator(5)
+	memBytes := []uint64{1000, 2000, 3000, 4000, 5000}
+	memPercent := []float64{10, 20, 30, 40, 50}
+
+	for i := range memBytes {
+		acc.Add(&metrics.SystemMetrics{
+			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
+			CPU:       metrics.CPUMetrics{TotalPercent: 0},
+			Memory: metrics.MemoryMetrics{
+				UsedBytes:   memBytes[i],
+				UsedPercent: memPercent[i],
+			},
+		})
+	}
+
+	s := acc.Summarize()
+	if s == nil {
+		t.Fatal("expected non-nil summary")
+	}
+
+	// MemUsedBytes: peak=5000, avg=3000, p95=4000
+	if s.MemUsedBytes.Peak != 5000 {
+		t.Errorf("MemUsedBytes peak: got %f, want 5000", s.MemUsedBytes.Peak)
+	}
+	if s.MemUsedBytes.Avg != 3000 {
+		t.Errorf("MemUsedBytes avg: got %f, want 3000", s.MemUsedBytes.Avg)
+	}
+	if s.MemUsedBytes.P95 != 4000 {
+		t.Errorf("MemUsedBytes p95: got %f, want 4000", s.MemUsedBytes.P95)
+	}
+
+	// MemUsedPercent: peak=50, avg=30, p95=40
+	if s.MemUsedPercent.Peak != 50 {
+		t.Errorf("MemUsedPercent peak: got %f, want 50", s.MemUsedPercent.Peak)
+	}
+	if s.MemUsedPercent.Avg != 30 {
+		t.Errorf("MemUsedPercent avg: got %f, want 30", s.MemUsedPercent.Avg)
+	}
+	if s.MemUsedPercent.P95 != 40 {
+		t.Errorf("MemUsedPercent p95: got %f, want 40", s.MemUsedPercent.P95)
+	}
+}
+
+func TestAccumulator_ProcessPeaks(t *testing.T) {
+	acc := NewAccumulator(5)
+
+	// Same PID across two samples; peaks should be retained
+	acc.Add(&metrics.SystemMetrics{
+		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
+		CPU:       metrics.CPUMetrics{},
+		Memory:    metrics.MemoryMetrics{},
+		TopCPU: []metrics.ProcessMetrics{
+			{PID: 1, Name: "a", CPUPercent: 10, MemRSS: 100},
+		},
+		TopMemory: []metrics.ProcessMetrics{
+			{PID: 1, Name: "a", CPUPercent: 10, MemRSS: 100},
+		},
+	})
+	acc.Add(&metrics.SystemMetrics{
+		Timestamp: time.Date(2025, 1, 1, 0, 0, 1, 0, time.UTC),
+		CPU:       metrics.CPUMetrics{},
+		Memory:    metrics.MemoryMetrics{},
+		TopCPU: []metrics.ProcessMetrics{
+			{PID: 1, Name: "a", CPUPercent: 20, MemRSS: 50},
+		},
+		TopMemory: []metrics.ProcessMetrics{
+			{PID: 1, Name: "a", CPUPercent: 5, MemRSS: 200},
+		},
+	})
+
+	s := acc.Summarize()
+	if s == nil {
+		t.Fatal("expected non-nil summary")
+	}
+
+	// Should find PID 1 with peak CPU=20, peak mem=200
+	found := false
+	for _, p := range s.TopCPUProcesses {
+		if p.PID == 1 {
+			found = true
+			if p.PeakCPU != 20 {
+				t.Errorf("PeakCPU: got %f, want 20", p.PeakCPU)
+			}
+			if p.PeakMem != 200 {
+				t.Errorf("PeakMem: got %d, want 200", p.PeakMem)
+			}
+		}
+	}
+	if !found {
+		t.Error("PID 1 not found in TopCPUProcesses")
+	}
+}
"low", CPUPercent: 10, MemRSS: 100}, + {PID: 2, Name: "mid", CPUPercent: 50, MemRSS: 500}, + {PID: 3, Name: "high", CPUPercent: 90, MemRSS: 900}, + }, + TopMemory: []metrics.ProcessMetrics{ + {PID: 1, Name: "low", CPUPercent: 10, MemRSS: 100}, + {PID: 2, Name: "mid", CPUPercent: 50, MemRSS: 500}, + {PID: 3, Name: "high", CPUPercent: 90, MemRSS: 900}, + }, + }) + + s := acc.Summarize() + if s == nil { + t.Fatal("expected non-nil summary") + } + + // TopCPUProcesses should have at most 2 entries, sorted by PeakCPU descending + if len(s.TopCPUProcesses) != 2 { + t.Fatalf("TopCPUProcesses length: got %d, want 2", len(s.TopCPUProcesses)) + } + if s.TopCPUProcesses[0].PeakCPU != 90 { + t.Errorf("TopCPU[0] PeakCPU: got %f, want 90", s.TopCPUProcesses[0].PeakCPU) + } + if s.TopCPUProcesses[1].PeakCPU != 50 { + t.Errorf("TopCPU[1] PeakCPU: got %f, want 50", s.TopCPUProcesses[1].PeakCPU) + } + + // TopMemProcesses should have at most 2 entries, sorted by PeakMem descending + if len(s.TopMemProcesses) != 2 { + t.Fatalf("TopMemProcesses length: got %d, want 2", len(s.TopMemProcesses)) + } + if s.TopMemProcesses[0].PeakMem != 900 { + t.Errorf("TopMem[0] PeakMem: got %d, want 900", s.TopMemProcesses[0].PeakMem) + } + if s.TopMemProcesses[1].PeakMem != 500 { + t.Errorf("TopMem[1] PeakMem: got %d, want 500", s.TopMemProcesses[1].PeakMem) + } +} + +func TestAccumulator_ProcessPeaks_Dedup(t *testing.T) { + acc := NewAccumulator(5) + + // A process appears in both TopCPU and TopMemory + acc.Add(&metrics.SystemMetrics{ + Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC), + CPU: metrics.CPUMetrics{}, + Memory: metrics.MemoryMetrics{}, + TopCPU: []metrics.ProcessMetrics{ + {PID: 1, Name: "proc", CPUPercent: 80, MemRSS: 100}, + }, + TopMemory: []metrics.ProcessMetrics{ + {PID: 1, Name: "proc", CPUPercent: 30, MemRSS: 500}, + }, + }) + + s := acc.Summarize() + if s == nil { + t.Fatal("expected non-nil summary") + } + + // The internal process map should have merged the peaks + // PeakCPU should be 80 (from TopCPU), PeakMem should be 500 (from TopMemory) + for _, p := range s.TopCPUProcesses { + if p.PID == 1 { + if p.PeakCPU != 80 { + t.Errorf("PeakCPU: got %f, want 80", p.PeakCPU) + } + if p.PeakMem != 500 { + t.Errorf("PeakMem: got %d, want 500", p.PeakMem) + } + } + } +} + +func TestAccumulator_SampleCount(t *testing.T) { + acc := NewAccumulator(5) + if acc.SampleCount() != 0 { + t.Errorf("initial SampleCount: got %d, want 0", acc.SampleCount()) + } + + for i := 0; i < 3; i++ { + acc.Add(&metrics.SystemMetrics{ + Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC), + CPU: metrics.CPUMetrics{}, + Memory: metrics.MemoryMetrics{}, + }) + } + + if acc.SampleCount() != 3 { + t.Errorf("SampleCount after 3 adds: got %d, want 3", acc.SampleCount()) + } +} + +func TestAccumulator_Duration(t *testing.T) { + acc := NewAccumulator(5) + start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + end := time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC) // 60 seconds later + + acc.Add(&metrics.SystemMetrics{ + Timestamp: start, + CPU: metrics.CPUMetrics{}, + Memory: metrics.MemoryMetrics{}, + }) + acc.Add(&metrics.SystemMetrics{ + Timestamp: end, + CPU: metrics.CPUMetrics{}, + Memory: metrics.MemoryMetrics{}, + }) + + s := acc.Summarize() + if s == nil { + t.Fatal("expected non-nil summary") + } + + if !s.StartTime.Equal(start) { + t.Errorf("StartTime: got %v, want %v", s.StartTime, start) + } + if s.DurationSeconds != 60 { + t.Errorf("DurationSeconds: got %f, want 60", s.DurationSeconds) + } +} diff --git 
diff --git a/internal/summary/types.go b/internal/summary/types.go
new file mode 100644
index 0000000..ab5ecea
--- /dev/null
+++ b/internal/summary/types.go
@@ -0,0 +1,33 @@
+// ABOUTME: Data types for run-level summary statistics.
+// ABOUTME: Defines StatSummary, ProcessPeak, and RunSummary used to report metrics on shutdown.
+package summary
+
+import "time"
+
+// StatSummary holds peak, average, and P95 for a metric across the run
+type StatSummary struct {
+	Peak float64 `json:"peak"`
+	Avg  float64 `json:"avg"`
+	P95  float64 `json:"p95"`
+}
+
+// ProcessPeak holds the peak CPU and memory observed for a single process
+type ProcessPeak struct {
+	PID     int     `json:"pid"`
+	Name    string  `json:"name"`
+	PeakCPU float64 `json:"peak_cpu_percent"`
+	PeakMem uint64  `json:"peak_mem_rss_bytes"`
+}
+
+// RunSummary holds the complete summary of a collection run
+type RunSummary struct {
+	StartTime       time.Time     `json:"start_time"`
+	EndTime         time.Time     `json:"end_time"`
+	DurationSeconds float64       `json:"duration_seconds"`
+	SampleCount     int           `json:"sample_count"`
+	CPUTotal        StatSummary   `json:"cpu_total_percent"`
+	MemUsedBytes    StatSummary   `json:"mem_used_bytes"`
+	MemUsedPercent  StatSummary   `json:"mem_used_percent"`
+	TopCPUProcesses []ProcessPeak `json:"top_cpu_processes"`
+	TopMemProcesses []ProcessPeak `json:"top_mem_processes"`
+}
diff --git a/internal/summary/writer.go b/internal/summary/writer.go
new file mode 100644
index 0000000..f6781e1
--- /dev/null
+++ b/internal/summary/writer.go
@@ -0,0 +1,65 @@
+// ABOUTME: Emits a RunSummary as a structured log entry via slog.
+// ABOUTME: Follows the same slog pattern as internal/output/logger.go for consistency.
+package summary
+
+import (
+	"io"
+	"log/slog"
+)
+
+// SummaryWriter outputs a RunSummary using structured logging
+type SummaryWriter struct {
+	logger *slog.Logger
+}
+
+// NewSummaryWriter creates a writer that emits summaries to the given output in the given format
+func NewSummaryWriter(output io.Writer, format string) *SummaryWriter {
+	opts := &slog.HandlerOptions{Level: slog.LevelInfo}
+
+	var handler slog.Handler
+	switch format {
+	case "text":
+		handler = slog.NewTextHandler(output, opts)
+	default:
+		handler = slog.NewJSONHandler(output, opts)
+	}
+
+	return &SummaryWriter{
+		logger: slog.New(handler),
+	}
+}
+
+// Write emits the run summary as a single structured log entry
+func (w *SummaryWriter) Write(s *RunSummary) {
+	if s == nil {
+		return
+	}
+
+	// The process slices are passed to slog.Any directly: the JSON handler
+	// marshals them through their json struct tags. Wrapping each entry in
+	// an empty-keyed slog.Group inside a []any would lose the field values,
+	// since json.Marshal cannot see slog.Value's unexported fields.
+	w.logger.Info("run_summary",
+		slog.Time("start_time", s.StartTime),
+		slog.Time("end_time", s.EndTime),
+		slog.Float64("duration_seconds", s.DurationSeconds),
+		slog.Int("sample_count", s.SampleCount),
+		slog.Group("cpu_total_percent",
+			slog.Float64("peak", s.CPUTotal.Peak),
+			slog.Float64("avg", s.CPUTotal.Avg),
+			slog.Float64("p95", s.CPUTotal.P95),
+		),
+		slog.Group("mem_used_bytes",
+			slog.Float64("peak", s.MemUsedBytes.Peak),
+			slog.Float64("avg", s.MemUsedBytes.Avg),
+			slog.Float64("p95", s.MemUsedBytes.P95),
+		),
+		slog.Group("mem_used_percent",
+			slog.Float64("peak", s.MemUsedPercent.Peak),
+			slog.Float64("avg", s.MemUsedPercent.Avg),
+			slog.Float64("p95", s.MemUsedPercent.P95),
+		),
+		slog.Any("top_cpu_processes", s.TopCPUProcesses),
+		slog.Any("top_mem_processes", s.TopMemProcesses),
+	)
+}
diff --git a/internal/summary/writer_test.go b/internal/summary/writer_test.go
new file mode 100644
index 0000000..d787ec6
--- /dev/null
+++ b/internal/summary/writer_test.go
@@ -0,0 +1,93 @@
+// ABOUTME: Tests for the summary writer that emits run summaries via slog.
+// ABOUTME: Validates JSON output, text output, and nil summary handling.
+package summary
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestSummaryWriter_JSON(t *testing.T) {
+	var buf bytes.Buffer
+	w := NewSummaryWriter(&buf, "json")
+
+	s := &RunSummary{
+		StartTime:       time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
+		EndTime:         time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC),
+		DurationSeconds: 60,
+		SampleCount:     12,
+		CPUTotal:        StatSummary{Peak: 95.5, Avg: 42.0, P95: 88.0},
+		MemUsedBytes:    StatSummary{Peak: 8000000, Avg: 4000000, P95: 7500000},
+		MemUsedPercent:  StatSummary{Peak: 80.0, Avg: 40.0, P95: 75.0},
+		TopCPUProcesses: []ProcessPeak{
+			{PID: 1, Name: "busy", PeakCPU: 95.5, PeakMem: 1000},
+		},
+		TopMemProcesses: []ProcessPeak{
+			{PID: 2, Name: "hungry", PeakCPU: 10.0, PeakMem: 8000000},
+		},
+	}
+
+	w.Write(s)
+
+	output := buf.String()
+	if !strings.Contains(output, "run_summary") {
+		t.Errorf("output should contain 'run_summary', got: %s", output)
+	}
+	if !strings.Contains(output, "duration_seconds") {
+		t.Errorf("output should contain 'duration_seconds', got: %s", output)
+	}
+	if !strings.Contains(output, "sample_count") {
+		t.Errorf("output should contain 'sample_count', got: %s", output)
+	}
+	if !strings.Contains(output, "cpu_total_percent") {
+		t.Errorf("output should contain 'cpu_total_percent', got: %s", output)
+	}
+	if !strings.Contains(output, "mem_used_bytes") {
+		t.Errorf("output should contain 'mem_used_bytes', got: %s", output)
+	}
+	if !strings.Contains(output, "top_cpu_processes") {
+		t.Errorf("output should contain 'top_cpu_processes', got: %s", output)
+	}
+	if !strings.Contains(output, "top_mem_processes") {
+		t.Errorf("output should contain 'top_mem_processes', got: %s", output)
+	}
+}
+
+func TestSummaryWriter_Text(t *testing.T) {
+	var buf bytes.Buffer
+	w := NewSummaryWriter(&buf, "text")
+
+	s := &RunSummary{
+		StartTime:       time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
+		EndTime:         time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC),
+		DurationSeconds: 60,
+		SampleCount:     12,
+		CPUTotal:        StatSummary{Peak: 95.5, Avg: 42.0, P95: 88.0},
+		MemUsedBytes:    StatSummary{Peak: 8000000, Avg: 4000000, P95: 7500000},
+		MemUsedPercent:  StatSummary{Peak: 80.0, Avg: 40.0, P95: 75.0},
+	}
+
+	w.Write(s)
+
+	output := buf.String()
+	if !strings.Contains(output, "run_summary") {
+		t.Errorf("output should contain 'run_summary', got: %s", output)
+	}
+	if !strings.Contains(output, "duration_seconds") {
+		t.Errorf("output should contain 'duration_seconds', got: %s", output)
+	}
+}
+
+func TestSummaryWriter_NilSummary(t *testing.T) {
+	var buf bytes.Buffer
+	w := NewSummaryWriter(&buf, "json")
+
+	// Should not panic and should not write anything
+	w.Write(nil)
+
+	if buf.Len() != 0 {
+		t.Errorf("expected no output for nil summary, got: %s", buf.String())
+	}
+}
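Aside (not part of the patch): with the JSON handler, Write emits the whole summary as a single slog line. With the summary values from TestSummaryWriter_JSON it comes out roughly as follows; the leading time field is added by slog itself and is illustrative here:

	{"time":"2025-01-01T00:01:00Z","level":"INFO","msg":"run_summary","start_time":"2025-01-01T00:00:00Z","end_time":"2025-01-01T00:01:00Z","duration_seconds":60,"sample_count":12,"cpu_total_percent":{"peak":95.5,"avg":42,"p95":88},"mem_used_bytes":{"peak":8000000,"avg":4000000,"p95":7500000},"mem_used_percent":{"peak":80,"avg":40,"p95":75},"top_cpu_processes":[{"pid":1,"name":"busy","peak_cpu_percent":95.5,"peak_mem_rss_bytes":1000}],"top_mem_processes":[{"pid":2,"name":"hungry","peak_cpu_percent":10,"peak_mem_rss_bytes":8000000}]}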