parseStreamingResponse is now unified for all objects under both versions
All checks were successful
test / test (push) Successful in 45s
All checks were successful
test / test (push) Successful in 45s
This commit is contained in:
parent
2909e0d1b4
commit
e38d7e84d5
12 changed files with 250 additions and 306 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -5,3 +5,5 @@ dist/
|
||||||
### direnv ###
|
### direnv ###
|
||||||
.direnv
|
.direnv
|
||||||
.envrc
|
.envrc
|
||||||
|
|
||||||
|
edge-connect-client
|
||||||
|
|
|
||||||
2
Makefile
2
Makefile
|
|
@ -28,7 +28,7 @@ clean:
|
||||||
|
|
||||||
# Lint the code
|
# Lint the code
|
||||||
lint:
|
lint:
|
||||||
golangci-lint run
|
go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.6.2 run
|
||||||
|
|
||||||
# Run all checks (generate, test, lint)
|
# Run all checks (generate, test, lint)
|
||||||
check: test lint
|
check: test lint
|
||||||
|
|
|
||||||
|
|
@ -343,8 +343,7 @@ func (p *EdgeConnectPlanner) getCurrentInstanceState(ctx context.Context, desire
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
appKey := v2.AppKey{ Name: desired.AppName}
|
appKey := v2.AppKey{Name: desired.AppName}
|
||||||
|
|
||||||
|
|
||||||
instance, err := p.client.ShowAppInstance(timeoutCtx, instanceKey, appKey, desired.Region)
|
instance, err := p.client.ShowAppInstance(timeoutCtx, instanceKey, appKey, desired.Region)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -586,7 +586,7 @@ func (r *RecreateStrategy) backupInstance(ctx context.Context, action InstanceAc
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
appKey := v2.AppKey{ Name: action.Desired.AppName }
|
appKey := v2.AppKey{Name: action.Desired.AppName}
|
||||||
|
|
||||||
instance, err := r.client.ShowAppInstance(ctx, instanceKey, appKey, action.Target.Region)
|
instance, err := r.client.ShowAppInstance(ctx, instanceKey, appKey, action.Target.Region)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -10,8 +10,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
sdkhttp "edp.buildth.ing/DevFW-CICD/edge-connect-client/v2/sdk/internal/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// CreateAppInstance creates a new application instance in the specified region
|
// CreateAppInstance creates a new application instance in the specified region
|
||||||
|
|
@ -34,8 +33,7 @@ func (c *Client) CreateAppInstance(ctx context.Context, input *NewAppInstanceInp
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse streaming JSON response
|
// Parse streaming JSON response
|
||||||
var appInstances []AppInstance
|
if _, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
|
||||||
return fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
return fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -75,7 +73,7 @@ func (c *Client) ShowAppInstance(ctx context.Context, appInstKey AppInstanceKey,
|
||||||
|
|
||||||
// Parse streaming JSON response
|
// Parse streaming JSON response
|
||||||
var appInstances []AppInstance
|
var appInstances []AppInstance
|
||||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
if appInstances, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||||
return AppInstance{}, fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
return AppInstance{}, fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -110,12 +108,12 @@ func (c *Client) ShowAppInstances(ctx context.Context, appInstKey AppInstanceKey
|
||||||
return nil, c.handleErrorResponse(resp, "ShowAppInstances")
|
return nil, c.handleErrorResponse(resp, "ShowAppInstances")
|
||||||
}
|
}
|
||||||
|
|
||||||
var appInstances []AppInstance
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
return appInstances, nil // Return empty slice for not found
|
return []AppInstance{}, nil // Return empty slice for not found
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
|
var appInstances []AppInstance
|
||||||
|
if appInstances, err = parseStreamingResponse[AppInstance](resp); err != nil {
|
||||||
return nil, fmt.Errorf("ShowAppInstances failed to parse response: %w", err)
|
return nil, fmt.Errorf("ShowAppInstances failed to parse response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -207,88 +205,90 @@ func (c *Client) DeleteAppInstance(ctx context.Context, appInstKey AppInstanceKe
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances
|
// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances
|
||||||
func (c *Client) parseStreamingAppInstanceResponse(resp *http.Response, result interface{}) error {
|
func parseStreamingResponse[T Message](resp *http.Response) ([]T, error) {
|
||||||
bodyBytes, err := io.ReadAll(resp.Body)
|
bodyBytes, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read response body: %w", err)
|
return []T{}, fmt.Errorf("failed to read response body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try parsing as a direct JSON array first (v2 API format)
|
// todo finish check the responses, test them, and make a unify result, probably need
|
||||||
switch v := result.(type) {
|
// to update the response parameter to the message type e.g. App or AppInst
|
||||||
case *[]AppInstance:
|
isV2, err := isV2Response(bodyBytes)
|
||||||
var appInstances []AppInstance
|
if err != nil {
|
||||||
if err := json.Unmarshal(bodyBytes, &appInstances); err == nil {
|
return []T{}, fmt.Errorf("failed to parse streaming response: %w", err)
|
||||||
*v = appInstances
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fall back to streaming format (v1 API format)
|
if isV2 {
|
||||||
var appInstances []AppInstance
|
resultV2, err := parseStreamingResponseV2[T](resp.StatusCode, bodyBytes)
|
||||||
var messages []string
|
if err != nil {
|
||||||
var hasError bool
|
return []T{}, err
|
||||||
var errorCode int
|
|
||||||
var errorMessage string
|
|
||||||
|
|
||||||
parseErr := sdkhttp.ParseJSONLines(io.NopCloser(bytes.NewReader(bodyBytes)), func(line []byte) error {
|
|
||||||
// Try parsing as ResultResponse first (error format)
|
|
||||||
var resultResp ResultResponse
|
|
||||||
if err := json.Unmarshal(line, &resultResp); err == nil && resultResp.Result.Message != "" {
|
|
||||||
if resultResp.IsError() {
|
|
||||||
hasError = true
|
|
||||||
errorCode = resultResp.GetCode()
|
|
||||||
errorMessage = resultResp.GetMessage()
|
|
||||||
}
|
}
|
||||||
return nil
|
return resultV2, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try parsing as Response[AppInstance]
|
resultV1, err := parseStreamingResponseV1[T](resp.StatusCode, bodyBytes)
|
||||||
var response Response[AppInstance]
|
if err != nil {
|
||||||
if err := json.Unmarshal(line, &response); err != nil {
|
return nil, err
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.HasData() {
|
if !resultV1.IsSuccessful() {
|
||||||
appInstances = append(appInstances, response.Data)
|
return []T{}, resultV1.Error()
|
||||||
}
|
|
||||||
if response.IsMessage() {
|
|
||||||
msg := response.Data.GetMessage()
|
|
||||||
messages = append(messages, msg)
|
|
||||||
// Check for error indicators in messages
|
|
||||||
if msg == "CreateError" || msg == "UpdateError" || msg == "DeleteError" {
|
|
||||||
hasError = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if parseErr != nil {
|
|
||||||
return parseErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we detected an error, return it
|
return resultV1.GetData(), nil
|
||||||
if hasError {
|
}
|
||||||
apiErr := &APIError{
|
|
||||||
StatusCode: resp.StatusCode,
|
func parseStreamingResponseV1[T Message](statusCode int, bodyBytes []byte) (Responses[T], error) {
|
||||||
Messages: messages,
|
// Fall back to streaming format (v1 API format)
|
||||||
}
|
var responses Responses[T]
|
||||||
if errorCode > 0 {
|
responses.StatusCode = statusCode
|
||||||
apiErr.StatusCode = errorCode
|
|
||||||
apiErr.Code = fmt.Sprintf("%d", errorCode)
|
decoder := json.NewDecoder(bytes.NewReader(bodyBytes))
|
||||||
}
|
for {
|
||||||
if errorMessage != "" {
|
var d Response[T]
|
||||||
apiErr.Messages = append([]string{errorMessage}, apiErr.Messages...)
|
if err := decoder.Decode(&d); err != nil {
|
||||||
}
|
if err.Error() == "EOF" {
|
||||||
return apiErr
|
break
|
||||||
}
|
}
|
||||||
|
return Responses[T]{}, fmt.Errorf("error in parsing json object into Message: %w", err)
|
||||||
// Set result based on type
|
}
|
||||||
switch v := result.(type) {
|
|
||||||
case *[]AppInstance:
|
if d.Result.Message != "" && d.Result.Code != 0 {
|
||||||
*v = appInstances
|
responses.StatusCode = d.Result.Code
|
||||||
default:
|
}
|
||||||
return fmt.Errorf("unsupported result type: %T", result)
|
|
||||||
}
|
if strings.Contains(d.Data.GetMessage(), "CreateError") {
|
||||||
|
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "CreateError"))
|
||||||
return nil
|
}
|
||||||
|
|
||||||
|
if strings.Contains(d.Data.GetMessage(), "UpdateError") {
|
||||||
|
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "UpdateError"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(d.Data.GetMessage(), "DeleteError") {
|
||||||
|
responses.Errors = append(responses.Errors, fmt.Errorf("server responded with: %s", "DeleteError"))
|
||||||
|
}
|
||||||
|
|
||||||
|
responses.Responses = append(responses.Responses, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
return responses, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isV2Response(bodyBytes []byte) (bool, error) {
|
||||||
|
if len(bodyBytes) == 0 {
|
||||||
|
return false, fmt.Errorf("malformatted response body")
|
||||||
|
}
|
||||||
|
|
||||||
|
return bodyBytes[0] == '[', nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStreamingResponseV2[T Message](statusCode int, bodyBytes []byte) ([]T, error) {
|
||||||
|
var result []T
|
||||||
|
// Try parsing as a direct JSON array first (v2 API format)
|
||||||
|
if err := json.Unmarshal(bodyBytes, &result); err == nil {
|
||||||
|
return result, fmt.Errorf("failed to read response body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -174,7 +174,7 @@ func TestShowAppInstance(t *testing.T) {
|
||||||
Name: "testcloudlet",
|
Name: "testcloudlet",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
appKey: AppKey{ Name: "testapp" },
|
appKey: AppKey{Name: "testapp"},
|
||||||
region: "us-west",
|
region: "us-west",
|
||||||
mockStatusCode: 200,
|
mockStatusCode: 200,
|
||||||
mockResponse: `{"data": {"key": {"organization": "testorg", "name": "testinst", "cloudlet_key": {"organization": "cloudletorg", "name": "testcloudlet"}}, "state": "Ready"}}
|
mockResponse: `{"data": {"key": {"organization": "testorg", "name": "testinst", "cloudlet_key": {"organization": "cloudletorg", "name": "testcloudlet"}}, "state": "Ready"}}
|
||||||
|
|
@ -192,7 +192,7 @@ func TestShowAppInstance(t *testing.T) {
|
||||||
Name: "testcloudlet",
|
Name: "testcloudlet",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
appKey: AppKey{ Name: "testapp" },
|
appKey: AppKey{Name: "testapp"},
|
||||||
region: "us-west",
|
region: "us-west",
|
||||||
mockStatusCode: 404,
|
mockStatusCode: 404,
|
||||||
mockResponse: "",
|
mockResponse: "",
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,7 @@
|
||||||
package v2
|
package v2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
@ -73,7 +71,7 @@ func (c *Client) ShowApp(ctx context.Context, appKey AppKey, region string) (App
|
||||||
|
|
||||||
// Parse streaming JSON response
|
// Parse streaming JSON response
|
||||||
var apps []App
|
var apps []App
|
||||||
if err := c.parseStreamingResponse(resp, &apps); err != nil {
|
if apps, err = parseStreamingResponse[App](resp); err != nil {
|
||||||
return App{}, fmt.Errorf("ShowApp failed to parse response: %w", err)
|
return App{}, fmt.Errorf("ShowApp failed to parse response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -108,12 +106,12 @@ func (c *Client) ShowApps(ctx context.Context, appKey AppKey, region string) ([]
|
||||||
return nil, c.handleErrorResponse(resp, "ShowApps")
|
return nil, c.handleErrorResponse(resp, "ShowApps")
|
||||||
}
|
}
|
||||||
|
|
||||||
var apps []App
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
return apps, nil // Return empty slice for not found
|
return []App{}, nil // Return empty slice for not found
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.parseStreamingResponse(resp, &apps); err != nil {
|
var apps []App
|
||||||
|
if apps, err = parseStreamingResponse[App](resp); err != nil {
|
||||||
return nil, fmt.Errorf("ShowApps failed to parse response: %w", err)
|
return nil, fmt.Errorf("ShowApps failed to parse response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,70 +173,6 @@ func (c *Client) DeleteApp(ctx context.Context, appKey AppKey, region string) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseStreamingResponse parses the EdgeXR streaming JSON response format
|
|
||||||
func (c *Client) parseStreamingResponse(resp *http.Response, result interface{}) error {
|
|
||||||
bodyBytes, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to read response body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try parsing as a direct JSON array first (v2 API format)
|
|
||||||
switch v := result.(type) {
|
|
||||||
case *[]App:
|
|
||||||
var apps []App
|
|
||||||
if err := json.Unmarshal(bodyBytes, &apps); err == nil {
|
|
||||||
*v = apps
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fall back to streaming format (v1 API format)
|
|
||||||
var responses []Response[App]
|
|
||||||
var apps []App
|
|
||||||
var messages []string
|
|
||||||
|
|
||||||
parseErr := sdkhttp.ParseJSONLines(io.NopCloser(bytes.NewReader(bodyBytes)), func(line []byte) error {
|
|
||||||
var response Response[App]
|
|
||||||
if err := json.Unmarshal(line, &response); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
responses = append(responses, response)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if parseErr != nil {
|
|
||||||
return parseErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract data from responses
|
|
||||||
for _, response := range responses {
|
|
||||||
if response.HasData() {
|
|
||||||
apps = append(apps, response.Data)
|
|
||||||
}
|
|
||||||
if response.IsMessage() {
|
|
||||||
messages = append(messages, response.Data.GetMessage())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have error messages, return them
|
|
||||||
if len(messages) > 0 {
|
|
||||||
return &APIError{
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Messages: messages,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set result based on type
|
|
||||||
switch v := result.(type) {
|
|
||||||
case *[]App:
|
|
||||||
*v = apps
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unsupported result type: %T", result)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getTransport creates an HTTP transport with current client settings
|
// getTransport creates an HTTP transport with current client settings
|
||||||
func (c *Client) getTransport() *sdkhttp.Transport {
|
func (c *Client) getTransport() *sdkhttp.Transport {
|
||||||
return sdkhttp.NewTransport(
|
return sdkhttp.NewTransport(
|
||||||
|
|
|
||||||
|
|
@ -291,6 +291,7 @@ type DeleteAppInstanceInput struct {
|
||||||
|
|
||||||
// Response wraps a single API response
|
// Response wraps a single API response
|
||||||
type Response[T Message] struct {
|
type Response[T Message] struct {
|
||||||
|
ResultResponse `json:",inline"`
|
||||||
Data T `json:"data"`
|
Data T `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -326,6 +327,7 @@ func (r *ResultResponse) GetCode() int {
|
||||||
type Responses[T Message] struct {
|
type Responses[T Message] struct {
|
||||||
Responses []Response[T] `json:"responses,omitempty"`
|
Responses []Response[T] `json:"responses,omitempty"`
|
||||||
StatusCode int `json:"-"`
|
StatusCode int `json:"-"`
|
||||||
|
Errors []error `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Responses[T]) GetData() []T {
|
func (r *Responses[T]) GetData() []T {
|
||||||
|
|
@ -344,12 +346,15 @@ func (r *Responses[T]) GetMessages() []string {
|
||||||
if v.IsMessage() {
|
if v.IsMessage() {
|
||||||
messages = append(messages, v.Data.GetMessage())
|
messages = append(messages, v.Data.GetMessage())
|
||||||
}
|
}
|
||||||
|
if v.Result.Message != "" {
|
||||||
|
messages = append(messages, v.Result.Message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return messages
|
return messages
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Responses[T]) IsSuccessful() bool {
|
func (r *Responses[T]) IsSuccessful() bool {
|
||||||
return r.StatusCode >= 200 && r.StatusCode < 400
|
return len(r.Errors) == 0 && (r.StatusCode >= 200 && r.StatusCode < 400)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Responses[T]) Error() error {
|
func (r *Responses[T]) Error() error {
|
||||||
|
|
@ -410,3 +415,7 @@ type CloudletResourceUsage struct {
|
||||||
Region string `json:"region"`
|
Region string `json:"region"`
|
||||||
Usage map[string]interface{} `json:"usage"`
|
Usage map[string]interface{} `json:"usage"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ErrorMessage struct {
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue