feat(collector): Summarize metrics at the end of the process
All checks were successful
ci / build (push) Successful in 1m39s
This commit is contained in:
parent 54269e8a0e
commit 7201a527d8

13 changed files with 844 additions and 8 deletions
@@ -12,6 +12,7 @@ import (
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/collector"
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
 )

 const (
@@ -53,6 +54,10 @@ func main() {
 		TopN:     *topN,
 	}, metricsWriter, appLogger)

+	// Attach summary writer to emit run summary on shutdown
+	summaryWriter := summary.NewSummaryWriter(os.Stdout, *logFormat)
+	c.SetSummaryWriter(summaryWriter)
+
 	// Setup signal handling for graceful shutdown
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
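Read together with the import hunk above, the flow in main() is: build the collector, attach the summary writer, then run until the context is cancelled. A hedged, self-contained sketch of that wiring — defaults, flag handling, and the exact signal plumbing are assumptions, since the hunk only shows context.WithCancel:

	// Sketch only; the real main.go uses flags (*topN, *logFormat) and its own signal wiring.
	package main

	import (
		"context"
		"log/slog"
		"os"
		"os/signal"
		"syscall"
		"time"

		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/collector"
		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
	)

	func main() {
		appLogger := slog.New(slog.NewJSONHandler(os.Stderr, nil))
		metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
			Output: os.Stdout,
			Format: output.LogFormatJSON,
			Level:  slog.LevelInfo,
		})

		c := collector.New(collector.Config{
			ProcPath: "/proc",
			Interval: 10 * time.Second, // assumed default; not shown in the hunk
			TopN:     5,
		}, metricsWriter, appLogger)

		// The new hook added by this commit: emit a run summary on shutdown.
		c.SetSummaryWriter(summary.NewSummaryWriter(os.Stdout, "json"))

		// Assumed equivalent of the WithCancel-plus-signal setup in main.go:
		// cancel on SIGINT/SIGTERM; Run emits the summary before returning.
		ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
		defer stop()
		_ = c.Run(ctx)
	}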
@@ -8,6 +8,7 @@ import (
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
 	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
+	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
 )

 // Config holds the collector configuration
@@ -19,22 +20,30 @@ type Config struct {

 // Collector orchestrates metric collection
 type Collector struct {
-	config     Config
-	aggregator *metrics.Aggregator
-	writer     output.Writer
-	logger     *slog.Logger
+	config        Config
+	aggregator    *metrics.Aggregator
+	writer        output.Writer
+	logger        *slog.Logger
+	accumulator   *summary.Accumulator
+	summaryWriter *summary.SummaryWriter
 }

 // New creates a new collector
 func New(cfg Config, writer output.Writer, logger *slog.Logger) *Collector {
 	return &Collector{
-		config:     cfg,
-		aggregator: metrics.NewAggregator(cfg.ProcPath, cfg.TopN),
-		writer:     writer,
-		logger:     logger,
+		config:      cfg,
+		aggregator:  metrics.NewAggregator(cfg.ProcPath, cfg.TopN),
+		writer:      writer,
+		logger:      logger,
+		accumulator: summary.NewAccumulator(cfg.TopN),
 	}
 }

+// SetSummaryWriter attaches a summary writer for emitting run summaries on shutdown
+func (c *Collector) SetSummaryWriter(w *summary.SummaryWriter) {
+	c.summaryWriter = w
+}
+
 // Run starts the collector loop and blocks until context is cancelled
 func (c *Collector) Run(ctx context.Context) error {
 	c.logger.Info("collector starting",
@@ -55,6 +64,7 @@ func (c *Collector) Run(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			c.logger.Info("collector stopping")
+			c.emitSummary()
 			return ctx.Err()
 		case <-ticker.C:
 			if err := c.collect(); err != nil {
@@ -75,9 +85,30 @@ func (c *Collector) collect() error {
 		return fmt.Errorf("writing metrics: %w", err)
 	}

+	c.accumulator.Add(m)
+
 	return nil
 }

+// emitSummary computes and writes the run summary if a writer is configured
+func (c *Collector) emitSummary() {
+	if c.summaryWriter == nil {
+		return
+	}
+
+	s := c.accumulator.Summarize()
+	if s == nil {
+		c.logger.Info("no samples collected, skipping run summary")
+		return
+	}
+
+	c.logger.Info("emitting run summary",
+		slog.Int("sample_count", s.SampleCount),
+		slog.Float64("duration_seconds", s.DurationSeconds),
+	)
+	c.summaryWriter.Write(s)
+}
+
 // CollectOnce performs a single collection and returns the metrics
 func (c *Collector) CollectOnce() (*metrics.SystemMetrics, error) {
 	return c.aggregator.Collect()
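Both guards make the summary strictly opt-in: nothing is emitted unless a writer was attached, and nothing is emitted for a zero-sample run. A minimal sketch of the second guard via the exported Accumulator API:

	package main

	import (
		"fmt"

		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
	)

	func main() {
		acc := summary.NewAccumulator(5)
		// No samples were added, so Summarize returns nil and an
		// emitSummary-style caller skips emission entirely.
		fmt.Println(acc.Summarize() == nil) // true
	}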
internal/collector/collector_test.go (new file, 98 lines)
@@ -0,0 +1,98 @@
// ABOUTME: Tests for the collector's summary integration.
// ABOUTME: Validates that run summaries are emitted on shutdown and handles missing writer gracefully.
package collector

import (
	"bytes"
	"context"
	"log/slog"
	"strings"
	"testing"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/output"
	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
)

func TestCollector_EmitsSummaryOnShutdown(t *testing.T) {
	// Use testdata/proc as the proc filesystem
	procPath := "testdata/proc"

	// Metrics output (regular collection output)
	var metricsOut bytes.Buffer
	metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
		Output: &metricsOut,
		Format: output.LogFormatJSON,
		Level:  slog.LevelInfo,
	})

	// Summary output
	var summaryOut bytes.Buffer
	sw := summary.NewSummaryWriter(&summaryOut, "json")

	// Silence app logs
	appLogger := slog.New(slog.NewTextHandler(&bytes.Buffer{}, nil))

	c := New(Config{
		ProcPath: procPath,
		Interval: 50 * time.Millisecond,
		TopN:     5,
	}, metricsWriter, appLogger)

	c.SetSummaryWriter(sw)

	// Run collector briefly then cancel
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// Let at least 2 collection cycles run
		time.Sleep(150 * time.Millisecond)
		cancel()
	}()

	_ = c.Run(ctx)

	// Verify summary was emitted
	summaryOutput := summaryOut.String()
	if !strings.Contains(summaryOutput, "run_summary") {
		t.Errorf("expected 'run_summary' in output, got: %s", summaryOutput)
	}
	if !strings.Contains(summaryOutput, "duration_seconds") {
		t.Errorf("expected 'duration_seconds' in output, got: %s", summaryOutput)
	}
	if !strings.Contains(summaryOutput, "sample_count") {
		t.Errorf("expected 'sample_count' in output, got: %s", summaryOutput)
	}
}

func TestCollector_NoSummaryWithoutWriter(t *testing.T) {
	procPath := "testdata/proc"

	var metricsOut bytes.Buffer
	metricsWriter := output.NewLoggerWriter(output.LoggerConfig{
		Output: &metricsOut,
		Format: output.LogFormatJSON,
		Level:  slog.LevelInfo,
	})

	appLogger := slog.New(slog.NewTextHandler(&bytes.Buffer{}, nil))

	c := New(Config{
		ProcPath: procPath,
		Interval: 50 * time.Millisecond,
		TopN:     5,
	}, metricsWriter, appLogger)

	// Deliberately do NOT set a summary writer

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	// Should not panic
	err := c.Run(ctx)
	if err != context.Canceled {
		t.Errorf("expected context.Canceled, got: %v", err)
	}
}
internal/collector/testdata/proc/1/stat (new file, vendored, 1 line)
@@ -0,0 +1 @@
1 (init) S 0 1 1 0 -1 4194560 1000 0 0 0 100 50 0 0 20 0 1 0 1 10000000 500 18446744073709551615 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

internal/collector/testdata/proc/1/status (new file, vendored, 14 lines)
@@ -0,0 +1,14 @@
Name:     init
Uid:      0 0 0 0
Gid:      0 0 0 0
VmPeak:   10000 kB
VmSize:   10000 kB
VmRSS:     5000 kB
VmData:    3000 kB
VmStk:      200 kB
VmExe:      100 kB
VmLib:     1000 kB
RssAnon:   3000 kB
RssFile:   1500 kB
RssShmem:   500 kB
Threads:  1

internal/collector/testdata/proc/cpuinfo (new file, vendored, 1 line)
@@ -0,0 +1 @@
processor : 0

internal/collector/testdata/proc/meminfo (new file, vendored, 5 lines)
@@ -0,0 +1,5 @@
MemTotal:       16348500 kB
MemFree:         8000000 kB
MemAvailable:   12000000 kB
Buffers:          500000 kB
Cached:          3000000 kB

internal/collector/testdata/proc/stat (new file, vendored, 1 line)
@@ -0,0 +1 @@
cpu 10000 500 3000 80000 200 50 30 0 0 0
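For readers unfamiliar with /proc/stat, the aggregate cpu line holds cumulative jiffies per state in the standard Linux column order; the fixture above annotated:

	# cpu  user   nice  system  idle   iowait  irq  softirq  steal  guest  guest_nice
	cpu    10000  500   3000    80000  200     50   30       0      0      0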
internal/summary/accumulator.go (new file, 138 lines)
@@ -0,0 +1,138 @@
// ABOUTME: Accumulates system metrics samples across a collection run.
// ABOUTME: Computes peak, average, and P95 statistics for CPU and memory on demand.
package summary

import (
	"fmt"
	"sort"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
)

// Accumulator collects metric samples and computes run-level statistics
type Accumulator struct {
	topN           int
	cpuValues      []float64
	memBytesValues []float64
	memPctValues   []float64
	processPeaks   map[string]*ProcessPeak
	startTime      time.Time
	endTime        time.Time
	sampleCount    int
}

// NewAccumulator creates an accumulator that tracks the top N processes
func NewAccumulator(topN int) *Accumulator {
	return &Accumulator{
		topN:         topN,
		processPeaks: make(map[string]*ProcessPeak),
	}
}

// Add records a single metrics sample
func (a *Accumulator) Add(m *metrics.SystemMetrics) {
	a.sampleCount++
	if a.sampleCount == 1 {
		a.startTime = m.Timestamp
	}
	a.endTime = m.Timestamp

	a.cpuValues = append(a.cpuValues, m.CPU.TotalPercent)
	a.memBytesValues = append(a.memBytesValues, float64(m.Memory.UsedBytes))
	a.memPctValues = append(a.memPctValues, m.Memory.UsedPercent)

	for _, p := range m.TopCPU {
		a.updateProcessPeak(p)
	}
	for _, p := range m.TopMemory {
		a.updateProcessPeak(p)
	}
}

// Summarize computes and returns the run summary, or nil if no samples were added
func (a *Accumulator) Summarize() *RunSummary {
	if a.sampleCount == 0 {
		return nil
	}

	return &RunSummary{
		StartTime:       a.startTime,
		EndTime:         a.endTime,
		DurationSeconds: a.endTime.Sub(a.startTime).Seconds(),
		SampleCount:     a.sampleCount,
		CPUTotal:        computeStats(a.cpuValues),
		MemUsedBytes:    computeStats(a.memBytesValues),
		MemUsedPercent:  computeStats(a.memPctValues),
		TopCPUProcesses: a.topProcesses(func(p *ProcessPeak) float64 { return p.PeakCPU }),
		TopMemProcesses: a.topProcesses(func(p *ProcessPeak) float64 { return float64(p.PeakMem) }),
	}
}

// SampleCount returns the number of samples added
func (a *Accumulator) SampleCount() int {
	return a.sampleCount
}

// computeStats calculates peak, average, and P95 from a sorted copy of the values
func computeStats(values []float64) StatSummary {
	n := len(values)
	if n == 0 {
		return StatSummary{}
	}

	sorted := make([]float64, n)
	copy(sorted, values)
	sort.Float64s(sorted)

	var sum float64
	for _, v := range sorted {
		sum += v
	}

	p95Index := int(float64(n-1) * 0.95)

	return StatSummary{
		Peak: sorted[n-1],
		Avg:  sum / float64(n),
		P95:  sorted[p95Index],
	}
}

// updateProcessPeak merges a process observation into the peak tracking map
func (a *Accumulator) updateProcessPeak(p metrics.ProcessMetrics) {
	key := fmt.Sprintf("%d:%s", p.PID, p.Name)
	existing, ok := a.processPeaks[key]
	if !ok {
		a.processPeaks[key] = &ProcessPeak{
			PID:     p.PID,
			Name:    p.Name,
			PeakCPU: p.CPUPercent,
			PeakMem: p.MemRSS,
		}
		return
	}
	if p.CPUPercent > existing.PeakCPU {
		existing.PeakCPU = p.CPUPercent
	}
	if p.MemRSS > existing.PeakMem {
		existing.PeakMem = p.MemRSS
	}
}

// topProcesses returns the top N processes sorted by the given key function (descending)
func (a *Accumulator) topProcesses(keyFn func(*ProcessPeak) float64) []ProcessPeak {
	all := make([]ProcessPeak, 0, len(a.processPeaks))
	for _, p := range a.processPeaks {
		all = append(all, *p)
	}

	sort.Slice(all, func(i, j int) bool {
		return keyFn(&all[i]) > keyFn(&all[j])
	})

	if len(all) > a.topN {
		all = all[:a.topN]
	}
	return all
}
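One detail worth calling out: the P95 index int(float64(n-1) * 0.95) truncates toward zero, a nearest-rank-style estimate that rounds down for small sample counts. A worked example as a hypothetical test inside package summary (computeStats is unexported), mirroring the test file below:

	func TestComputeStats_Worked(t *testing.T) {
		// sorted = [10 20 30 40 50], n = 5, sum = 150
		s := computeStats([]float64{10, 20, 30, 40, 50})
		// Peak = sorted[4] = 50; Avg = 150/5 = 30;
		// P95 = sorted[int(4*0.95)] = sorted[3] = 40
		if s.Peak != 50 || s.Avg != 30 || s.P95 != 40 {
			t.Fatalf("got %+v", s)
		}
	}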
internal/summary/accumulator_test.go (new file, 335 lines)
@@ -0,0 +1,335 @@
// ABOUTME: Tests for the summary accumulator that tracks metrics across a run.
// ABOUTME: Validates stats computation (peak/avg/P95), process peak tracking, and edge cases.
package summary

import (
	"testing"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
)

func TestAccumulator_NoSamples(t *testing.T) {
	acc := NewAccumulator(5)
	result := acc.Summarize()
	if result != nil {
		t.Errorf("expected nil summary for no samples, got %+v", result)
	}
}

func TestAccumulator_SingleSample(t *testing.T) {
	acc := NewAccumulator(5)
	acc.Add(&metrics.SystemMetrics{
		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		CPU:       metrics.CPUMetrics{TotalPercent: 42.5},
		Memory: metrics.MemoryMetrics{
			UsedBytes:   1000,
			UsedPercent: 50.0,
		},
	})

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// With a single sample, peak=avg=p95
	if s.CPUTotal.Peak != 42.5 {
		t.Errorf("CPU peak: got %f, want 42.5", s.CPUTotal.Peak)
	}
	if s.CPUTotal.Avg != 42.5 {
		t.Errorf("CPU avg: got %f, want 42.5", s.CPUTotal.Avg)
	}
	if s.CPUTotal.P95 != 42.5 {
		t.Errorf("CPU p95: got %f, want 42.5", s.CPUTotal.P95)
	}
	if s.MemUsedBytes.Peak != 1000 {
		t.Errorf("MemUsedBytes peak: got %f, want 1000", s.MemUsedBytes.Peak)
	}
	if s.MemUsedPercent.Peak != 50.0 {
		t.Errorf("MemUsedPercent peak: got %f, want 50.0", s.MemUsedPercent.Peak)
	}
}

func TestAccumulator_Stats(t *testing.T) {
	acc := NewAccumulator(5)
	cpuValues := []float64{10, 20, 30, 40, 50}
	for i, v := range cpuValues {
		acc.Add(&metrics.SystemMetrics{
			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
			CPU:       metrics.CPUMetrics{TotalPercent: v},
			Memory: metrics.MemoryMetrics{
				UsedBytes:   uint64(v * 100),
				UsedPercent: v,
			},
		})
	}

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// Peak = max = 50
	if s.CPUTotal.Peak != 50 {
		t.Errorf("CPU peak: got %f, want 50", s.CPUTotal.Peak)
	}
	// Avg = (10+20+30+40+50)/5 = 30
	if s.CPUTotal.Avg != 30 {
		t.Errorf("CPU avg: got %f, want 30", s.CPUTotal.Avg)
	}
	// P95: sorted=[10,20,30,40,50], index=int(4*0.95)=int(3.8)=3, value=40
	if s.CPUTotal.P95 != 40 {
		t.Errorf("CPU p95: got %f, want 40", s.CPUTotal.P95)
	}
}

func TestAccumulator_P95_LargerDataset(t *testing.T) {
	acc := NewAccumulator(5)
	// 20 values: 1, 2, 3, ..., 20
	for i := 1; i <= 20; i++ {
		acc.Add(&metrics.SystemMetrics{
			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
			CPU:       metrics.CPUMetrics{TotalPercent: float64(i)},
			Memory:    metrics.MemoryMetrics{},
		})
	}

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// P95: sorted=[1..20], index=int(19*0.95)=int(18.05)=18, value=19
	if s.CPUTotal.P95 != 19 {
		t.Errorf("CPU p95: got %f, want 19", s.CPUTotal.P95)
	}
	// Avg = (1+2+...+20)/20 = 210/20 = 10.5
	if s.CPUTotal.Avg != 10.5 {
		t.Errorf("CPU avg: got %f, want 10.5", s.CPUTotal.Avg)
	}
}

func TestAccumulator_MemoryStats(t *testing.T) {
	acc := NewAccumulator(5)
	memBytes := []uint64{1000, 2000, 3000, 4000, 5000}
	memPercent := []float64{10, 20, 30, 40, 50}

	for i := range memBytes {
		acc.Add(&metrics.SystemMetrics{
			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
			CPU:       metrics.CPUMetrics{TotalPercent: 0},
			Memory: metrics.MemoryMetrics{
				UsedBytes:   memBytes[i],
				UsedPercent: memPercent[i],
			},
		})
	}

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// MemUsedBytes: peak=5000, avg=3000, p95=4000
	if s.MemUsedBytes.Peak != 5000 {
		t.Errorf("MemUsedBytes peak: got %f, want 5000", s.MemUsedBytes.Peak)
	}
	if s.MemUsedBytes.Avg != 3000 {
		t.Errorf("MemUsedBytes avg: got %f, want 3000", s.MemUsedBytes.Avg)
	}
	if s.MemUsedBytes.P95 != 4000 {
		t.Errorf("MemUsedBytes p95: got %f, want 4000", s.MemUsedBytes.P95)
	}

	// MemUsedPercent: peak=50, avg=30, p95=40
	if s.MemUsedPercent.Peak != 50 {
		t.Errorf("MemUsedPercent peak: got %f, want 50", s.MemUsedPercent.Peak)
	}
	if s.MemUsedPercent.Avg != 30 {
		t.Errorf("MemUsedPercent avg: got %f, want 30", s.MemUsedPercent.Avg)
	}
	if s.MemUsedPercent.P95 != 40 {
		t.Errorf("MemUsedPercent p95: got %f, want 40", s.MemUsedPercent.P95)
	}
}

func TestAccumulator_ProcessPeaks(t *testing.T) {
	acc := NewAccumulator(5)

	// Same PID across two samples; peaks should be retained
	acc.Add(&metrics.SystemMetrics{
		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
		TopCPU: []metrics.ProcessMetrics{
			{PID: 1, Name: "a", CPUPercent: 10, MemRSS: 100},
		},
		TopMemory: []metrics.ProcessMetrics{
			{PID: 1, Name: "a", CPUPercent: 10, MemRSS: 100},
		},
	})
	acc.Add(&metrics.SystemMetrics{
		Timestamp: time.Date(2025, 1, 1, 0, 0, 1, 0, time.UTC),
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
		TopCPU: []metrics.ProcessMetrics{
			{PID: 1, Name: "a", CPUPercent: 20, MemRSS: 50},
		},
		TopMemory: []metrics.ProcessMetrics{
			{PID: 1, Name: "a", CPUPercent: 5, MemRSS: 200},
		},
	})

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// Should find PID 1 with peak CPU=20, peak mem=200
	found := false
	for _, p := range s.TopCPUProcesses {
		if p.PID == 1 {
			found = true
			if p.PeakCPU != 20 {
				t.Errorf("PeakCPU: got %f, want 20", p.PeakCPU)
			}
			if p.PeakMem != 200 {
				t.Errorf("PeakMem: got %d, want 200", p.PeakMem)
			}
		}
	}
	if !found {
		t.Error("PID 1 not found in TopCPUProcesses")
	}
}

func TestAccumulator_ProcessPeaks_TopN(t *testing.T) {
	acc := NewAccumulator(2) // Only top 2

	acc.Add(&metrics.SystemMetrics{
		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
		TopCPU: []metrics.ProcessMetrics{
			{PID: 1, Name: "low", CPUPercent: 10, MemRSS: 100},
			{PID: 2, Name: "mid", CPUPercent: 50, MemRSS: 500},
			{PID: 3, Name: "high", CPUPercent: 90, MemRSS: 900},
		},
		TopMemory: []metrics.ProcessMetrics{
			{PID: 1, Name: "low", CPUPercent: 10, MemRSS: 100},
			{PID: 2, Name: "mid", CPUPercent: 50, MemRSS: 500},
			{PID: 3, Name: "high", CPUPercent: 90, MemRSS: 900},
		},
	})

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// TopCPUProcesses should have at most 2 entries, sorted by PeakCPU descending
	if len(s.TopCPUProcesses) != 2 {
		t.Fatalf("TopCPUProcesses length: got %d, want 2", len(s.TopCPUProcesses))
	}
	if s.TopCPUProcesses[0].PeakCPU != 90 {
		t.Errorf("TopCPU[0] PeakCPU: got %f, want 90", s.TopCPUProcesses[0].PeakCPU)
	}
	if s.TopCPUProcesses[1].PeakCPU != 50 {
		t.Errorf("TopCPU[1] PeakCPU: got %f, want 50", s.TopCPUProcesses[1].PeakCPU)
	}

	// TopMemProcesses should have at most 2 entries, sorted by PeakMem descending
	if len(s.TopMemProcesses) != 2 {
		t.Fatalf("TopMemProcesses length: got %d, want 2", len(s.TopMemProcesses))
	}
	if s.TopMemProcesses[0].PeakMem != 900 {
		t.Errorf("TopMem[0] PeakMem: got %d, want 900", s.TopMemProcesses[0].PeakMem)
	}
	if s.TopMemProcesses[1].PeakMem != 500 {
		t.Errorf("TopMem[1] PeakMem: got %d, want 500", s.TopMemProcesses[1].PeakMem)
	}
}

func TestAccumulator_ProcessPeaks_Dedup(t *testing.T) {
	acc := NewAccumulator(5)

	// A process appears in both TopCPU and TopMemory
	acc.Add(&metrics.SystemMetrics{
		Timestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
		TopCPU: []metrics.ProcessMetrics{
			{PID: 1, Name: "proc", CPUPercent: 80, MemRSS: 100},
		},
		TopMemory: []metrics.ProcessMetrics{
			{PID: 1, Name: "proc", CPUPercent: 30, MemRSS: 500},
		},
	})

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	// The internal process map should have merged the peaks:
	// PeakCPU should be 80 (from TopCPU), PeakMem should be 500 (from TopMemory)
	for _, p := range s.TopCPUProcesses {
		if p.PID == 1 {
			if p.PeakCPU != 80 {
				t.Errorf("PeakCPU: got %f, want 80", p.PeakCPU)
			}
			if p.PeakMem != 500 {
				t.Errorf("PeakMem: got %d, want 500", p.PeakMem)
			}
		}
	}
}

func TestAccumulator_SampleCount(t *testing.T) {
	acc := NewAccumulator(5)
	if acc.SampleCount() != 0 {
		t.Errorf("initial SampleCount: got %d, want 0", acc.SampleCount())
	}

	for i := 0; i < 3; i++ {
		acc.Add(&metrics.SystemMetrics{
			Timestamp: time.Date(2025, 1, 1, 0, 0, i, 0, time.UTC),
			CPU:       metrics.CPUMetrics{},
			Memory:    metrics.MemoryMetrics{},
		})
	}

	if acc.SampleCount() != 3 {
		t.Errorf("SampleCount after 3 adds: got %d, want 3", acc.SampleCount())
	}
}

func TestAccumulator_Duration(t *testing.T) {
	acc := NewAccumulator(5)
	start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC) // 60 seconds later

	acc.Add(&metrics.SystemMetrics{
		Timestamp: start,
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
	})
	acc.Add(&metrics.SystemMetrics{
		Timestamp: end,
		CPU:       metrics.CPUMetrics{},
		Memory:    metrics.MemoryMetrics{},
	})

	s := acc.Summarize()
	if s == nil {
		t.Fatal("expected non-nil summary")
	}

	if !s.StartTime.Equal(start) {
		t.Errorf("StartTime: got %v, want %v", s.StartTime, start)
	}
	if s.DurationSeconds != 60 {
		t.Errorf("DurationSeconds: got %f, want 60", s.DurationSeconds)
	}
}
internal/summary/types.go (new file, 33 lines)
@@ -0,0 +1,33 @@
// ABOUTME: Data types for run-level summary statistics.
// ABOUTME: Defines StatSummary, ProcessPeak, and RunSummary used to report metrics on shutdown.
package summary

import "time"

// StatSummary holds peak, average, and P95 for a metric across the run
type StatSummary struct {
	Peak float64 `json:"peak"`
	Avg  float64 `json:"avg"`
	P95  float64 `json:"p95"`
}

// ProcessPeak holds the peak CPU and memory observed for a single process
type ProcessPeak struct {
	PID     int     `json:"pid"`
	Name    string  `json:"name"`
	PeakCPU float64 `json:"peak_cpu_percent"`
	PeakMem uint64  `json:"peak_mem_rss_bytes"`
}

// RunSummary holds the complete summary of a collection run
type RunSummary struct {
	StartTime       time.Time     `json:"start_time"`
	EndTime         time.Time     `json:"end_time"`
	DurationSeconds float64       `json:"duration_seconds"`
	SampleCount     int           `json:"sample_count"`
	CPUTotal        StatSummary   `json:"cpu_total_percent"`
	MemUsedBytes    StatSummary   `json:"mem_used_bytes"`
	MemUsedPercent  StatSummary   `json:"mem_used_percent"`
	TopCPUProcesses []ProcessPeak `json:"top_cpu_processes"`
	TopMemProcesses []ProcessPeak `json:"top_mem_processes"`
}
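Every field carries a json tag, so a RunSummary can also be serialized directly with encoding/json, independent of the slog-based writer that follows. A small sketch with illustrative values:

	package main

	import (
		"encoding/json"
		"fmt"
		"time"

		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
	)

	func main() {
		s := summary.RunSummary{
			StartTime:       time.Now().Add(-time.Minute),
			EndTime:         time.Now(),
			DurationSeconds: 60,
			SampleCount:     12,
		}
		b, _ := json.MarshalIndent(s, "", "  ")
		// Keys follow the struct tags: start_time, end_time, duration_seconds,
		// sample_count, cpu_total_percent, mem_used_bytes, mem_used_percent,
		// top_cpu_processes, top_mem_processes.
		fmt.Println(string(b))
	}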
internal/summary/writer.go (new file, 81 lines)
@@ -0,0 +1,81 @@
// ABOUTME: Emits a RunSummary as a structured log entry via slog.
// ABOUTME: Follows the same slog pattern as internal/output/logger.go for consistency.
package summary

import (
	"io"
	"log/slog"
)

// SummaryWriter outputs a RunSummary using structured logging
type SummaryWriter struct {
	logger *slog.Logger
}

// NewSummaryWriter creates a writer that emits summaries to the given output in the given format
func NewSummaryWriter(output io.Writer, format string) *SummaryWriter {
	opts := &slog.HandlerOptions{Level: slog.LevelInfo}

	var handler slog.Handler
	switch format {
	case "text":
		handler = slog.NewTextHandler(output, opts)
	default:
		handler = slog.NewJSONHandler(output, opts)
	}

	return &SummaryWriter{
		logger: slog.New(handler),
	}
}

// Write emits the run summary as a single structured log entry
func (w *SummaryWriter) Write(s *RunSummary) {
	if s == nil {
		return
	}

	topCPUAttrs := make([]any, 0, len(s.TopCPUProcesses))
	for _, p := range s.TopCPUProcesses {
		topCPUAttrs = append(topCPUAttrs, slog.Group("",
			slog.Int("pid", p.PID),
			slog.String("name", p.Name),
			slog.Float64("peak_cpu_percent", p.PeakCPU),
			slog.Uint64("peak_mem_rss_bytes", p.PeakMem),
		))
	}

	topMemAttrs := make([]any, 0, len(s.TopMemProcesses))
	for _, p := range s.TopMemProcesses {
		topMemAttrs = append(topMemAttrs, slog.Group("",
			slog.Int("pid", p.PID),
			slog.String("name", p.Name),
			slog.Float64("peak_cpu_percent", p.PeakCPU),
			slog.Uint64("peak_mem_rss_bytes", p.PeakMem),
		))
	}

	w.logger.Info("run_summary",
		slog.Time("start_time", s.StartTime),
		slog.Time("end_time", s.EndTime),
		slog.Float64("duration_seconds", s.DurationSeconds),
		slog.Int("sample_count", s.SampleCount),
		slog.Group("cpu_total_percent",
			slog.Float64("peak", s.CPUTotal.Peak),
			slog.Float64("avg", s.CPUTotal.Avg),
			slog.Float64("p95", s.CPUTotal.P95),
		),
		slog.Group("mem_used_bytes",
			slog.Float64("peak", s.MemUsedBytes.Peak),
			slog.Float64("avg", s.MemUsedBytes.Avg),
			slog.Float64("p95", s.MemUsedBytes.P95),
		),
		slog.Group("mem_used_percent",
			slog.Float64("peak", s.MemUsedPercent.Peak),
			slog.Float64("avg", s.MemUsedPercent.Avg),
			slog.Float64("p95", s.MemUsedPercent.P95),
		),
		slog.Any("top_cpu_processes", topCPUAttrs),
		slog.Any("top_mem_processes", topMemAttrs),
	)
}
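Usage is a single call, as the tests below exercise; the writer owns its own slog.Logger, so it never interferes with the application logger. A minimal sketch:

	package main

	import (
		"os"

		"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/summary"
	)

	func main() {
		w := summary.NewSummaryWriter(os.Stdout, "json")
		// Emits one structured record with msg="run_summary"; Write(nil) is a no-op.
		w.Write(&summary.RunSummary{SampleCount: 1, DurationSeconds: 0.5})
	}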
internal/summary/writer_test.go (new file, 93 lines)
@@ -0,0 +1,93 @@
// ABOUTME: Tests for the summary writer that emits run summaries via slog.
// ABOUTME: Validates JSON output, text output, and nil summary handling.
package summary

import (
	"bytes"
	"strings"
	"testing"
	"time"
)

func TestSummaryWriter_JSON(t *testing.T) {
	var buf bytes.Buffer
	w := NewSummaryWriter(&buf, "json")

	s := &RunSummary{
		StartTime:       time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		EndTime:         time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC),
		DurationSeconds: 60,
		SampleCount:     12,
		CPUTotal:        StatSummary{Peak: 95.5, Avg: 42.0, P95: 88.0},
		MemUsedBytes:    StatSummary{Peak: 8000000, Avg: 4000000, P95: 7500000},
		MemUsedPercent:  StatSummary{Peak: 80.0, Avg: 40.0, P95: 75.0},
		TopCPUProcesses: []ProcessPeak{
			{PID: 1, Name: "busy", PeakCPU: 95.5, PeakMem: 1000},
		},
		TopMemProcesses: []ProcessPeak{
			{PID: 2, Name: "hungry", PeakCPU: 10.0, PeakMem: 8000000},
		},
	}

	w.Write(s)

	output := buf.String()
	if !strings.Contains(output, "run_summary") {
		t.Errorf("output should contain 'run_summary', got: %s", output)
	}
	if !strings.Contains(output, "duration_seconds") {
		t.Errorf("output should contain 'duration_seconds', got: %s", output)
	}
	if !strings.Contains(output, "sample_count") {
		t.Errorf("output should contain 'sample_count', got: %s", output)
	}
	if !strings.Contains(output, "cpu_total_percent") {
		t.Errorf("output should contain 'cpu_total_percent', got: %s", output)
	}
	if !strings.Contains(output, "mem_used_bytes") {
		t.Errorf("output should contain 'mem_used_bytes', got: %s", output)
	}
	if !strings.Contains(output, "top_cpu_processes") {
		t.Errorf("output should contain 'top_cpu_processes', got: %s", output)
	}
	if !strings.Contains(output, "top_mem_processes") {
		t.Errorf("output should contain 'top_mem_processes', got: %s", output)
	}
}

func TestSummaryWriter_Text(t *testing.T) {
	var buf bytes.Buffer
	w := NewSummaryWriter(&buf, "text")

	s := &RunSummary{
		StartTime:       time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		EndTime:         time.Date(2025, 1, 1, 0, 1, 0, 0, time.UTC),
		DurationSeconds: 60,
		SampleCount:     12,
		CPUTotal:        StatSummary{Peak: 95.5, Avg: 42.0, P95: 88.0},
		MemUsedBytes:    StatSummary{Peak: 8000000, Avg: 4000000, P95: 7500000},
		MemUsedPercent:  StatSummary{Peak: 80.0, Avg: 40.0, P95: 75.0},
	}

	w.Write(s)

	output := buf.String()
	if !strings.Contains(output, "run_summary") {
		t.Errorf("output should contain 'run_summary', got: %s", output)
	}
	if !strings.Contains(output, "duration_seconds") {
		t.Errorf("output should contain 'duration_seconds', got: %s", output)
	}
}

func TestSummaryWriter_NilSummary(t *testing.T) {
	var buf bytes.Buffer
	w := NewSummaryWriter(&buf, "json")

	// Should not panic and should not write anything
	w.Write(nil)

	if buf.Len() != 0 {
		t.Errorf("expected no output for nil summary, got: %s", buf.String())
	}
}