parseStreamingResponse is now unified for all objects under both versions
All checks were successful
test / test (push) Successful in 45s
All checks were successful
test / test (push) Successful in 45s
This commit is contained in:
parent
2909e0d1b4
commit
e38d7e84d5
12 changed files with 250 additions and 306 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -5,3 +5,5 @@ dist/
|
|||
### direnv ###
|
||||
.direnv
|
||||
.envrc
|
||||
|
||||
edge-connect-client
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -28,7 +28,7 @@ clean:
|
|||
|
||||
# Lint the code
|
||||
lint:
|
||||
golangci-lint run
|
||||
go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.6.2 run
|
||||
|
||||
# Run all checks (generate, test, lint)
|
||||
check: test lint
|
||||
|
|
|
|||
|
|
@ -343,8 +343,7 @@ func (p *EdgeConnectPlanner) getCurrentInstanceState(ctx context.Context, desire
|
|||
},
|
||||
}
|
||||
|
||||
appKey := v2.AppKey{ Name: desired.AppName}
|
||||
|
||||
appKey := v2.AppKey{Name: desired.AppName}
|
||||
|
||||
instance, err := p.client.ShowAppInstance(timeoutCtx, instanceKey, appKey, desired.Region)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -586,7 +586,7 @@ func (r *RecreateStrategy) backupInstance(ctx context.Context, action InstanceAc
|
|||
},
|
||||
}
|
||||
|
||||
appKey := v2.AppKey{ Name: action.Desired.AppName }
|
||||
appKey := v2.AppKey{Name: action.Desired.AppName}
|
||||
|
||||
instance, err := r.client.ShowAppInstance(ctx, instanceKey, appKey, action.Target.Region)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -10,8 +10,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
sdkhttp "edp.buildth.ing/DevFW-CICD/edge-connect-client/v2/sdk/internal/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// CreateAppInstance creates a new application instance in the specified region
|
||||
|
|
@ -34,8 +33,7 @@ func (c *Client) CreateAppInstance(ctx context.Context, input *NewAppInstanceInp
|
|||
}
|
||||
|
||||
// Parse streaming JSON response
|
||||
var appInstances []AppInstance
|
||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
||||
if _, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||
return fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -75,7 +73,7 @@ func (c *Client) ShowAppInstance(ctx context.Context, appInstKey AppInstanceKey,
|
|||
|
||||
// Parse streaming JSON response
|
||||
var appInstances []AppInstance
|
||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
||||
if appInstances, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||
return AppInstance{}, fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -110,12 +108,12 @@ func (c *Client) ShowAppInstances(ctx context.Context, appInstKey AppInstanceKey
|
|||
return nil, c.handleErrorResponse(resp, "ShowAppInstances")
|
||||
}
|
||||
|
||||
var appInstances []AppInstance
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return appInstances, nil // Return empty slice for not found
|
||||
return []AppInstance{}, nil // Return empty slice for not found
|
||||
}
|
||||
|
||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
||||
var appInstances []AppInstance
|
||||
if appInstances, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||
return nil, fmt.Errorf("ShowAppInstances failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -207,88 +205,90 @@ func (c *Client) DeleteAppInstance(ctx context.Context, appInstKey AppInstanceKe
|
|||
}
|
||||
|
||||
// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances
|
||||
func (c *Client) parseStreamingAppInstanceResponse(resp *http.Response, result interface{}) error {
|
||||
func parseStreamingResponse[T Message](resp *http.Response) ([]T, error) {
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
return []T{}, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
// Try parsing as a direct JSON array first (v2 API format)
|
||||
switch v := result.(type) {
|
||||
case *[]AppInstance:
|
||||
var appInstances []AppInstance
|
||||
if err := json.Unmarshal(bodyBytes, &appInstances); err == nil {
|
||||
*v = appInstances
|
||||
return nil
|
||||
}
|
||||
// todo finish check the responses, test them, and make a unify result, probably need
|
||||
// to update the response parameter to the message type e.g. App or AppInst
|
||||
isV2, err := isV2Response(bodyBytes)
|
||||
if err != nil {
|
||||
return []T{}, fmt.Errorf("failed to parse streaming response: %w", err)
|
||||
}
|
||||
|
||||
// Fall back to streaming format (v1 API format)
|
||||
var appInstances []AppInstance
|
||||
var messages []string
|
||||
var hasError bool
|
||||
var errorCode int
|
||||
var errorMessage string
|
||||
|
||||
parseErr := sdkhttp.ParseJSONLines(io.NopCloser(bytes.NewReader(bodyBytes)), func(line []byte) error {
|
||||
// Try parsing as ResultResponse first (error format)
|
||||
var resultResp ResultResponse
|
||||
if err := json.Unmarshal(line, &resultResp); err == nil && resultResp.Result.Message != "" {
|
||||
if resultResp.IsError() {
|
||||
hasError = true
|
||||
errorCode = resultResp.GetCode()
|
||||
errorMessage = resultResp.GetMessage()
|
||||
if isV2 {
|
||||
resultV2, err := parseStreamingResponseV2[T](resp.StatusCode, bodyBytes)
|
||||
if err != nil {
|
||||
return []T{}, err
|
||||
}
|
||||
return nil
|
||||
return resultV2, nil
|
||||
}
|
||||
|
||||
// Try parsing as Response[AppInstance]
|
||||
var response Response[AppInstance]
|
||||
if err := json.Unmarshal(line, &response); err != nil {
|
||||
return err
|
||||
resultV1, err := parseStreamingResponseV1[T](resp.StatusCode, bodyBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.HasData() {
|
||||
appInstances = append(appInstances, response.Data)
|
||||
}
|
||||
if response.IsMessage() {
|
||||
msg := response.Data.GetMessage()
|
||||
messages = append(messages, msg)
|
||||
// Check for error indicators in messages
|
||||
if msg == "CreateError" || msg == "UpdateError" || msg == "DeleteError" {
|
||||
hasError = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
if !resultV1.IsSuccessful() {
|
||||
return []T{}, resultV1.Error()
|
||||
}
|
||||
|
||||
// If we detected an error, return it
|
||||
if hasError {
|
||||
apiErr := &APIError{
|
||||
StatusCode: resp.StatusCode,
|
||||
Messages: messages,
|
||||
}
|
||||
if errorCode > 0 {
|
||||
apiErr.StatusCode = errorCode
|
||||
apiErr.Code = fmt.Sprintf("%d", errorCode)
|
||||
}
|
||||
if errorMessage != "" {
|
||||
apiErr.Messages = append([]string{errorMessage}, apiErr.Messages...)
|
||||
}
|
||||
return apiErr
|
||||
}
|
||||
|
||||
// Set result based on type
|
||||
switch v := result.(type) {
|
||||
case *[]AppInstance:
|
||||
*v = appInstances
|
||||
default:
|
||||
return fmt.Errorf("unsupported result type: %T", result)
|
||||
}
|
||||
|
||||
return nil
|
||||
return resultV1.GetData(), nil
|
||||
}
|
||||
|
||||
func parseStreamingResponseV1[T Message](statusCode int, bodyBytes []byte) (Responses[T], error) {
|
||||
// Fall back to streaming format (v1 API format)
|
||||
var responses Responses[T]
|
||||
responses.StatusCode = statusCode
|
||||
|
||||
decoder := json.NewDecoder(bytes.NewReader(bodyBytes))
|
||||
for {
|
||||
var d Response[T]
|
||||
if err := decoder.Decode(&d); err != nil {
|
||||
if err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
return Responses[T]{}, fmt.Errorf("error in parsing json object into Message: %w", err)
|
||||
}
|
||||
|
||||
if d.Result.Message != "" && d.Result.Code != 0 {
|
||||
responses.StatusCode = d.Result.Code
|
||||
}
|
||||
|
||||
if strings.Contains(d.Data.GetMessage(), "CreateError") {
|
||||
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "CreateError"))
|
||||
}
|
||||
|
||||
if strings.Contains(d.Data.GetMessage(), "UpdateError") {
|
||||
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "UpdateError"))
|
||||
}
|
||||
|
||||
if strings.Contains(d.Data.GetMessage(), "DeleteError") {
|
||||
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "DeleteError"))
|
||||
}
|
||||
|
||||
responses.Responses = append(responses.Responses, d)
|
||||
}
|
||||
|
||||
return responses, nil
|
||||
}
|
||||
|
||||
func isV2Response(bodyBytes []byte) (bool, error) {
|
||||
if len(bodyBytes) == 0 {
|
||||
return false, fmt.Errorf("malformatted response body")
|
||||
}
|
||||
|
||||
return bodyBytes[0] == '[', nil
|
||||
}
|
||||
|
||||
func parseStreamingResponseV2[T Message](statusCode int, bodyBytes []byte) ([]T, error) {
|
||||
var result []T
|
||||
// Try parsing as a direct JSON array first (v2 API format)
|
||||
if err := json.Unmarshal(bodyBytes, &result); err == nil {
|
||||
return result, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ func TestShowAppInstance(t *testing.T) {
|
|||
Name: "testcloudlet",
|
||||
},
|
||||
},
|
||||
appKey: AppKey{ Name: "testapp" },
|
||||
appKey: AppKey{Name: "testapp"},
|
||||
region: "us-west",
|
||||
mockStatusCode: 200,
|
||||
mockResponse: `{"data": {"key": {"organization": "testorg", "name": "testinst", "cloudlet_key": {"organization": "cloudletorg", "name": "testcloudlet"}}, "state": "Ready"}}
|
||||
|
|
@ -192,7 +192,7 @@ func TestShowAppInstance(t *testing.T) {
|
|||
Name: "testcloudlet",
|
||||
},
|
||||
},
|
||||
appKey: AppKey{ Name: "testapp" },
|
||||
appKey: AppKey{Name: "testapp"},
|
||||
region: "us-west",
|
||||
mockStatusCode: 404,
|
||||
mockResponse: "",
|
||||
|
|
|
|||
|
|
@ -4,9 +4,7 @@
|
|||
package v2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
|
@ -73,7 +71,7 @@ func (c *Client) ShowApp(ctx context.Context, appKey AppKey, region string) (App
|
|||
|
||||
// Parse streaming JSON response
|
||||
var apps []App
|
||||
if err := c.parseStreamingResponse(resp, &apps); err != nil {
|
||||
if apps, err = parseStreamingResponse[App](resp); err != nil {
|
||||
return App{}, fmt.Errorf("ShowApp failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -108,12 +106,12 @@ func (c *Client) ShowApps(ctx context.Context, appKey AppKey, region string) ([]
|
|||
return nil, c.handleErrorResponse(resp, "ShowApps")
|
||||
}
|
||||
|
||||
var apps []App
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return apps, nil // Return empty slice for not found
|
||||
return []App{}, nil // Return empty slice for not found
|
||||
}
|
||||
|
||||
if err := c.parseStreamingResponse(resp, &apps); err != nil {
|
||||
var apps []App
|
||||
if apps, err = parseStreamingResponse[App](resp); err != nil {
|
||||
return nil, fmt.Errorf("ShowApps failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -175,70 +173,6 @@ func (c *Client) DeleteApp(ctx context.Context, appKey AppKey, region string) er
|
|||
return nil
|
||||
}
|
||||
|
||||
// parseStreamingResponse parses the EdgeXR streaming JSON response format
|
||||
func (c *Client) parseStreamingResponse(resp *http.Response, result interface{}) error {
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
// Try parsing as a direct JSON array first (v2 API format)
|
||||
switch v := result.(type) {
|
||||
case *[]App:
|
||||
var apps []App
|
||||
if err := json.Unmarshal(bodyBytes, &apps); err == nil {
|
||||
*v = apps
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to streaming format (v1 API format)
|
||||
var responses []Response[App]
|
||||
var apps []App
|
||||
var messages []string
|
||||
|
||||
parseErr := sdkhttp.ParseJSONLines(io.NopCloser(bytes.NewReader(bodyBytes)), func(line []byte) error {
|
||||
var response Response[App]
|
||||
if err := json.Unmarshal(line, &response); err != nil {
|
||||
return err
|
||||
}
|
||||
responses = append(responses, response)
|
||||
return nil
|
||||
})
|
||||
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
}
|
||||
|
||||
// Extract data from responses
|
||||
for _, response := range responses {
|
||||
if response.HasData() {
|
||||
apps = append(apps, response.Data)
|
||||
}
|
||||
if response.IsMessage() {
|
||||
messages = append(messages, response.Data.GetMessage())
|
||||
}
|
||||
}
|
||||
|
||||
// If we have error messages, return them
|
||||
if len(messages) > 0 {
|
||||
return &APIError{
|
||||
StatusCode: resp.StatusCode,
|
||||
Messages: messages,
|
||||
}
|
||||
}
|
||||
|
||||
// Set result based on type
|
||||
switch v := result.(type) {
|
||||
case *[]App:
|
||||
*v = apps
|
||||
default:
|
||||
return fmt.Errorf("unsupported result type: %T", result)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getTransport creates an HTTP transport with current client settings
|
||||
func (c *Client) getTransport() *sdkhttp.Transport {
|
||||
return sdkhttp.NewTransport(
|
||||
|
|
|
|||
|
|
@ -291,6 +291,7 @@ type DeleteAppInstanceInput struct {
|
|||
|
||||
// Response wraps a single API response
|
||||
type Response[T Message] struct {
|
||||
ResultResponse `json:",inline"`
|
||||
Data T `json:"data"`
|
||||
}
|
||||
|
||||
|
|
@ -326,6 +327,7 @@ func (r *ResultResponse) GetCode() int {
|
|||
type Responses[T Message] struct {
|
||||
Responses []Response[T] `json:"responses,omitempty"`
|
||||
StatusCode int `json:"-"`
|
||||
Errors []error `json:"-"`
|
||||
}
|
||||
|
||||
func (r *Responses[T]) GetData() []T {
|
||||
|
|
@ -344,12 +346,15 @@ func (r *Responses[T]) GetMessages() []string {
|
|||
if v.IsMessage() {
|
||||
messages = append(messages, v.Data.GetMessage())
|
||||
}
|
||||
if v.Result.Message != "" {
|
||||
messages = append(messages, v.Result.Message)
|
||||
}
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
func (r *Responses[T]) IsSuccessful() bool {
|
||||
return r.StatusCode >= 200 && r.StatusCode < 400
|
||||
return len(r.Errors) == 0 && (r.StatusCode >= 200 && r.StatusCode < 400)
|
||||
}
|
||||
|
||||
func (r *Responses[T]) Error() error {
|
||||
|
|
@ -410,3 +415,7 @@ type CloudletResourceUsage struct {
|
|||
Region string `json:"region"`
|
||||
Usage map[string]interface{} `json:"usage"`
|
||||
}
|
||||
|
||||
type ErrorMessage struct {
|
||||
Message string
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue