package metrics

import (
	"sort"
	"time"

	"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/cgroup"
	"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/proc"
)

// Aggregator collects and aggregates metrics from processes
type Aggregator struct {
	procPath          string
	topN              int
	prevCPU           *CPUSnapshot
	prevProcCPU       map[int]*ProcessCPUSnapshot
	cgroupLimits      cgroup.CgroupLimits      // container name -> limits
	processMapping    cgroup.ProcessMapping    // process name -> container name
	cgroupPathMapping cgroup.CgroupPathMapping // cgroup path -> container name (built at runtime)
	prevCgroupCPU     map[string]uint64        // container name -> previous total ticks
	prevCgroupTime    time.Time                // previous collection time for cgroup CPU calc
}

// NewAggregator creates a new metrics aggregator
func NewAggregator(procPath string, topN int) *Aggregator {
	// Parse cgroup configuration from environment
	limits, _ := cgroup.ParseCgroupLimitsEnv()
	processMap, _ := cgroup.ParseProcessMappingEnv()

	return &Aggregator{
		procPath:          procPath,
		topN:              topN,
		prevProcCPU:       make(map[int]*ProcessCPUSnapshot),
		cgroupLimits:      limits,
		processMapping:    processMap,
		cgroupPathMapping: make(cgroup.CgroupPathMapping),
		prevCgroupCPU:     make(map[string]uint64),
	}
}
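
// Usage sketch (illustrative only; the polling interval and how the result is
// consumed are up to the caller and are assumptions, not part of this package):
//
//	agg := metrics.NewAggregator("/proc", 10)
//	for range time.Tick(5 * time.Second) {
//		m, err := agg.Collect()
//		if err != nil {
//			continue // handle/log the error as appropriate
//		}
//		_ = m.CPU.TotalPercent // reads 0 on the first sample, before a previous snapshot exists
//	}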

// Collect gathers all system metrics
func (a *Aggregator) Collect() (*SystemMetrics, error) {
	now := time.Now()

	// Read system info
	sysInfo, err := proc.ReadSystemInfo(a.procPath)
	if err != nil {
		return nil, err
	}

	// Read system CPU
	user, nice, system, idle, iowait, irq, softirq, err := proc.ReadSystemCPU(a.procPath)
	if err != nil {
		return nil, err
	}

	currentCPU := &CPUSnapshot{
		Timestamp: now,
		User:      user,
		Nice:      nice,
		System:    system,
		Idle:      idle,
		IOWait:    iowait,
		IRQ:       irq,
		SoftIRQ:   softirq,
	}

	// Calculate CPU percentages
	cpuMetrics := a.calculateCPUMetrics(currentCPU)
	a.prevCPU = currentCPU

	// Read all processes
	processes, err := proc.ReadAllProcesses(a.procPath)
	if err != nil {
		return nil, err
	}

	// Calculate per-process metrics
	processMetrics := a.calculateProcessMetrics(processes, now)

	// Calculate memory metrics
	memMetrics := a.calculateMemoryMetrics(sysInfo, processMetrics)

	// Get top CPU consumers
	topCPU := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return p.CPUPercent
	})

	// Get top memory consumers
	topMemory := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
		return float64(p.MemRSS)
	})

	// Discover cgroup path mappings from known processes
	a.discoverCgroupMappings(processes)

	// Calculate per-cgroup metrics
	cgroupMetrics := a.calculateCgroupMetrics(processes, processMetrics, now)

	return &SystemMetrics{
		Timestamp:      now,
		TotalProcesses: len(processes),
		CPU:            cpuMetrics,
		Memory:         memMetrics,
		TopCPU:         topCPU,
		TopMemory:      topMemory,
		Cgroups:        cgroupMetrics,
	}, nil
}

// calculateCPUMetrics calculates CPU percentages between snapshots
func (a *Aggregator) calculateCPUMetrics(current *CPUSnapshot) CPUMetrics {
	if a.prevCPU == nil {
		return CPUMetrics{}
	}

	totalDelta := float64(current.Total() - a.prevCPU.Total())
	if totalDelta <= 0 {
		return CPUMetrics{}
	}

	userDelta := float64(current.User+current.Nice) - float64(a.prevCPU.User+a.prevCPU.Nice)
	systemDelta := float64(current.System+current.IRQ+current.SoftIRQ) - float64(a.prevCPU.System+a.prevCPU.IRQ+a.prevCPU.SoftIRQ)
	idleDelta := float64(current.Idle) - float64(a.prevCPU.Idle)
	iowaitDelta := float64(current.IOWait) - float64(a.prevCPU.IOWait)

	return CPUMetrics{
		TotalPercent:  (totalDelta - idleDelta - iowaitDelta) / totalDelta * 100,
		UserPercent:   userDelta / totalDelta * 100,
		SystemPercent: systemDelta / totalDelta * 100,
		IdlePercent:   idleDelta / totalDelta * 100,
		IOWaitPercent: iowaitDelta / totalDelta * 100,
	}
}
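
// Worked example (illustrative numbers; assumes ReadSystemCPU returns the
// aggregate /proc/stat "cpu" line): if between two snapshots the total ticks
// grew by 400, idle by 300 and iowait by 20, then
// TotalPercent = (400-300-20)/400*100 = 20%, IdlePercent = 300/400*100 = 75%,
// and IOWaitPercent = 20/400*100 = 5%. The values are system-wide fractions of
// total CPU capacity, not per-core figures.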

// calculateProcessMetrics calculates metrics for each process
func (a *Aggregator) calculateProcessMetrics(processes []*proc.ProcessInfo, now time.Time) []ProcessMetrics {
	newProcCPU := make(map[int]*ProcessCPUSnapshot)
	metrics := make([]ProcessMetrics, 0, len(processes))

	for _, p := range processes {
		pid := p.Stat.PID

		// Calculate CPU percentage for this process. On the first sample there
		// is no previous snapshot, so the percentage stays at 0.
		cpuPercent := 0.0
		if prev, ok := a.prevProcCPU[pid]; ok {
			procDelta := float64(p.Stat.TotalTime()) - float64(prev.Total())
			if procDelta > 0 {
				// Calculate based on elapsed wall-clock time and clock ticks:
				// CPU percent = (process ticks / clock_ticks_per_sec) / elapsed_time * 100
				elapsed := now.Sub(prev.Timestamp).Seconds()
				if elapsed > 0 {
					cpuPercent = (procDelta / float64(proc.DefaultClockTicks)) / elapsed * 100
					if cpuPercent > 100 {
						cpuPercent = 100 // Cap at 100% of a single CPU
					}
				}
			}
		}

		// Store current snapshot for next iteration
		newProcCPU[pid] = &ProcessCPUSnapshot{
			PID:       pid,
			Timestamp: now,
			UTime:     p.Stat.UTime,
			STime:     p.Stat.STime,
		}

		state := string(p.Stat.State)
		if state == "" {
			state = "?"
		}

		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}

		metrics = append(metrics, ProcessMetrics{
			PID:        pid,
			Name:       p.Status.Name,
			CPUPercent: cpuPercent,
			MemRSS:     p.Status.VmRSS,
			MemVirtual: p.Status.VmSize,
			Threads:    p.Status.Threads,
			State:      state,
			CgroupPath: cgroupPath,
		})
	}

	// Update process CPU snapshots for next iteration
	a.prevProcCPU = newProcCPU

	return metrics
}
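
// Per-process CPU example (illustrative, assuming proc.DefaultClockTicks == 100):
// a process that accumulated 250 extra utime+stime ticks over a 5 s interval
// used (250/100)/5*100 = 50% of one CPU. A multi-threaded process could exceed
// 100% on a multi-core host, which is why the value is capped above.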

// calculateMemoryMetrics calculates aggregated memory metrics
func (a *Aggregator) calculateMemoryMetrics(sysInfo *proc.SystemInfo, processes []ProcessMetrics) MemoryMetrics {
	var totalRSS uint64
	for _, p := range processes {
		totalRSS += p.MemRSS
	}

	usedBytes := sysInfo.MemTotal - sysInfo.MemAvailable
	usedPercent := 0.0
	rssPercent := 0.0

	if sysInfo.MemTotal > 0 {
		usedPercent = float64(usedBytes) / float64(sysInfo.MemTotal) * 100
		rssPercent = float64(totalRSS) / float64(sysInfo.MemTotal) * 100
	}

	return MemoryMetrics{
		TotalBytes:     sysInfo.MemTotal,
		UsedBytes:      usedBytes,
		FreeBytes:      sysInfo.MemFree,
		AvailableBytes: sysInfo.MemAvailable,
		UsedPercent:    usedPercent,
		TotalRSSBytes:  totalRSS,
		RSSPercent:     rssPercent,
	}
}
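
// Memory example (illustrative numbers): 8 GiB MemTotal with 6 GiB MemAvailable
// gives UsedBytes = 2 GiB and UsedPercent = 25%. TotalRSSBytes sums per-process
// RSS, so pages shared between processes are counted more than once and
// RSSPercent can legitimately exceed UsedPercent.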

// getTopByMetric returns the top N processes by a given metric
func (a *Aggregator) getTopByMetric(metrics []ProcessMetrics, getValue func(ProcessMetrics) float64) []ProcessMetrics {
	if len(metrics) == 0 {
		return nil
	}

	// Sort a copy by the metric value (descending)
	sorted := make([]ProcessMetrics, len(metrics))
	copy(sorted, metrics)
	sort.Slice(sorted, func(i, j int) bool {
		return getValue(sorted[i]) > getValue(sorted[j])
	})

	// Return top N
	n := a.topN
	if n > len(sorted) {
		n = len(sorted)
	}

	return sorted[:n]
}
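
// Selector sketch (illustrative; topThreads is a hypothetical caller variable):
// any numeric field of ProcessMetrics can drive the ranking, e.g.
//
//	topThreads := a.getTopByMetric(processMetrics, func(p ProcessMetrics) float64 {
//		return float64(p.Threads)
//	})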

// discoverCgroupMappings discovers cgroup path to container name mappings
// by looking for processes that match the configured process mapping.
func (a *Aggregator) discoverCgroupMappings(processes []*proc.ProcessInfo) {
	if len(a.processMapping) == 0 {
		return
	}

	for _, p := range processes {
		if p.Cgroup == nil || p.Cgroup.Path == "" {
			continue
		}

		// Check if this process name is in our mapping
		if containerName, ok := a.processMapping[p.Status.Name]; ok {
			// Map this cgroup path to the container name
			a.cgroupPathMapping[p.Cgroup.Path] = containerName
		}
	}
}
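
// Discovery example (names are illustrative, not shipped defaults): with a
// process mapping of "forgejo-runner" -> "runner", every cgroup path that
// contains a process named forgejo-runner is recorded in cgroupPathMapping as
// belonging to "runner". Because the mapping persists across collections,
// sibling processes in the same cgroup are still attributed to "runner" even
// after the mapped process exits.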

// resolveContainerName returns the container name for a cgroup path,
// or the raw path if no mapping exists.
func (a *Aggregator) resolveContainerName(cgroupPath string) string {
	if name, ok := a.cgroupPathMapping[cgroupPath]; ok {
		return name
	}
	// Fall back to the raw path, or a placeholder if the path is empty
	if cgroupPath == "" {
		return "<unknown>"
	}
	return cgroupPath
}

// calculateCgroupMetrics computes metrics grouped by container/cgroup.
func (a *Aggregator) calculateCgroupMetrics(
	processes []*proc.ProcessInfo,
	processMetrics []ProcessMetrics,
	now time.Time,
) map[string]*CgroupMetrics {
	// Build lookup from PID to ProcessMetrics
	pmByPID := make(map[int]ProcessMetrics)
	for _, pm := range processMetrics {
		pmByPID[pm.PID] = pm
	}

	// Group processes by container name
	type cgroupData struct {
		cgroupPath string
		procs      []*proc.ProcessInfo
		metrics    []ProcessMetrics
	}
	containerGroups := make(map[string]*cgroupData)

	for _, p := range processes {
		cgroupPath := ""
		if p.Cgroup != nil {
			cgroupPath = p.Cgroup.Path
		}

		containerName := a.resolveContainerName(cgroupPath)

		if _, ok := containerGroups[containerName]; !ok {
			containerGroups[containerName] = &cgroupData{
				cgroupPath: cgroupPath,
			}
		}

		containerGroups[containerName].procs = append(containerGroups[containerName].procs, p)

		if pm, ok := pmByPID[p.Stat.PID]; ok {
			containerGroups[containerName].metrics = append(containerGroups[containerName].metrics, pm)
		}
	}

	// Calculate elapsed time since last collection
	elapsed := 0.0
	if !a.prevCgroupTime.IsZero() {
		elapsed = now.Sub(a.prevCgroupTime).Seconds()
	}
	a.prevCgroupTime = now

	// Calculate metrics for each container
	result := make(map[string]*CgroupMetrics)

	for containerName, data := range containerGroups {
		// Sum CPU ticks (utime + stime only, not cutime/cstime)
		var totalTicks uint64
		var totalRSS uint64

		for _, p := range data.procs {
			totalTicks += p.Stat.TotalTime()
			totalRSS += p.Status.VmRSS
		}

		// Calculate CPU cores used from the delta since the previous collection
		usedCores := 0.0
		hasDelta := false
		if prev, ok := a.prevCgroupCPU[containerName]; ok && elapsed > 0 {
			// Guard against underflow: if processes exited and new ones started,
			// totalTicks could be less than prev. In that case, skip this sample.
			if totalTicks >= prev {
				deltaTicks := totalTicks - prev
				// Convert ticks to cores: deltaTicks / (elapsed_seconds * CLK_TCK)
				usedCores = float64(deltaTicks) / (elapsed * float64(proc.DefaultClockTicks))
				hasDelta = true
			}
		}
		a.prevCgroupCPU[containerName] = totalTicks

		// Calculate percentages against limits if available
		cpuPercent := 0.0
		memPercent := 0.0
		var limitCores float64
		var limitBytes uint64

		if limit, ok := a.cgroupLimits[containerName]; ok {
			limitCores = limit.CPUCores
			limitBytes = limit.MemoryBytes
			if limit.CPUCores > 0 {
				cpuPercent = (usedCores / limit.CPUCores) * 100
			}
			if limit.MemoryBytes > 0 {
				memPercent = (float64(totalRSS) / float64(limit.MemoryBytes)) * 100
			}
		}

		result[containerName] = &CgroupMetrics{
			Name:       containerName,
			CgroupPath: data.cgroupPath,
			CPU: CgroupCPUMetrics{
				TotalTicks:  totalTicks,
				UsedCores:   usedCores,
				UsedPercent: cpuPercent,
				LimitCores:  limitCores,
				HasDelta:    hasDelta,
			},
			Memory: CgroupMemoryMetrics{
				TotalRSSBytes: totalRSS,
				UsedPercent:   memPercent,
				LimitBytes:    limitBytes,
			},
			Processes: data.metrics,
			NumProcs:  len(data.procs),
		}
	}

	return result
}
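
// Cgroup CPU example (illustrative, assuming proc.DefaultClockTicks == 100):
// if a container's processes accumulated 1500 extra ticks over a 5 s interval,
// UsedCores = 1500/(5*100) = 3.0. With a configured limit of 4 CPU cores this
// gives UsedPercent = 3.0/4*100 = 75%.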