feat: added pre-shared-key for read endpoints
All checks were successful
ci / build (push) Successful in 30s
All checks were successful
ci / build (push) Successful in 30s
This commit is contained in:
parent
90c89583a0
commit
042ce77ddc
5 changed files with 152 additions and 38 deletions
77
README.md
77
README.md
|
|
@ -34,23 +34,25 @@ Runs as a sidecar alongside CI workloads. On a configurable interval, it reads `
|
|||
|
||||
**Environment variables:**
|
||||
|
||||
| Variable | Description | Example |
|
||||
|----------|-------------|---------|
|
||||
| `GITHUB_REPOSITORY_OWNER` | Organization name | `my-org` |
|
||||
| `GITHUB_REPOSITORY` | Full repository path | `my-org/my-repo` |
|
||||
| `GITHUB_WORKFLOW` | Workflow filename | `ci.yml` |
|
||||
| `GITHUB_JOB` | Job name | `build` |
|
||||
| `GITHUB_RUN_ID` | Unique run identifier | `run-123` |
|
||||
| `CGROUP_PROCESS_MAP` | JSON: process name → container name | `{"node":"runner"}` |
|
||||
| `CGROUP_LIMITS` | JSON: per-container CPU/memory limits | See below |
|
||||
| Variable | Description | Example |
|
||||
| ------------------------- | ------------------------------------- | ------------------- |
|
||||
| `GITHUB_REPOSITORY_OWNER` | Organization name | `my-org` |
|
||||
| `GITHUB_REPOSITORY` | Full repository path | `my-org/my-repo` |
|
||||
| `GITHUB_WORKFLOW` | Workflow filename | `ci.yml` |
|
||||
| `GITHUB_JOB` | Job name | `build` |
|
||||
| `GITHUB_RUN_ID` | Unique run identifier | `run-123` |
|
||||
| `CGROUP_PROCESS_MAP` | JSON: process name → container name | `{"node":"runner"}` |
|
||||
| `CGROUP_LIMITS` | JSON: per-container CPU/memory limits | See below |
|
||||
|
||||
**CGROUP_LIMITS example:**
|
||||
|
||||
```json
|
||||
{
|
||||
"runner": {"cpu": "2", "memory": "1Gi"},
|
||||
"sidecar": {"cpu": "500m", "memory": "256Mi"}
|
||||
"runner": { "cpu": "2", "memory": "1Gi" },
|
||||
"sidecar": { "cpu": "500m", "memory": "256Mi" }
|
||||
}
|
||||
```
|
||||
|
||||
CPU supports Kubernetes notation (`"2"` = 2 cores, `"500m"` = 0.5 cores). Memory supports `Ki`, `Mi`, `Gi`, `Ti` (binary) or `K`, `M`, `G`, `T` (decimal).
|
||||
|
||||
### Receiver
|
||||
|
|
@ -58,18 +60,32 @@ CPU supports Kubernetes notation (`"2"` = 2 cores, `"500m"` = 0.5 cores). Memory
|
|||
HTTP service that stores metric summaries in SQLite (via GORM) and exposes a query API.
|
||||
|
||||
```bash
|
||||
./receiver --addr=:8080 --db=metrics.db
|
||||
./receiver --addr=:8080 --db=metrics.db --read-token=my-secret-token
|
||||
```
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------|-------------|---------|
|
||||
| `DB_PATH` | SQLite database path | `metrics.db` |
|
||||
| `LISTEN_ADDR` | HTTP listen address | `:8080` |
|
||||
**Flags:**
|
||||
|
||||
| Flag | Environment Variable | Description | Default |
|
||||
| -------------- | --------------------- | ---------------------------------------------- | ------------ |
|
||||
| `--addr` | — | HTTP listen address | `:8080` |
|
||||
| `--db` | — | SQLite database path | `metrics.db` |
|
||||
| `--read-token` | `RECEIVER_READ_TOKEN` | Pre-shared token for read endpoints (optional) | — |
|
||||
|
||||
**Endpoints:**
|
||||
|
||||
- `POST /api/v1/metrics` — receive and store a metric summary
|
||||
- `GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}` — query stored metrics
|
||||
- `GET /api/v1/metrics/repo/{org}/{repo}/{workflow}/{job}` — query stored metrics (protected if `--read-token` is set)
|
||||
|
||||
**Authentication:**
|
||||
|
||||
When `--read-token` is configured, the GET endpoint requires a Bearer token:
|
||||
|
||||
```bash
|
||||
curl -H "Authorization: Bearer my-secret-token" \ #gitleaks:allow
|
||||
http://localhost:8080/api/v1/metrics/repo/org/repo/workflow/job
|
||||
```
|
||||
|
||||
If no token is configured, the endpoint remains open.
|
||||
|
||||
## How Metrics Are Collected
|
||||
|
||||
|
|
@ -81,11 +97,11 @@ Container CPU is reported in **cores** (not percentage) for direct comparison wi
|
|||
|
||||
Over the course of a run, the `summary.Accumulator` tracks every sample and on shutdown computes:
|
||||
|
||||
| Stat | Description |
|
||||
|------|-------------|
|
||||
| `peak` | Maximum observed value |
|
||||
| Stat | Description |
|
||||
| -------------------------- | ------------------------------ |
|
||||
| `peak` | Maximum observed value |
|
||||
| `p99`, `p95`, `p75`, `p50` | Percentiles across all samples |
|
||||
| `avg` | Arithmetic mean |
|
||||
| `avg` | Arithmetic mean |
|
||||
|
||||
These stats are computed for CPU, memory, and per-container metrics.
|
||||
|
||||
|
|
@ -127,6 +143,7 @@ GET /api/v1/metrics/repo/my-org/my-repo/ci.yml/build
|
|||
```
|
||||
|
||||
**CPU metric distinction:**
|
||||
|
||||
- `cpu_total_percent` — system-wide, 0-100%
|
||||
- `cpu_cores` (containers) — cores used (e.g. `2.0` = two full cores)
|
||||
- `peak_cpu_percent` (processes) — per-process, where 100% = 1 core
|
||||
|
|
@ -157,15 +174,15 @@ go build -o receiver ./cmd/receiver
|
|||
|
||||
## Internal Packages
|
||||
|
||||
| Package | Purpose |
|
||||
|---------|---------|
|
||||
| `internal/proc` | Low-level `/proc` parsing (stat, status, cgroup) |
|
||||
| `internal/metrics` | Aggregates process metrics from `/proc` into system/container views |
|
||||
| `internal/cgroup` | Parses `CGROUP_PROCESS_MAP` and `CGROUP_LIMITS` env vars |
|
||||
| `internal/collector` | Orchestrates the collection loop and shutdown |
|
||||
| `internal/summary` | Accumulates samples, computes stats, pushes to receiver |
|
||||
| `internal/receiver` | HTTP handlers and SQLite store |
|
||||
| `internal/output` | Metrics output formatting (JSON/text) |
|
||||
| Package | Purpose |
|
||||
| -------------------- | ------------------------------------------------------------------- |
|
||||
| `internal/proc` | Low-level `/proc` parsing (stat, status, cgroup) |
|
||||
| `internal/metrics` | Aggregates process metrics from `/proc` into system/container views |
|
||||
| `internal/cgroup` | Parses `CGROUP_PROCESS_MAP` and `CGROUP_LIMITS` env vars |
|
||||
| `internal/collector` | Orchestrates the collection loop and shutdown |
|
||||
| `internal/summary` | Accumulates samples, computes stats, pushes to receiver |
|
||||
| `internal/receiver` | HTTP handlers and SQLite store |
|
||||
| `internal/output` | Metrics output formatting (JSON/text) |
|
||||
|
||||
## Background
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ const (
|
|||
func main() {
|
||||
addr := flag.String("addr", defaultAddr, "HTTP listen address")
|
||||
dbPath := flag.String("db", defaultDBPath, "SQLite database path")
|
||||
readToken := flag.String("read-token", os.Getenv("RECEIVER_READ_TOKEN"), "Pre-shared token for read endpoints (or set RECEIVER_READ_TOKEN)")
|
||||
flag.Parse()
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
|
||||
|
|
@ -35,7 +36,7 @@ func main() {
|
|||
}
|
||||
defer func() { _ = store.Close() }()
|
||||
|
||||
handler := receiver.NewHandler(store, logger)
|
||||
handler := receiver.NewHandler(store, logger, *readToken)
|
||||
mux := http.NewServeMux()
|
||||
handler.RegisterRoutes(mux)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ func setupTestReceiver(t *testing.T) (*receiver.Store, *httptest.Server, func())
|
|||
t.Fatalf("NewStore() error = %v", err)
|
||||
}
|
||||
|
||||
handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)))
|
||||
handler := receiver.NewHandler(store, slog.New(slog.NewTextHandler(io.Discard, nil)), "")
|
||||
mux := http.NewServeMux()
|
||||
handler.RegisterRoutes(mux)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,20 +3,25 @@
|
|||
package receiver
|
||||
|
||||
import (
|
||||
"crypto/subtle"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Handler handles HTTP requests for the metrics receiver
|
||||
type Handler struct {
|
||||
store *Store
|
||||
logger *slog.Logger
|
||||
store *Store
|
||||
logger *slog.Logger
|
||||
readToken string // Pre-shared token for read endpoints (empty = no auth required)
|
||||
}
|
||||
|
||||
// NewHandler creates a new HTTP handler with the given store
|
||||
func NewHandler(store *Store, logger *slog.Logger) *Handler {
|
||||
return &Handler{store: store, logger: logger}
|
||||
// NewHandler creates a new HTTP handler with the given store.
|
||||
// If readToken is non-empty, the GET metrics endpoint will require
|
||||
// Bearer token authentication.
|
||||
func NewHandler(store *Store, logger *slog.Logger, readToken string) *Handler {
|
||||
return &Handler{store: store, logger: logger, readToken: readToken}
|
||||
}
|
||||
|
||||
// RegisterRoutes registers all HTTP routes on the given mux
|
||||
|
|
@ -26,6 +31,37 @@ func (h *Handler) RegisterRoutes(mux *http.ServeMux) {
|
|||
mux.HandleFunc("GET /health", h.handleHealth)
|
||||
}
|
||||
|
||||
// validateReadToken checks the Authorization header for a valid Bearer token.
|
||||
// Returns true if authentication is disabled (empty readToken) or if the token matches.
|
||||
func (h *Handler) validateReadToken(w http.ResponseWriter, r *http.Request) bool {
|
||||
if h.readToken == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
authHeader := r.Header.Get("Authorization")
|
||||
if authHeader == "" {
|
||||
h.logger.Warn("missing authorization header", slog.String("path", r.URL.Path))
|
||||
http.Error(w, "authorization required", http.StatusUnauthorized)
|
||||
return false
|
||||
}
|
||||
|
||||
const bearerPrefix = "Bearer "
|
||||
if !strings.HasPrefix(authHeader, bearerPrefix) {
|
||||
h.logger.Warn("invalid authorization format", slog.String("path", r.URL.Path))
|
||||
http.Error(w, "invalid authorization format", http.StatusUnauthorized)
|
||||
return false
|
||||
}
|
||||
|
||||
token := strings.TrimPrefix(authHeader, bearerPrefix)
|
||||
if subtle.ConstantTimeCompare([]byte(token), []byte(h.readToken)) != 1 {
|
||||
h.logger.Warn("invalid token", slog.String("path", r.URL.Path))
|
||||
http.Error(w, "invalid token", http.StatusUnauthorized)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
var payload MetricsPayload
|
||||
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||
|
|
@ -58,6 +94,10 @@ func (h *Handler) handleReceiveMetrics(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (h *Handler) handleGetByWorkflowJob(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.validateReadToken(w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
org := r.PathValue("org")
|
||||
repo := r.PathValue("repo")
|
||||
workflow := r.PathValue("workflow")
|
||||
|
|
|
|||
|
|
@ -158,6 +158,48 @@ func TestHandler_GetByWorkflowJob_NotFound(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandler_GetByWorkflowJob_WithToken(t *testing.T) {
|
||||
h, cleanup := newTestHandlerWithToken(t, "secret-token")
|
||||
defer cleanup()
|
||||
|
||||
// Save a metric
|
||||
payload := &MetricsPayload{
|
||||
Execution: ExecutionContext{Organization: "org", Repository: "repo", Workflow: "ci.yml", Job: "build", RunID: "r1"},
|
||||
}
|
||||
if _, err := h.store.SaveMetric(payload); err != nil {
|
||||
t.Fatalf("SaveMetric() error = %v", err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
h.RegisterRoutes(mux)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
authHeader string
|
||||
wantCode int
|
||||
}{
|
||||
{"no auth header", "", http.StatusUnauthorized},
|
||||
{"wrong format", "Basic dXNlcjpwYXNz", http.StatusUnauthorized},
|
||||
{"wrong token", "Bearer wrong-token", http.StatusUnauthorized},
|
||||
{"valid token", "Bearer secret-token", http.StatusOK},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "/api/v1/metrics/repo/org/repo/ci.yml/build", nil)
|
||||
if tt.authHeader != "" {
|
||||
req.Header.Set("Authorization", tt.authHeader)
|
||||
}
|
||||
rec := httptest.NewRecorder()
|
||||
mux.ServeHTTP(rec, req)
|
||||
|
||||
if rec.Code != tt.wantCode {
|
||||
t.Errorf("status = %d, want %d", rec.Code, tt.wantCode)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_Health(t *testing.T) {
|
||||
h, cleanup := newTestHandler(t)
|
||||
defer cleanup()
|
||||
|
|
@ -191,7 +233,21 @@ func newTestHandler(t *testing.T) (*Handler, func()) {
|
|||
}
|
||||
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
handler := NewHandler(store, logger)
|
||||
handler := NewHandler(store, logger, "") // no auth token for basic tests
|
||||
|
||||
return handler, func() { _ = store.Close() }
|
||||
}
|
||||
|
||||
func newTestHandlerWithToken(t *testing.T, token string) (*Handler, func()) {
|
||||
t.Helper()
|
||||
dbPath := filepath.Join(t.TempDir(), "test.db")
|
||||
store, err := NewStore(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("NewStore() error = %v", err)
|
||||
}
|
||||
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
handler := NewHandler(store, logger, token)
|
||||
|
||||
return handler, func() { _ = store.Close() }
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue