edge-connect-client/sdk/internal/http/transport.go
2025-10-20 13:12:06 +02:00

230 lines
5.8 KiB
Go

// ABOUTME: HTTP transport layer with retry logic and request/response handling
// ABOUTME: Provides resilient HTTP communication with context support and error wrapping
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"time"
"github.com/hashicorp/go-retryablehttp"
)
// Transport sends HTTP requests through a retrying client and can optionally
// authenticate and log each request.
type Transport struct {
	// client is the retry-capable HTTP client every request is executed with.
	client *retryablehttp.Client
	// authProvider, when non-nil, is asked to attach credentials to each
	// outgoing request before it is sent.
	authProvider AuthProvider
	// logger, when non-nil, receives request and response log lines.
	logger Logger
}
// AuthProvider attaches authentication to an outgoing request.
type AuthProvider interface {
	// Attach is handed the request before it is sent, together with the
	// context of the call. A non-nil error aborts the call.
	Attach(ctx context.Context, req *http.Request) error
}
// Logger is the minimal logging sink used for request/response logging.
type Logger interface {
	// Printf records one log entry built from a fmt-style format string and
	// its arguments.
	Printf(format string, v ...interface{})
}
// RetryOptions holds the retry settings consumed by NewTransport.
type RetryOptions struct {
	// MaxRetries is the retry budget handed to the retrying client.
	MaxRetries int
	// InitialDelay is the minimum wait between attempts.
	InitialDelay time.Duration
	// MaxDelay is the cap applied to the computed wait (before jitter).
	MaxDelay time.Duration
	// Multiplier is the base of the exponential backoff: the wait grows with
	// Multiplier raised to the attempt number.
	Multiplier float64
	// RetryableHTTPStatusCodes lists the response status codes that cause a
	// request to be retried.
	RetryableHTTPStatusCodes []int
}
// NewTransport creates a Transport whose underlying retryablehttp client is
// configured from opts.
//
// Parameters:
//   - opts: retry budget, delay bounds, backoff multiplier and the HTTP status
//     codes that should trigger a retry.
//   - auth: stored on the Transport; may be nil.
//   - logger: stored on the Transport; when nil, the retry client's own logger
//     is cleared as well. A non-nil logger is not installed on the retry
//     client, which keeps whatever logger retryablehttp.NewClient set up.
//
// Retry policy: a request is retried after any transport error and after any
// response whose status code appears in opts.RetryableHTTPStatusCodes, unless
// the request context has already been cancelled or has expired.
//
// Backoff: the wait before retry n (counting from 0) is
// InitialDelay * Multiplier^n, capped at MaxDelay, plus up to 10% random
// jitter on top. A non-positive Multiplier (including the zero value) falls
// back to 2 so that an unset field cannot collapse the wait to zero.
func NewTransport(opts RetryOptions, auth AuthProvider, logger Logger) *Transport {
	const (
		defaultMultiplier = 2.0
		jitterFraction    = 0.1
	)

	client := retryablehttp.NewClient()
	client.RetryMax = opts.MaxRetries
	client.RetryWaitMin = opts.InitialDelay
	client.RetryWaitMax = opts.MaxDelay

	client.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
		// Any transport-level error is considered retryable; otherwise the
		// decision depends on the configured status codes.
		retry := err != nil
		if !retry && resp != nil {
			for _, code := range opts.RetryableHTTPStatusCodes {
				if resp.StatusCode == code {
					retry = true
					break
				}
			}
		}
		if !retry {
			return false, nil
		}
		// Never schedule another attempt for a request whose context is
		// already done: it could only fail again with the same context error.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return false, ctxErr
		}
		return true, nil
	}

	multiplier := opts.Multiplier
	if multiplier <= 0 {
		multiplier = defaultMultiplier
	}
	client.Backoff = func(minWait, maxWait time.Duration, attemptNum int, resp *http.Response) time.Duration {
		// Scale in float64 and clamp BEFORE converting back to a Duration:
		// converting the factor alone would truncate fractional multipliers
		// (1.5 -> 1), and an unclamped product can overflow int64 and turn
		// negative for large attempt numbers.
		delay := float64(minWait) * math.Pow(multiplier, float64(attemptNum))
		if math.IsNaN(delay) || delay > float64(maxWait) {
			delay = float64(maxWait)
		}
		sleep := time.Duration(delay)

		// Jitter is added after the cap, so the result may exceed maxWait by
		// up to jitterFraction.
		jitter := time.Duration(rand.Float64() * float64(sleep) * jitterFraction)
		return sleep + jitter
	}

	// Without a caller-supplied logger, silence the retry client as well.
	if logger == nil {
		client.Logger = nil
	}

	return &Transport{
		client:       client,
		authProvider: auth,
		logger:       logger,
	}
}
// Call sends one request through the retrying client and returns the raw
// response.
//
// When body is non-nil it is JSON-encoded and used as the request payload.
// Content-Type and Accept are always set to application/json, and the
// configured AuthProvider (if any) is given the request before it is sent.
// With a logger configured, the method, URL, request body and resulting
// status code are logged.
//
// Call does not read or close the response body; that is left to the caller.
// On any failure the returned response is nil and the error wraps the cause.
func (t *Transport) Call(ctx context.Context, method, url string, body interface{}) (*http.Response, error) {
	var (
		payload []byte
		source  io.Reader
	)
	if body != nil {
		encoded, err := json.Marshal(body)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal request body: %w", err)
		}
		payload = encoded
		source = bytes.NewReader(payload)
	}

	req, err := retryablehttp.NewRequestWithContext(ctx, method, url, source)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")

	if auth := t.authProvider; auth != nil {
		if attachErr := auth.Attach(ctx, req.Request); attachErr != nil {
			return nil, fmt.Errorf("failed to attach auth: %w", attachErr)
		}
	}

	if t.logger != nil {
		t.logger.Printf("=== HTTP REQUEST ===")
		t.logger.Printf("%s %s", method, url)
		if len(payload) != 0 {
			// Prefer an indented rendering; fall back to the raw bytes if the
			// payload cannot be re-indented.
			var indented bytes.Buffer
			if json.Indent(&indented, payload, "", " ") != nil {
				t.logger.Printf("Request Body: %s", string(payload))
			} else {
				t.logger.Printf("Request Body:\n%s", indented.String())
			}
		}
	}

	resp, err := t.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("HTTP request failed: %w", err)
	}

	if t.logger != nil {
		t.logger.Printf("=== HTTP RESPONSE ===")
		t.logger.Printf("%s %s -> %d", method, url, resp.StatusCode)
	}
	return resp, nil
}
// CallJSON performs the request via Call, reads the whole response body and,
// on success, decodes it into result.
//
// Outcomes:
//   - Call fails: Call's response and error are returned unchanged.
//   - The body cannot be read: the response and a wrapped read error.
//   - Status code is 400 or above: the response and an *HTTPError carrying the
//     status and the raw body; result is left untouched.
//   - Otherwise: the body is unmarshalled into result, unless result is nil or
//     the body is empty.
//
// The response body has already been consumed and closed by the time the
// response is returned.
func (t *Transport) CallJSON(ctx context.Context, method, url string, body interface{}, result interface{}) (*http.Response, error) {
	resp, err := t.Call(ctx, method, url, body)
	if err != nil {
		return resp, err
	}
	defer resp.Body.Close()

	payload, readErr := io.ReadAll(resp.Body)
	switch {
	case readErr != nil:
		return resp, fmt.Errorf("failed to read response body: %w", readErr)
	case resp.StatusCode >= 400:
		return resp, &HTTPError{
			StatusCode: resp.StatusCode,
			Status:     resp.Status,
			Body:       payload,
		}
	case result == nil || len(payload) == 0:
		// Nothing to decode into, or nothing to decode.
		return resp, nil
	}

	if decodeErr := json.Unmarshal(payload, result); decodeErr != nil {
		return resp, fmt.Errorf("failed to unmarshal response: %w", decodeErr)
	}
	return resp, nil
}
// HTTPError describes an HTTP response that carried an error status.
type HTTPError struct {
	// StatusCode is the numeric HTTP status code.
	StatusCode int `json:"status_code"`
	// Status is the status text of the response.
	Status string `json:"status"`
	// Body is the raw response body; it is excluded from JSON encoding.
	Body []byte `json:"-"`
}

