forgejo-runner-optimiser/internal/collector/collector.go
Martin McCaffery d713c25fa5
All checks were successful
ci / build (push) Successful in 26s
ci / goreleaser (push) Successful in 23s
Rename repo from forgejo-runner-resource-collector
2026-02-12 09:55:04 +01:00

128 lines
3.3 KiB
Go

package collector
import (
"context"
"fmt"
"log/slog"
"time"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/metrics"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/output"
"edp.buildth.ing/DevFW-CICD/forgejo-runner-optimiser/internal/summary"
)
// Config holds the collector configuration
type Config struct {
ProcPath string
Interval time.Duration
TopN int
}
// Collector orchestrates metric collection
type Collector struct {
config Config
aggregator *metrics.Aggregator
writer output.Writer
logger *slog.Logger
accumulator *summary.Accumulator
summaryWriter *summary.SummaryWriter
pushClient *summary.PushClient
}
// New creates a new collector
func New(cfg Config, writer output.Writer, logger *slog.Logger) *Collector {
return &Collector{
config: cfg,
aggregator: metrics.NewAggregator(cfg.ProcPath, cfg.TopN),
writer: writer,
logger: logger,
accumulator: summary.NewAccumulator(cfg.TopN),
}
}
// SetSummaryWriter attaches a summary writer for emitting run summaries on shutdown
func (c *Collector) SetSummaryWriter(w *summary.SummaryWriter) {
c.summaryWriter = w
}
// SetPushClient attaches a push client for sending summaries to the receiver
func (c *Collector) SetPushClient(p *summary.PushClient) {
c.pushClient = p
}
// Run starts the collector loop and blocks until context is cancelled
func (c *Collector) Run(ctx context.Context) error {
c.logger.Info("collector starting",
slog.String("proc_path", c.config.ProcPath),
slog.Duration("interval", c.config.Interval),
slog.Int("top_n", c.config.TopN),
)
// Collect immediately on start
if err := c.collect(); err != nil {
c.logger.Warn("initial collection failed", slog.String("error", err.Error()))
}
ticker := time.NewTicker(c.config.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
c.logger.Info("collector stopping")
c.emitSummary(context.Background()) // Use fresh context for shutdown tasks
return ctx.Err()
case <-ticker.C:
if err := c.collect(); err != nil {
c.logger.Warn("collection failed", slog.String("error", err.Error()))
}
}
}
}
// collect performs a single collection cycle
func (c *Collector) collect() error {
m, err := c.aggregator.Collect()
if err != nil {
return fmt.Errorf("aggregating metrics: %w", err)
}
if err := c.writer.Write(m); err != nil {
return fmt.Errorf("writing metrics: %w", err)
}
c.accumulator.Add(m)
return nil
}
// emitSummary computes and writes the run summary if a writer is configured
func (c *Collector) emitSummary(ctx context.Context) {
s := c.accumulator.Summarize()
if s == nil {
c.logger.Info("no samples collected, skipping run summary")
return
}
c.logger.Info("emitting run summary",
slog.Int("sample_count", s.SampleCount),
slog.Float64("duration_seconds", s.DurationSeconds),
)
if c.summaryWriter != nil {
c.summaryWriter.Write(s)
}
if c.pushClient != nil {
if err := c.pushClient.Push(ctx, s); err != nil {
c.logger.Error("failed to push metrics", slog.String("error", err.Error()))
} else {
c.logger.Info("metrics pushed successfully")
}
}
}
// CollectOnce performs a single collection and returns the metrics
func (c *Collector) CollectOnce() (*metrics.SystemMetrics, error) {
return c.aggregator.Collect()
}