feat(parser): add result parser of createappinstance and added a configurable timeout for that function
This commit is contained in:
parent
8f6fd94442
commit
0b31409b26
10 changed files with 218 additions and 47 deletions
|
|
@ -53,6 +53,7 @@ func newSDKClient() *edgeconnect.Client {
|
|||
baseURL := viper.GetString("base_url")
|
||||
username := viper.GetString("username")
|
||||
password := viper.GetString("password")
|
||||
createAppInstanceTimeout := viper.GetInt("create_app_instance_timeout")
|
||||
|
||||
err := validateBaseURL(baseURL)
|
||||
if err != nil {
|
||||
|
|
@ -60,15 +61,20 @@ func newSDKClient() *edgeconnect.Client {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Convert timeout from minutes to duration
|
||||
timeout := time.Duration(createAppInstanceTimeout) * time.Minute
|
||||
|
||||
if username != "" && password != "" {
|
||||
return edgeconnect.NewClientWithCredentials(baseURL, username, password,
|
||||
edgeconnect.WithHTTPClient(&http.Client{Timeout: 30 * time.Second}),
|
||||
edgeconnect.WithCreateAppInstanceTimeout(timeout),
|
||||
)
|
||||
}
|
||||
|
||||
// Fallback to no auth for now - in production should require auth
|
||||
return edgeconnect.NewClient(baseURL,
|
||||
edgeconnect.WithHTTPClient(&http.Client{Timeout: 30 * time.Second}),
|
||||
edgeconnect.WithCreateAppInstanceTimeout(timeout),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
12
cmd/root.go
12
cmd/root.go
|
|
@ -9,10 +9,11 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
cfgFile string
|
||||
baseURL string
|
||||
username string
|
||||
password string
|
||||
cfgFile string
|
||||
baseURL string
|
||||
username string
|
||||
password string
|
||||
createAppInstanceTimeout int // timeout in minutes
|
||||
)
|
||||
|
||||
// rootCmd represents the base command when called without any subcommands
|
||||
|
|
@ -39,10 +40,12 @@ func init() {
|
|||
rootCmd.PersistentFlags().StringVar(&baseURL, "base-url", "", "base URL for the Edge Connect API")
|
||||
rootCmd.PersistentFlags().StringVar(&username, "username", "", "username for authentication")
|
||||
rootCmd.PersistentFlags().StringVar(&password, "password", "", "password for authentication")
|
||||
rootCmd.PersistentFlags().IntVar(&createAppInstanceTimeout, "create-app-instance-timeout", 10, "timeout in minutes for CreateAppInstance operations")
|
||||
|
||||
viper.BindPFlag("base_url", rootCmd.PersistentFlags().Lookup("base-url"))
|
||||
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
|
||||
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
|
||||
viper.BindPFlag("create_app_instance_timeout", rootCmd.PersistentFlags().Lookup("create-app-instance-timeout"))
|
||||
}
|
||||
|
||||
func initConfig() {
|
||||
|
|
@ -51,6 +54,7 @@ func initConfig() {
|
|||
viper.BindEnv("base_url", "EDGE_CONNECT_BASE_URL")
|
||||
viper.BindEnv("username", "EDGE_CONNECT_USERNAME")
|
||||
viper.BindEnv("password", "EDGE_CONNECT_PASSWORD")
|
||||
viper.BindEnv("create_app_instance_timeout", "EDGE_CONNECT_CREATE_APP_INSTANCE_TIMEOUT")
|
||||
|
||||
if cfgFile != "" {
|
||||
viper.SetConfigFile(cfgFile)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ package apply
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -355,6 +356,15 @@ func (r *RecreateStrategy) executeInstanceActionWithRetry(ctx context.Context, a
|
|||
}
|
||||
|
||||
lastErr = err
|
||||
|
||||
// Check if error is retryable (don't retry 4xx client errors)
|
||||
if !isRetryableError(err) {
|
||||
r.logf("Failed to %s instance %s: %v (non-retryable error, giving up)", operation, action.InstanceName, err)
|
||||
result.Error = fmt.Errorf("non-retryable error: %w", err)
|
||||
result.Duration = time.Since(startTime)
|
||||
return result
|
||||
}
|
||||
|
||||
if attempt < r.config.MaxRetries {
|
||||
r.logf("Failed to %s instance %s: %v (will retry)", operation, action.InstanceName, err)
|
||||
}
|
||||
|
|
@ -395,6 +405,15 @@ func (r *RecreateStrategy) executeAppActionWithRetry(ctx context.Context, action
|
|||
}
|
||||
|
||||
lastErr = err
|
||||
|
||||
// Check if error is retryable (don't retry 4xx client errors)
|
||||
if !isRetryableError(err) {
|
||||
r.logf("Failed to update app: %v (non-retryable error, giving up)", err)
|
||||
result.Error = fmt.Errorf("non-retryable error: %w", err)
|
||||
result.Duration = time.Since(startTime)
|
||||
return result
|
||||
}
|
||||
|
||||
if attempt < r.config.MaxRetries {
|
||||
r.logf("Failed to update app: %v (will retry)", err)
|
||||
}
|
||||
|
|
@ -503,3 +522,27 @@ func (r *RecreateStrategy) logf(format string, v ...interface{}) {
|
|||
r.logger.Printf("[RecreateStrategy] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// isRetryableError determines if an error should be retried
|
||||
// Returns false for client errors (4xx), true for server errors (5xx) and other transient errors
|
||||
func isRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if it's an APIError with a status code
|
||||
var apiErr *edgeconnect.APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
// Don't retry client errors (4xx)
|
||||
if apiErr.StatusCode >= 400 && apiErr.StatusCode < 500 {
|
||||
return false
|
||||
}
|
||||
// Retry server errors (5xx)
|
||||
if apiErr.StatusCode >= 500 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Retry all other errors (network issues, timeouts, etc.)
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,10 +15,14 @@ import (
|
|||
// CreateAppInstance creates a new application instance in the specified region
|
||||
// Maps to POST /auth/ctrl/CreateAppInst
|
||||
func (c *Client) CreateAppInstance(ctx context.Context, input *NewAppInstanceInput) error {
|
||||
// Apply CreateAppInstance-specific timeout
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, c.CreateAppInstanceTimeout)
|
||||
defer cancel()
|
||||
|
||||
transport := c.getTransport()
|
||||
url := c.BaseURL + "/api/v1/auth/ctrl/CreateAppInst"
|
||||
|
||||
resp, err := transport.Call(ctx, "POST", url, input)
|
||||
resp, err := transport.Call(timeoutCtx, "POST", url, input)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CreateAppInstance failed: %w", err)
|
||||
}
|
||||
|
|
@ -28,6 +32,12 @@ func (c *Client) CreateAppInstance(ctx context.Context, input *NewAppInstanceInp
|
|||
return c.handleErrorResponse(resp, "CreateAppInstance")
|
||||
}
|
||||
|
||||
// Parse streaming JSON response
|
||||
var appInstances []AppInstance
|
||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
||||
return fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
c.logf("CreateAppInstance: %s/%s created successfully",
|
||||
input.AppInst.Key.Organization, input.AppInst.Key.Name)
|
||||
|
||||
|
|
@ -187,14 +197,41 @@ func (c *Client) DeleteAppInstance(ctx context.Context, appInstKey AppInstanceKe
|
|||
|
||||
// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances
|
||||
func (c *Client) parseStreamingAppInstanceResponse(resp *http.Response, result interface{}) error {
|
||||
var responses []Response[AppInstance]
|
||||
var appInstances []AppInstance
|
||||
var messages []string
|
||||
var hasError bool
|
||||
var errorCode int
|
||||
var errorMessage string
|
||||
|
||||
parseErr := sdkhttp.ParseJSONLines(resp.Body, func(line []byte) error {
|
||||
// Try parsing as ResultResponse first (error format)
|
||||
var resultResp ResultResponse
|
||||
if err := json.Unmarshal(line, &resultResp); err == nil && resultResp.Result.Message != "" {
|
||||
if resultResp.IsError() {
|
||||
hasError = true
|
||||
errorCode = resultResp.GetCode()
|
||||
errorMessage = resultResp.GetMessage()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Try parsing as Response[AppInstance]
|
||||
var response Response[AppInstance]
|
||||
if err := json.Unmarshal(line, &response); err != nil {
|
||||
return err
|
||||
}
|
||||
responses = append(responses, response)
|
||||
|
||||
if response.HasData() {
|
||||
appInstances = append(appInstances, response.Data)
|
||||
}
|
||||
if response.IsMessage() {
|
||||
msg := response.Data.GetMessage()
|
||||
messages = append(messages, msg)
|
||||
// Check for error indicators in messages
|
||||
if msg == "CreateError" || msg == "UpdateError" || msg == "DeleteError" {
|
||||
hasError = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
|
|
@ -202,25 +239,20 @@ func (c *Client) parseStreamingAppInstanceResponse(resp *http.Response, result i
|
|||
return parseErr
|
||||
}
|
||||
|
||||
// Extract data from responses
|
||||
var appInstances []AppInstance
|
||||
var messages []string
|
||||
|
||||
for _, response := range responses {
|
||||
if response.HasData() {
|
||||
appInstances = append(appInstances, response.Data)
|
||||
}
|
||||
if response.IsMessage() {
|
||||
messages = append(messages, response.Data.GetMessage())
|
||||
}
|
||||
}
|
||||
|
||||
// If we have error messages, return them
|
||||
if len(messages) > 0 {
|
||||
return &APIError{
|
||||
// If we detected an error, return it
|
||||
if hasError {
|
||||
apiErr := &APIError{
|
||||
StatusCode: resp.StatusCode,
|
||||
Messages: messages,
|
||||
}
|
||||
if errorCode > 0 {
|
||||
apiErr.StatusCode = errorCode
|
||||
apiErr.Code = fmt.Sprintf("%d", errorCode)
|
||||
}
|
||||
if errorMessage != "" {
|
||||
apiErr.Messages = append([]string{errorMessage}, apiErr.Messages...)
|
||||
}
|
||||
return apiErr
|
||||
}
|
||||
|
||||
// Set result based on type
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ func TestCreateAppInstance(t *testing.T) {
|
|||
mockStatusCode int
|
||||
mockResponse string
|
||||
expectError bool
|
||||
errorContains string
|
||||
}{
|
||||
{
|
||||
name: "successful creation",
|
||||
|
|
@ -63,6 +64,57 @@ func TestCreateAppInstance(t *testing.T) {
|
|||
mockResponse: `{"message": "organization is required"}`,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "HTTP 200 with CreateError message",
|
||||
input: &NewAppInstanceInput{
|
||||
Region: "us-west",
|
||||
AppInst: AppInstance{
|
||||
Key: AppInstanceKey{
|
||||
Organization: "testorg",
|
||||
Name: "testinst",
|
||||
CloudletKey: CloudletKey{
|
||||
Organization: "cloudletorg",
|
||||
Name: "testcloudlet",
|
||||
},
|
||||
},
|
||||
Flavor: Flavor{Name: "m4.small"},
|
||||
},
|
||||
},
|
||||
mockStatusCode: 200,
|
||||
mockResponse: `{"data":{"message":"Creating"}}
|
||||
{"data":{"message":"a service has been configured"}}
|
||||
{"data":{"message":"CreateError"}}
|
||||
{"data":{"message":"Deleting AppInst due to failure"}}
|
||||
{"data":{"message":"Deleted AppInst successfully"}}`,
|
||||
expectError: true,
|
||||
errorContains: "CreateError",
|
||||
},
|
||||
{
|
||||
name: "HTTP 200 with result error code",
|
||||
input: &NewAppInstanceInput{
|
||||
Region: "us-west",
|
||||
AppInst: AppInstance{
|
||||
Key: AppInstanceKey{
|
||||
Organization: "testorg",
|
||||
Name: "testinst",
|
||||
CloudletKey: CloudletKey{
|
||||
Organization: "cloudletorg",
|
||||
Name: "testcloudlet",
|
||||
},
|
||||
},
|
||||
Flavor: Flavor{Name: "m4.small"},
|
||||
},
|
||||
},
|
||||
mockStatusCode: 200,
|
||||
mockResponse: `{"data":{"message":"Creating"}}
|
||||
{"data":{"message":"a service has been configured"}}
|
||||
{"data":{"message":"CreateError"}}
|
||||
{"data":{"message":"Deleting AppInst due to failure"}}
|
||||
{"data":{"message":"Deleted AppInst successfully"}}
|
||||
{"result":{"message":"Encountered failures: Create App Inst failed: deployments.apps is forbidden: User \"system:serviceaccount:edgexr:crm-telekomop-munich\" cannot create resource \"deployments\" in API group \"apps\" in the namespace \"gitea\"","code":400}}`,
|
||||
expectError: true,
|
||||
errorContains: "deployments.apps is forbidden",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
@ -91,6 +143,9 @@ func TestCreateAppInstance(t *testing.T) {
|
|||
// Verify results
|
||||
if tt.expectError {
|
||||
assert.Error(t, err)
|
||||
if tt.errorContains != "" {
|
||||
assert.Contains(t, err.Error(), tt.errorContains)
|
||||
}
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,11 +11,12 @@ import (
|
|||
|
||||
// Client represents the EdgeXR Master Controller SDK client
|
||||
type Client struct {
|
||||
BaseURL string
|
||||
HTTPClient *http.Client
|
||||
AuthProvider AuthProvider
|
||||
RetryOpts RetryOptions
|
||||
Logger Logger
|
||||
BaseURL string
|
||||
HTTPClient *http.Client
|
||||
AuthProvider AuthProvider
|
||||
RetryOpts RetryOptions
|
||||
Logger Logger
|
||||
CreateAppInstanceTimeout time.Duration
|
||||
}
|
||||
|
||||
// RetryOptions configures retry behavior for API calls
|
||||
|
|
@ -81,13 +82,21 @@ func WithLogger(logger Logger) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithCreateAppInstanceTimeout sets the timeout for CreateAppInstance operations
|
||||
func WithCreateAppInstanceTimeout(timeout time.Duration) Option {
|
||||
return func(c *Client) {
|
||||
c.CreateAppInstanceTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
// NewClient creates a new EdgeXR SDK client
|
||||
func NewClient(baseURL string, options ...Option) *Client {
|
||||
client := &Client{
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
HTTPClient: &http.Client{Timeout: 30 * time.Second},
|
||||
AuthProvider: NewNoAuthProvider(),
|
||||
RetryOpts: DefaultRetryOptions(),
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
HTTPClient: &http.Client{Timeout: 30 * time.Second},
|
||||
AuthProvider: NewNoAuthProvider(),
|
||||
RetryOpts: DefaultRetryOptions(),
|
||||
CreateAppInstanceTimeout: 10 * time.Minute,
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
|
|
@ -101,10 +110,11 @@ func NewClient(baseURL string, options ...Option) *Client {
|
|||
// This matches the existing client pattern from client/client.go
|
||||
func NewClientWithCredentials(baseURL, username, password string, options ...Option) *Client {
|
||||
client := &Client{
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
HTTPClient: &http.Client{Timeout: 30 * time.Second},
|
||||
AuthProvider: NewUsernamePasswordProvider(baseURL, username, password, nil),
|
||||
RetryOpts: DefaultRetryOptions(),
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
HTTPClient: &http.Client{Timeout: 30 * time.Second},
|
||||
AuthProvider: NewUsernamePasswordProvider(baseURL, username, password, nil),
|
||||
RetryOpts: DefaultRetryOptions(),
|
||||
CreateAppInstanceTimeout: 10 * time.Minute,
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
|
|
|
|||
|
|
@ -271,6 +271,26 @@ func (res *Response[T]) IsMessage() bool {
|
|||
return res.Data.GetMessage() != ""
|
||||
}
|
||||
|
||||
// ResultResponse represents an API result with error code
|
||||
type ResultResponse struct {
|
||||
Result struct {
|
||||
Message string `json:"message"`
|
||||
Code int `json:"code"`
|
||||
} `json:"result"`
|
||||
}
|
||||
|
||||
func (r *ResultResponse) IsError() bool {
|
||||
return r.Result.Code >= 400
|
||||
}
|
||||
|
||||
func (r *ResultResponse) GetMessage() string {
|
||||
return r.Result.Message
|
||||
}
|
||||
|
||||
func (r *ResultResponse) GetCode() int {
|
||||
return r.Result.Code
|
||||
}
|
||||
|
||||
// Responses wraps multiple API responses with metadata
|
||||
type Responses[T Message] struct {
|
||||
Responses []Response[T] `json:"responses,omitempty"`
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
# How does it differ from the EdgeXR API?
|
||||
kind: edgeconnect-deployment
|
||||
metadata:
|
||||
name: "edge-app-demo" # name could be used for appName
|
||||
name: "edge-app-test" # name could be used for appName
|
||||
appVersion: "1.0.0"
|
||||
organization: "edp2"
|
||||
spec:
|
||||
|
|
@ -15,7 +15,7 @@ spec:
|
|||
infraTemplate:
|
||||
- region: "EU"
|
||||
cloudletOrg: "TelekomOP"
|
||||
cloudletName: "Munich"
|
||||
cloudletName: "Hamburg"
|
||||
flavorName: "EU.small"
|
||||
network:
|
||||
outboundConnections:
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
# How does it differ from the EdgeXR API?
|
||||
kind: edgeconnect-deployment
|
||||
metadata:
|
||||
name: "forgejo-runner-test" # name could be used for appName
|
||||
name: "forgejo-runner-edge" # name could be used for appName
|
||||
appVersion: "1.0.0"
|
||||
organization: "edp2"
|
||||
spec:
|
||||
|
|
@ -15,7 +15,7 @@ spec:
|
|||
infraTemplate:
|
||||
- region: "EU"
|
||||
cloudletOrg: "TelekomOP"
|
||||
cloudletName: "Munich"
|
||||
cloudletName: "Hamburg"
|
||||
flavorName: "EU.small"
|
||||
network:
|
||||
outboundConnections:
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: edgeconnect-coder-tcp
|
||||
name: forgejo-runner-test-tcp
|
||||
labels:
|
||||
app: forgejo-runner
|
||||
app: forgejo-runner-test
|
||||
spec:
|
||||
type: LoadBalancer
|
||||
ports:
|
||||
|
|
@ -12,26 +12,27 @@ spec:
|
|||
port: 80
|
||||
targetPort: 80
|
||||
selector:
|
||||
app: forgejo-runner
|
||||
app: forgejo-runner-test
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
labels:
|
||||
app: forgejo-runner
|
||||
name: forgejo-runner
|
||||
app: forgejo-runner-test
|
||||
name: forgejo-runner-test
|
||||
#namespace: gitea
|
||||
spec:
|
||||
# Two replicas means that if one is busy, the other can pick up jobs.
|
||||
replicas: 3
|
||||
selector:
|
||||
matchLabels:
|
||||
app: forgejo-runner
|
||||
app: forgejo-runner-test
|
||||
strategy: {}
|
||||
template:
|
||||
metadata:
|
||||
creationTimestamp: null
|
||||
labels:
|
||||
app: forgejo-runner
|
||||
app: forgejo-runner-test
|
||||
spec:
|
||||
restartPolicy: Always
|
||||
volumes:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue