forgejo-runner-optimiser/internal/metrics/aggregator.go
Martin McCaffery db78b02bb5
All checks were successful
ci / build (push) Successful in 1m48s
Rename repo from forgejo-runner-resource-collector
2026-02-12 09:36:23 +01:00

401 lines
11 KiB
Go

package metrics
import (
"sort"
"time"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/cgroup"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/proc"
)
// Aggregator collects and aggregates metrics from processes
type Aggregator struct {
	procPath string // root of the proc filesystem to read (e.g. "/proc")
	topN     int    // number of entries returned by the top-consumer lists

	// Previous system-wide CPU snapshot; nil until the first Collect,
	// so the first collection reports zero CPU percentages.
	prevCPU *CPUSnapshot
	// Previous per-process CPU snapshots keyed by PID, replaced wholesale
	// on every collection so exited PIDs are dropped.
	prevProcCPU map[int]*ProcessCPUSnapshot

	cgroupLimits      cgroup.CgroupLimits      // container name -> limits
	processMapping    cgroup.ProcessMapping    // process name -> container name
	cgroupPathMapping cgroup.CgroupPathMapping // cgroup path -> container name (built at runtime)
	prevCgroupCPU     map[string]uint64        // container name -> previous total ticks
	prevCgroupTime    time.Time                // previous collection time for cgroup CPU calc
}
// NewAggregator creates a new metrics aggregator.
//
// procPath is the root of the proc filesystem to read from; topN bounds the
// number of processes reported in the top-CPU and top-memory lists.
func NewAggregator(procPath string, topN int) *Aggregator {
	// Cgroup limits and the process->container mapping come from the
	// environment. Parse errors are deliberately ignored: without cgroup
	// configuration the aggregator still produces system-wide metrics,
	// just without per-container limits.
	limits, _ := cgroup.ParseCgroupLimitsEnv()
	processMap, _ := cgroup.ParseProcessMappingEnv()

	agg := &Aggregator{
		procPath:          procPath,
		topN:              topN,
		prevProcCPU:       map[int]*ProcessCPUSnapshot{},
		cgroupLimits:      limits,
		processMapping:    processMap,
		cgroupPathMapping: cgroup.CgroupPathMapping{},
		prevCgroupCPU:     map[string]uint64{},
	}
	return agg
}
// Collect gathers all system metrics
//
// It reads system info, system-wide CPU counters, and all processes from
// procPath, derives delta-based CPU percentages against the snapshots kept
// from the previous Collect call, and assembles a SystemMetrics value.
// The first call reports zero CPU percentages (no previous snapshot yet).
func (a *Aggregator) Collect() (*SystemMetrics, error) {
	// Single timestamp for the whole collection so all deltas use the
	// same reference instant.
	now := time.Now()
	// Read system info
	sysInfo, err := proc.ReadSystemInfo(a.procPath)
	if err != nil {
		return nil, err
	}
	// Read system CPU
	user, nice, system, idle, iowait, irq, softirq, err := proc.ReadSystemCPU(a.procPath)
	if err != nil {
		return nil, err
	}
	currentCPU := &CPUSnapshot{
		Timestamp: now,
		User:      user,
		Nice:      nice,
		System:    system,
		Idle:      idle,
		IOWait:    iowait,
		IRQ:       irq,
		SoftIRQ:   softirq,
	}
	// Calculate CPU percentages
	cpuMetrics := a.calculateCPUMetrics(currentCPU)
	// NOTE(review): prevCPU is replaced here, BEFORE calculateProcessMetrics
	// runs below. That helper's per-process math is driven by per-PID
	// timestamps rather than the system snapshot delta, so the order looks
	// harmless — confirm before reordering.
	a.prevCPU = currentCPU
	// Read all processes
	processes, err := proc.ReadAllProcesses(a.procPath)
	if err != nil {
		return nil, err
	}
	// Calculate per-process metrics
	processMetrics := a.calculateProcessMetrics(processes, now)
	// Calculate memory metrics
	memMetrics := a.calculateMemoryMetrics(sysInfo, processMetrics)
	// Get top CPU consumers
	topCPU := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return p.CPUPercent
	})
	// Get top memory consumers
	topMemory := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return float64(p.MemRSS)
	})
	// Discover cgroup path mappings from known processes
	a.discoverCgroupMappings(processes)
	// Calculate per-cgroup metrics
	cgroupMetrics := a.calculateCgroupMetrics(processes, processMetrics, now)
	return &SystemMetrics{
		Timestamp:      now,
		TotalProcesses: len(processes),
		CPU:            cpuMetrics,
		Memory:         memMetrics,
		TopCPU:         topCPU,
		TopMemory:      topMemory,
		Cgroups:        cgroupMetrics,
	}, nil
}
// calculateCPUMetrics calculates CPU percentages between snapshots.
//
// Percentages are computed from the tick deltas between the stored previous
// snapshot and current. With no previous snapshot, or a non-positive total
// delta, it returns the zero value.
func (a *Aggregator) calculateCPUMetrics(current *CPUSnapshot) CPUMetrics {
	prev := a.prevCPU
	if prev == nil {
		return CPUMetrics{}
	}

	total := float64(current.Total() - prev.Total())
	if total <= 0 {
		return CPUMetrics{}
	}

	// Deltas per category: user includes nice; system includes IRQ/SoftIRQ.
	user := float64(current.User+current.Nice) - float64(prev.User+prev.Nice)
	system := float64(current.System+current.IRQ+current.SoftIRQ) - float64(prev.System+prev.IRQ+prev.SoftIRQ)
	idle := float64(current.Idle) - float64(prev.Idle)
	iowait := float64(current.IOWait) - float64(prev.IOWait)

	// pct converts a tick delta to a percentage of the total delta.
	pct := func(delta float64) float64 { return delta / total * 100 }

	return CPUMetrics{
		TotalPercent:  pct(total - idle - iowait),
		UserPercent:   pct(user),
		SystemPercent: pct(system),
		IdlePercent:   pct(idle),
		IOWaitPercent: pct(iowait),
	}
}
// calculateProcessMetrics calculates metrics for each process.
//
// Per-process CPU% is derived from the growth of the process's cumulative
// utime+stime ticks since the previous collection, divided by the wall-clock
// time elapsed since that process's previous snapshot:
//
//	CPU% = (delta_ticks / CLK_TCK) / elapsed_seconds * 100
//
// A process seen for the first time (or before any system snapshot exists)
// reports 0. The map of per-PID snapshots is rebuilt on every call so exited
// PIDs are dropped automatically.
//
// Fixes over the previous version: removed the dead totalDelta variable
// (computed, then suppressed with `_ = totalDelta`), the redundant inner
// `a.prevCPU != nil` check, and the misleadingly named currentTotal (it held
// the *previous* snapshot's total). The prevCPU.Total() > 0 guard is kept.
func (a *Aggregator) calculateProcessMetrics(processes []*proc.ProcessInfo, now time.Time) []ProcessMetrics {
	newProcCPU := make(map[int]*ProcessCPUSnapshot, len(processes))
	metrics := make([]ProcessMetrics, 0, len(processes))

	for _, p := range processes {
		pid := p.Stat.PID

		cpuPercent := 0.0
		if prev, ok := a.prevProcCPU[pid]; ok && a.prevCPU != nil && a.prevCPU.Total() > 0 {
			procDelta := float64(p.Stat.TotalTime()) - float64(prev.Total())
			elapsed := now.Sub(prev.Timestamp).Seconds()
			if procDelta > 0 && elapsed > 0 {
				cpuPercent = (procDelta / float64(proc.DefaultClockTicks)) / elapsed * 100
				if cpuPercent > 100 {
					cpuPercent = 100 // Cap at 100% per CPU
				}
			}
		}

		// Store current snapshot for the next iteration's delta.
		newProcCPU[pid] = &ProcessCPUSnapshot{
			PID:       pid,
			Timestamp: now,
			UTime:     p.Stat.UTime,
			STime:     p.Stat.STime,
		}

		state := string(p.Stat.State)
		if state == "" {
			state = "?"
		}
		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}

		metrics = append(metrics, ProcessMetrics{
			PID:        pid,
			Name:       p.Status.Name,
			CPUPercent: cpuPercent,
			MemRSS:     p.Status.VmRSS,
			MemVirtual: p.Status.VmSize,
			Threads:    p.Status.Threads,
			State:      state,
			CgroupPath: cgroupPath,
		})
	}

	// Replace the snapshot map wholesale; exited PIDs disappear with it.
	a.prevProcCPU = newProcCPU
	return metrics
}
// calculateMemoryMetrics calculates aggregated memory metrics.
//
// System totals come from sysInfo; TotalRSSBytes is the sum of every
// process's RSS (which can double-count shared pages). Percentages are 0
// when MemTotal is 0 to avoid dividing by zero.
func (a *Aggregator) calculateMemoryMetrics(sysInfo *proc.SystemInfo, processes []ProcessMetrics) MemoryMetrics {
	var rssSum uint64
	for i := range processes {
		rssSum += processes[i].MemRSS
	}

	used := sysInfo.MemTotal - sysInfo.MemAvailable

	var usedPct, rssPct float64
	if total := sysInfo.MemTotal; total > 0 {
		usedPct = float64(used) / float64(total) * 100
		rssPct = float64(rssSum) / float64(total) * 100
	}

	return MemoryMetrics{
		TotalBytes:     sysInfo.MemTotal,
		UsedBytes:      used,
		FreeBytes:      sysInfo.MemFree,
		AvailableBytes: sysInfo.MemAvailable,
		UsedPercent:    usedPct,
		TotalRSSBytes:  rssSum,
		RSSPercent:     rssPct,
	}
}
// getTopByMetric returns the top N processes by a given metric.
//
// The input slice is copied before sorting, so callers' slices are never
// reordered. N is a.topN, clamped to [0, len(metrics)]; the clamp to zero
// fixes a panic the previous version had when topN was negative
// (sorted[:n] with n < 0 is out of range). Returns nil for empty input.
func (a *Aggregator) getTopByMetric(metrics []ProcessMetrics, getValue func(ProcessMetrics) float64) []ProcessMetrics {
	if len(metrics) == 0 {
		return nil
	}

	// Sort a copy by the metric value (descending).
	sorted := make([]ProcessMetrics, len(metrics))
	copy(sorted, metrics)
	sort.Slice(sorted, func(i, j int) bool {
		return getValue(sorted[i]) > getValue(sorted[j])
	})

	// Clamp N into the valid slice range.
	n := a.topN
	if n < 0 {
		n = 0
	}
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[:n]
}
// discoverCgroupMappings discovers cgroup path to container name mappings
// by looking for processes that match the configured process mapping.
// It is a no-op when no process mapping was configured. Mappings are only
// ever added, never removed, so earlier discoveries persist across calls.
func (a *Aggregator) discoverCgroupMappings(processes []*proc.ProcessInfo) {
	if len(a.processMapping) == 0 {
		return
	}
	for _, p := range processes {
		// Skip processes without a usable cgroup path.
		if p.Cgroup == nil {
			continue
		}
		path := p.Cgroup.Path
		if path == "" {
			continue
		}
		// A configured process name ties its cgroup path to a container.
		containerName, known := a.processMapping[p.Status.Name]
		if known {
			a.cgroupPathMapping[path] = containerName
		}
	}
}
// resolveContainerName returns the container name for a cgroup path,
// or the raw path if no mapping exists. An empty, unmapped path resolves
// to the sentinel "<unknown>".
func (a *Aggregator) resolveContainerName(cgroupPath string) string {
	name, mapped := a.cgroupPathMapping[cgroupPath]
	switch {
	case mapped:
		return name
	case cgroupPath == "":
		return "<unknown>"
	default:
		// Fall back to the raw path so the group is still identifiable.
		return cgroupPath
	}
}
// calculateCgroupMetrics computes metrics grouped by container/cgroup.
//
// Processes are grouped by resolved container name; per-group CPU usage in
// cores is derived from the delta of summed utime+stime ticks since the
// previous collection, and CPU/memory percentages are computed against
// configured limits where available.
//
// Fix over the previous version: prevCgroupCPU is now rebuilt from scratch
// on every call, so entries for containers that no longer have any
// processes are dropped instead of accumulating in the map forever.
// (A container that vanishes and returns loses its baseline, but its
// processes restarted anyway, so their tick counters reset too.)
func (a *Aggregator) calculateCgroupMetrics(
	processes []*proc.ProcessInfo,
	processMetrics []ProcessMetrics,
	now time.Time,
) map[string]*CgroupMetrics {
	// Build lookup from PID to ProcessMetrics.
	pmByPID := make(map[int]ProcessMetrics, len(processMetrics))
	for _, pm := range processMetrics {
		pmByPID[pm.PID] = pm
	}

	// Group processes by container name.
	type cgroupData struct {
		cgroupPath string
		procs      []*proc.ProcessInfo
		metrics    []ProcessMetrics
	}
	containerGroups := make(map[string]*cgroupData)
	for _, p := range processes {
		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}
		containerName := a.resolveContainerName(cgroupPath)
		group, ok := containerGroups[containerName]
		if !ok {
			group = &cgroupData{cgroupPath: cgroupPath}
			containerGroups[containerName] = group
		}
		group.procs = append(group.procs, p)
		if pm, ok := pmByPID[p.Stat.PID]; ok {
			group.metrics = append(group.metrics, pm)
		}
	}

	// Elapsed wall-clock time since the last collection; zero on the first
	// run, which disables delta-based core usage below.
	elapsed := 0.0
	if !a.prevCgroupTime.IsZero() {
		elapsed = now.Sub(a.prevCgroupTime).Seconds()
	}
	a.prevCgroupTime = now

	// Fresh previous-ticks map: only containers seen this cycle survive.
	newCgroupCPU := make(map[string]uint64, len(containerGroups))

	result := make(map[string]*CgroupMetrics, len(containerGroups))
	for containerName, data := range containerGroups {
		// Sum CPU ticks (utime + stime only, not cutime/cstime).
		var totalTicks uint64
		var totalRSS uint64
		for _, p := range data.procs {
			totalTicks += p.Stat.TotalTime()
			totalRSS += p.Status.VmRSS
		}

		// Calculate CPU cores used from the tick delta.
		usedCores := 0.0
		hasDelta := false
		if prev, ok := a.prevCgroupCPU[containerName]; ok && elapsed > 0 {
			// Guard against underflow: if processes exited and new ones
			// started, totalTicks could be less than prev. Skip the sample.
			if totalTicks >= prev {
				deltaTicks := totalTicks - prev
				// Convert ticks to cores: deltaTicks / (elapsed * CLK_TCK).
				usedCores = float64(deltaTicks) / (elapsed * float64(proc.DefaultClockTicks))
				hasDelta = true
			}
		}
		newCgroupCPU[containerName] = totalTicks

		// Percentages against configured limits, if any.
		cpuPercent := 0.0
		memPercent := 0.0
		var limitCores float64
		var limitBytes uint64
		if limit, ok := a.cgroupLimits[containerName]; ok {
			limitCores = limit.CPUCores
			limitBytes = limit.MemoryBytes
			if limit.CPUCores > 0 {
				cpuPercent = (usedCores / limit.CPUCores) * 100
			}
			if limit.MemoryBytes > 0 {
				memPercent = (float64(totalRSS) / float64(limit.MemoryBytes)) * 100
			}
		}

		result[containerName] = &CgroupMetrics{
			Name:       containerName,
			CgroupPath: data.cgroupPath,
			CPU: CgroupCPUMetrics{
				TotalTicks:  totalTicks,
				UsedCores:   usedCores,
				UsedPercent: cpuPercent,
				LimitCores:  limitCores,
				HasDelta:    hasDelta,
			},
			Memory: CgroupMemoryMetrics{
				TotalRSSBytes: totalRSS,
				UsedPercent:   memPercent,
				LimitBytes:    limitBytes,
			},
			Processes: data.metrics,
			NumProcs:  len(data.procs),
		}
	}

	a.prevCgroupCPU = newCgroupCPU
	return result
}