forgejo-runner-optimiser/internal/metrics/aggregator.go
Manuel Ganter 0af8c28bc2
fix(aggregator): prevent CPU cores overflow when processes restart
Guard against unsigned integer underflow in cgroup CPU calculation.
When processes exit and new ones start, totalTicks can be less than
the previous value, causing the subtraction to wrap around to a huge
positive number.

Now checks totalTicks >= prev before calculating delta, treating
process churn as 0 CPU usage for that sample.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 15:15:30 +01:00
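
A minimal standalone sketch, not part of this file, illustrating the wraparound the commit guards against: Go's unsigned subtraction wraps modulo 2^64 instead of going negative, so a shrinking tick total would otherwise report an absurdly large delta.

package main

import "fmt"

func main() {
	var prev uint64 = 5000 // tick total before the old processes exited
	var cur uint64 = 1200  // tick total after new processes started

	// Unsigned subtraction wraps around rather than going negative.
	fmt.Println(cur - prev) // 18446744073709547816

	// The guarded form treats process churn as zero usage for the sample.
	var delta uint64
	if cur >= prev {
		delta = cur - prev
	}
	fmt.Println(delta) // 0
}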

package metrics

import (
	"sort"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/cgroup"
	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/proc"
)

// Aggregator collects and aggregates metrics from processes
type Aggregator struct {
	procPath          string
	topN              int
	prevCPU           *CPUSnapshot
	prevProcCPU       map[int]*ProcessCPUSnapshot
	cgroupLimits      cgroup.CgroupLimits      // container name -> limits
	processMapping    cgroup.ProcessMapping    // process name -> container name
	cgroupPathMapping cgroup.CgroupPathMapping // cgroup path -> container name (built at runtime)
	prevCgroupCPU     map[string]uint64        // container name -> previous total ticks
	prevCgroupTime    time.Time                // previous collection time for cgroup CPU calc
}

// NewAggregator creates a new metrics aggregator
func NewAggregator(procPath string, topN int) *Aggregator {
	// Parse cgroup configuration from the environment. Errors are ignored
	// deliberately: without configuration the aggregator still reports
	// per-cgroup metrics, keyed by raw cgroup path.
	limits, _ := cgroup.ParseCgroupLimitsEnv()
	processMap, _ := cgroup.ParseProcessMappingEnv()

	return &Aggregator{
		procPath:          procPath,
		topN:              topN,
		prevProcCPU:       make(map[int]*ProcessCPUSnapshot),
		cgroupLimits:      limits,
		processMapping:    processMap,
		cgroupPathMapping: make(cgroup.CgroupPathMapping),
		prevCgroupCPU:     make(map[string]uint64),
	}
}

// Collect gathers all system metrics
func (a *Aggregator) Collect() (*SystemMetrics, error) {
	now := time.Now()

	// Read system info
	sysInfo, err := proc.ReadSystemInfo(a.procPath)
	if err != nil {
		return nil, err
	}

	// Read system CPU
	user, nice, system, idle, iowait, irq, softirq, err := proc.ReadSystemCPU(a.procPath)
	if err != nil {
		return nil, err
	}
	currentCPU := &CPUSnapshot{
		Timestamp: now,
		User:      user,
		Nice:      nice,
		System:    system,
		Idle:      idle,
		IOWait:    iowait,
		IRQ:       irq,
		SoftIRQ:   softirq,
	}

	// Calculate CPU percentages
	cpuMetrics := a.calculateCPUMetrics(currentCPU)
	a.prevCPU = currentCPU

	// Read all processes
	processes, err := proc.ReadAllProcesses(a.procPath)
	if err != nil {
		return nil, err
	}

	// Calculate per-process metrics
	processMetrics := a.calculateProcessMetrics(processes, now)

	// Calculate memory metrics
	memMetrics := a.calculateMemoryMetrics(sysInfo, processMetrics)

	// Get top CPU consumers
	topCPU := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return p.CPUPercent
	})

	// Get top memory consumers
	topMemory := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return float64(p.MemRSS)
	})

	// Discover cgroup path mappings from known processes
	a.discoverCgroupMappings(processes)

	// Calculate per-cgroup metrics
	cgroupMetrics := a.calculateCgroupMetrics(processes, processMetrics, now)

	return &SystemMetrics{
		Timestamp:      now,
		TotalProcesses: len(processes),
		CPU:            cpuMetrics,
		Memory:         memMetrics,
		TopCPU:         topCPU,
		TopMemory:      topMemory,
		Cgroups:        cgroupMetrics,
	}, nil
}
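
// Worked example for the percentages below: if total ticks grew by 4000
// between snapshots, of which 1000 were idle and 200 iowait, then
// TotalPercent = (4000 - 1000 - 200) / 4000 * 100 = 70%.
//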
// calculateCPUMetrics calculates CPU percentages between snapshots
func (a *Aggregator) calculateCPUMetrics(current *CPUSnapshot) CPUMetrics {
	if a.prevCPU == nil {
		return CPUMetrics{}
	}
	totalDelta := float64(current.Total() - a.prevCPU.Total())
	if totalDelta <= 0 {
		return CPUMetrics{}
	}
	userDelta := float64(current.User+current.Nice) - float64(a.prevCPU.User+a.prevCPU.Nice)
	systemDelta := float64(current.System+current.IRQ+current.SoftIRQ) - float64(a.prevCPU.System+a.prevCPU.IRQ+a.prevCPU.SoftIRQ)
	idleDelta := float64(current.Idle) - float64(a.prevCPU.Idle)
	iowaitDelta := float64(current.IOWait) - float64(a.prevCPU.IOWait)

	return CPUMetrics{
		TotalPercent:  (totalDelta - idleDelta - iowaitDelta) / totalDelta * 100,
		UserPercent:   userDelta / totalDelta * 100,
		SystemPercent: systemDelta / totalDelta * 100,
		IdlePercent:   idleDelta / totalDelta * 100,
		IOWaitPercent: iowaitDelta / totalDelta * 100,
	}
}
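
// Worked example (assuming proc.DefaultClockTicks is the usual 100 Hz):
// a process that accumulated 250 ticks over a 5s window ran at
// (250 / 100) / 5 * 100 = 50% of one core.
//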
// calculateProcessMetrics calculates metrics for each process
func (a *Aggregator) calculateProcessMetrics(processes []*proc.ProcessInfo, now time.Time) []ProcessMetrics {
	newProcCPU := make(map[int]*ProcessCPUSnapshot)
	metrics := make([]ProcessMetrics, 0, len(processes))

	for _, p := range processes {
		pid := p.Stat.PID

		// Calculate CPU percentage for this process from the tick delta and
		// the wall-clock time since the previous snapshot:
		// CPU percent = (delta ticks / clock_ticks_per_sec) / elapsed_seconds * 100
		cpuPercent := 0.0
		if prev, ok := a.prevProcCPU[pid]; ok {
			// The per-operand float64 conversion means a shrinking tick
			// count (e.g. PID reuse) yields a negative delta instead of
			// unsigned wraparound, and is simply skipped.
			procDelta := float64(p.Stat.TotalTime()) - float64(prev.Total())
			elapsed := now.Sub(prev.Timestamp).Seconds()
			if procDelta > 0 && elapsed > 0 {
				cpuPercent = (procDelta / float64(proc.DefaultClockTicks)) / elapsed * 100
				if cpuPercent > 100 {
					cpuPercent = 100 // Cap at 100% per CPU
				}
			}
		}

		// Store current snapshot for next iteration
		newProcCPU[pid] = &ProcessCPUSnapshot{
			PID:       pid,
			Timestamp: now,
			UTime:     p.Stat.UTime,
			STime:     p.Stat.STime,
		}

		state := string(p.Stat.State)
		if state == "" {
			state = "?"
		}
		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}

		metrics = append(metrics, ProcessMetrics{
			PID:        pid,
			Name:       p.Status.Name,
			CPUPercent: cpuPercent,
			MemRSS:     p.Status.VmRSS,
			MemVirtual: p.Status.VmSize,
			Threads:    p.Status.Threads,
			State:      state,
			CgroupPath: cgroupPath,
		})
	}

	// Update process CPU snapshots for next iteration
	a.prevProcCPU = newProcCPU
	return metrics
}

// calculateMemoryMetrics calculates aggregated memory metrics
func (a *Aggregator) calculateMemoryMetrics(sysInfo *proc.SystemInfo, processes []ProcessMetrics) MemoryMetrics {
	var totalRSS uint64
	for _, p := range processes {
		totalRSS += p.MemRSS
	}

	usedBytes := sysInfo.MemTotal - sysInfo.MemAvailable
	usedPercent := 0.0
	rssPercent := 0.0
	if sysInfo.MemTotal > 0 {
		usedPercent = float64(usedBytes) / float64(sysInfo.MemTotal) * 100
		rssPercent = float64(totalRSS) / float64(sysInfo.MemTotal) * 100
	}

	return MemoryMetrics{
		TotalBytes:     sysInfo.MemTotal,
		UsedBytes:      usedBytes,
		FreeBytes:      sysInfo.MemFree,
		AvailableBytes: sysInfo.MemAvailable,
		UsedPercent:    usedPercent,
		TotalRSSBytes:  totalRSS,
		RSSPercent:     rssPercent,
	}
}

// getTopByMetric returns the top N processes by a given metric
func (a *Aggregator) getTopByMetric(metrics []ProcessMetrics, getValue func(ProcessMetrics) float64) []ProcessMetrics {
	if len(metrics) == 0 {
		return nil
	}

	// Sort by the metric value (descending)
	sorted := make([]ProcessMetrics, len(metrics))
	copy(sorted, metrics)
	sort.Slice(sorted, func(i, j int) bool {
		return getValue(sorted[i]) > getValue(sorted[j])
	})

	// Return top N
	n := a.topN
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[:n]
}

// discoverCgroupMappings discovers cgroup path to container name mappings
// by looking for processes that match the configured process mapping.
func (a *Aggregator) discoverCgroupMappings(processes []*proc.ProcessInfo) {
	if len(a.processMapping) == 0 {
		return
	}
	for _, p := range processes {
		if p.Cgroup == nil || p.Cgroup.Path == "" {
			continue
		}
		// Check if this process name is in our mapping
		if containerName, ok := a.processMapping[p.Status.Name]; ok {
			// Map this cgroup path to the container name
			a.cgroupPathMapping[p.Cgroup.Path] = containerName
		}
	}
}

// resolveContainerName returns the container name for a cgroup path,
// or the raw path if no mapping exists.
func (a *Aggregator) resolveContainerName(cgroupPath string) string {
	if name, ok := a.cgroupPathMapping[cgroupPath]; ok {
		return name
	}
	// Use raw path as fallback
	if cgroupPath == "" {
		return "<unknown>"
	}
	return cgroupPath
}
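
// Worked example (again assuming a 100 Hz tick rate): a container whose
// processes gained 500 ticks over a 10s window used
// 500 / (10 * 100) = 0.5 cores.
//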
// calculateCgroupMetrics computes metrics grouped by container/cgroup.
func (a *Aggregator) calculateCgroupMetrics(
	processes []*proc.ProcessInfo,
	processMetrics []ProcessMetrics,
	now time.Time,
) map[string]*CgroupMetrics {
	// Build lookup from PID to ProcessMetrics
	pmByPID := make(map[int]ProcessMetrics)
	for _, pm := range processMetrics {
		pmByPID[pm.PID] = pm
	}

	// Group processes by container name
	type cgroupData struct {
		cgroupPath string
		procs      []*proc.ProcessInfo
		metrics    []ProcessMetrics
	}
	containerGroups := make(map[string]*cgroupData)
	for _, p := range processes {
		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}
		containerName := a.resolveContainerName(cgroupPath)
		if _, ok := containerGroups[containerName]; !ok {
			containerGroups[containerName] = &cgroupData{
				cgroupPath: cgroupPath,
			}
		}
		containerGroups[containerName].procs = append(containerGroups[containerName].procs, p)
		if pm, ok := pmByPID[p.Stat.PID]; ok {
			containerGroups[containerName].metrics = append(containerGroups[containerName].metrics, pm)
		}
	}

	// Calculate elapsed time since last collection
	elapsed := 0.0
	if !a.prevCgroupTime.IsZero() {
		elapsed = now.Sub(a.prevCgroupTime).Seconds()
	}
	a.prevCgroupTime = now

	// Calculate metrics for each container
	result := make(map[string]*CgroupMetrics)
	for containerName, data := range containerGroups {
		// Sum CPU ticks (utime + stime only, not cutime/cstime)
		var totalTicks uint64
		var totalRSS uint64
		for _, p := range data.procs {
			totalTicks += p.Stat.TotalTime()
			totalRSS += p.Status.VmRSS
		}

		// Calculate CPU cores used from delta
		usedCores := 0.0
		if prev, ok := a.prevCgroupCPU[containerName]; ok && elapsed > 0 {
			// Guard against underflow: if processes exited and new ones started,
			// totalTicks could be less than prev. In that case, skip this sample.
			if totalTicks >= prev {
				deltaTicks := totalTicks - prev
				// Convert ticks to cores: deltaTicks / (elapsed_seconds * CLK_TCK)
				usedCores = float64(deltaTicks) / (elapsed * float64(proc.DefaultClockTicks))
			}
			// If totalTicks < prev, usedCores stays 0 for this sample
		}
		a.prevCgroupCPU[containerName] = totalTicks

		// Calculate percentages against limits if available
		cpuPercent := 0.0
		memPercent := 0.0
		var limitCores float64
		var limitBytes uint64
		if limit, ok := a.cgroupLimits[containerName]; ok {
			limitCores = limit.CPUCores
			limitBytes = limit.MemoryBytes
			if limit.CPUCores > 0 {
				cpuPercent = (usedCores / limit.CPUCores) * 100
			}
			if limit.MemoryBytes > 0 {
				memPercent = (float64(totalRSS) / float64(limit.MemoryBytes)) * 100
			}
		}

		result[containerName] = &CgroupMetrics{
			Name:       containerName,
			CgroupPath: data.cgroupPath,
			CPU: CgroupCPUMetrics{
				TotalTicks:  totalTicks,
				UsedCores:   usedCores,
				UsedPercent: cpuPercent,
				LimitCores:  limitCores,
			},
			Memory: CgroupMemoryMetrics{
				TotalRSSBytes: totalRSS,
				UsedPercent:   memPercent,
				LimitBytes:    limitBytes,
			},
			Processes: data.metrics,
			NumProcs:  len(data.procs),
		}
	}
	return result
}
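
A minimal usage sketch, not part of this file: it assumes the module path visible in the imports above (the internal package is only importable from within that module) and a command that polls Collect on a fixed interval. The first Collect only seeds the CPU snapshots, so its percentages are zero.

package main

import (
	"fmt"
	"log"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-resource-collector/internal/metrics"
)

func main() {
	agg := metrics.NewAggregator("/proc", 5)

	// Seed the CPU snapshots; the first sample has no deltas to compare.
	if _, err := agg.Collect(); err != nil {
		log.Fatal(err)
	}

	for range time.Tick(10 * time.Second) {
		m, err := agg.Collect()
		if err != nil {
			log.Printf("collect failed: %v", err)
			continue
		}
		fmt.Printf("cpu=%.1f%% procs=%d\n", m.CPU.TotalPercent, m.TotalProcesses)
		for name, cg := range m.Cgroups {
			fmt.Printf("  %s: %.2f cores, rss=%d bytes\n", name, cg.CPU.UsedCores, cg.Memory.TotalRSSBytes)
		}
	}
}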