// Error formats the error as "HTTP <code> <status>", followed by ": <body>"
// when a response body is present.
func (e *HTTPError) Error() string {
	summary := fmt.Sprintf("HTTP %d %s", e.StatusCode, e.Status)
	if len(e.Body) == 0 {
		return summary
	}
	return summary + ": " + string(e.Body)
}

// IsRetryable reports whether the status code is 408, 429 or any code from
// 500 upwards.
func (e *HTTPError) IsRetryable() bool {
	switch e.StatusCode {
	case http.StatusRequestTimeout, http.StatusTooManyRequests:
		return true
	}
	return e.StatusCode >= 500
}
// ParseJSONLines reads a stream of consecutive JSON values from body and
// passes the raw bytes of each value to callback, in order.
//
// It returns nil once the stream ends cleanly. A decoding failure is returned
// wrapped; an error returned by callback stops processing and is returned
// as is.
func ParseJSONLines(body io.Reader, callback func([]byte) error) error {
	dec := json.NewDecoder(body)
	for {
		// A fresh RawMessage per value, so the slice handed to callback is not
		// overwritten by the next decode.
		var msg json.RawMessage
		switch err := dec.Decode(&msg); {
		case err == io.EOF:
			return nil
		case err != nil:
			return fmt.Errorf("failed to decode JSON line: %w", err)
		}
		if cbErr := callback(msg); cbErr != nil {
			return cbErr
		}
	}
}