fix(apply): fixed client logic ... responses are now filtered as before

This commit is contained in:
Stephan Lo 2025-10-09 17:05:27 +02:00
parent 00487bec7c
commit 488fe430fb
4 changed files with 193 additions and 104 deletions

View file

@ -2,14 +2,12 @@ package edgeconnect
import (
"context"
"encoding/json"
"fmt"
"net/http"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/domain"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/ports/driven"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/edgeconnect_client"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/transport"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/ports/driven"
)
// Adapter implements the driven ports for the EdgeConnect API.
@ -147,7 +145,7 @@ func (a *Adapter) ShowApp(ctx context.Context, region string, appKey domain.AppK
_ = resp.Body.Close()
}()
if err := parseStreamingResponse(resp, &apps); err != nil {
if err := a.client.ParseStreamingResponse(resp, &apps); err != nil {
return nil, fmt.Errorf("ShowApp failed to parse response: %w", err)
}
@ -180,7 +178,7 @@ func (a *Adapter) ShowApps(ctx context.Context, region string, appKey domain.App
_ = resp.Body.Close()
}()
if err := parseStreamingResponse(resp, &apiApps); err != nil {
if err := a.client.ParseStreamingResponse(resp, &apiApps); err != nil {
return nil, fmt.Errorf("ShowApps failed to parse response: %w", err)
}
@ -273,7 +271,7 @@ func (a *Adapter) ShowAppInstance(ctx context.Context, region string, appInstKey
_ = resp.Body.Close()
}()
if err := parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
if err := a.client.ParseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
return nil, fmt.Errorf("ShowAppInstance failed to parse response: %w", err)
}
@ -306,7 +304,7 @@ func (a *Adapter) ShowAppInstances(ctx context.Context, region string, appInstKe
_ = resp.Body.Close()
}()
if err := parseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
if err := a.client.ParseStreamingAppInstanceResponse(resp, &appInstances); err != nil {
return nil, fmt.Errorf("ShowAppInstances failed to parse response: %w", err)
}
@ -417,7 +415,7 @@ func (a *Adapter) ShowCloudlet(ctx context.Context, region string, cloudletKey d
_ = resp.Body.Close()
}()
if err := parseStreamingCloudletResponse(resp, &cloudlets); err != nil {
if err := a.client.ParseStreamingCloudletResponse(resp, &cloudlets); err != nil {
return nil, fmt.Errorf("ShowCloudlet failed to parse response: %w", err)
}
@ -450,7 +448,7 @@ func (a *Adapter) ShowCloudlets(ctx context.Context, region string, cloudletKey
_ = resp.Body.Close()
}()
if err := parseStreamingCloudletResponse(resp, &cloudlets); err != nil {
if err := a.client.ParseStreamingCloudletResponse(resp, &cloudlets); err != nil {
return nil, fmt.Errorf("ShowCloudlets failed to parse response: %w", err)
}
@ -528,69 +526,6 @@ func (a *Adapter) GetCloudletResourceUsage(ctx context.Context, cloudletKey doma
return &usage, nil
}
// Helper functions for parsing streaming responses
func parseStreamingResponse(resp *http.Response, result interface{}) error {
var dataItems []json.RawMessage
var messages []string
parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error {
// Try to unmarshal into a message structure first
var msg struct {
Result struct {
Message string `json:"message"`
} `json:"result"`
}
if err := json.Unmarshal(line, &msg); err == nil && msg.Result.Message != "" {
messages = append(messages, msg.Result.Message)
return nil
}
// If it's not a message, assume it's a data object
var data struct {
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(line, &data); err != nil {
// If we can't even unmarshal it into this basic struct, it's a problem
return fmt.Errorf("failed to unmarshal streaming line: %w", err)
}
dataItems = append(dataItems, data.Result)
return nil
})
if parseErr != nil {
return parseErr
}
if len(messages) > 0 && len(dataItems) == 0 {
// If we only got messages and no data, it's likely an error response
return &edgeconnect_client.APIError{
StatusCode: resp.StatusCode,
Messages: messages,
}
}
// Re-marshal the collected data items and unmarshal into the final result slice
dataBytes, err := json.Marshal(dataItems)
if err != nil {
return fmt.Errorf("failed to re-marshal data items: %w", err)
}
if err := json.Unmarshal(dataBytes, result); err != nil {
return fmt.Errorf("failed to unmarshal data into result: %w", err)
}
return nil
}
func parseStreamingAppInstanceResponse(resp *http.Response, result *[]edgeconnect_client.AppInstance) error {
return parseStreamingResponse(resp, result)
}
func parseStreamingCloudletResponse(resp *http.Response, result *[]edgeconnect_client.Cloudlet) error {
return parseStreamingResponse(resp, result)
}
// Data mapping functions (domain <-> API)
func toAPIApp(app *domain.App) *edgeconnect_client.App {

View file

@ -7,8 +7,8 @@ import (
"path/filepath"
"testing"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/config"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/domain"
"edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
@ -222,7 +222,7 @@ func TestPlanner_Plan_NoChange(t *testing.T) {
// Mock instance found
mockAppInstRepo.On("ShowAppInstance", mock.Anything, "us-west", domain.AppInstanceKey{
Organization: "test-org", Name: "test-app-1.0.0-instance",
Organization: "test-org", Name: "test-app-1.0.0-instance",
CloudletKey: domain.CloudletKey{
Organization: "cloudlet-org",
Name: "cloudlet-name",
@ -694,27 +694,27 @@ func TestPlanner_Plan_AppTypeChange(t *testing.T) {
AppVersion: "1.0.0",
Organization: "test-org",
},
Spec: config.Spec{
DockerApp: &config.DockerApp{},
InfraTemplate: []config.InfraTemplate{
{
Region: "us-west",
CloudletOrg: "cloudlet-org",
CloudletName: "cloudlet-name",
FlavorName: "m4.small",
},
},
},
}
Spec: config.Spec{
DockerApp: &config.DockerApp{},
InfraTemplate: []config.InfraTemplate{
{
Region: "us-west",
CloudletOrg: "cloudlet-org",
CloudletName: "cloudlet-name",
FlavorName: "m4.small",
},
},
},
}
existingApp := &domain.App{
Key: domain.AppKey{
Organization: "test-org",
Name: "test-app",
Version: "1.0.0",
},
Deployment: "kubernetes", // Current is kubernetes
}
existingApp := &domain.App{
Key: domain.AppKey{
Organization: "test-org",
Name: "test-app",
Version: "1.0.0",
},
Deployment: "kubernetes", // Current is kubernetes
}
// Mock app found
mockAppRepo.On("ShowApp", mock.Anything, "us-west", domain.AppKey{
Organization: "test-org",
@ -752,7 +752,7 @@ func TestPlanner_Plan_AppTypeChange(t *testing.T) {
assert.NotNil(t, result)
assert.NotNil(t, result.Plan)
assert.Equal(t, ActionUpdate, result.Plan.AppAction.Type)
assert.Contains(t, result.Plan.AppAction.Changes, "App type changed: KUBERNETES -> DOCKER")
assert.Contains(t, result.Plan.AppAction.Changes, "App type changed: k8s -> docker")
assert.Equal(t, 1, result.Plan.TotalActions) // Only app update, instance is ActionNone
mockAppRepo.AssertExpectations(t)

View file

@ -202,3 +202,152 @@ func (c *Client) handleErrorResponse(resp *http.Response, operation string) erro
Body: bodyBytes,
}
}
// parseStreamingResponse parses the EdgeXR streaming JSON response format
func (c *Client) ParseStreamingResponse(resp *http.Response, result interface{}) error {
var responses []Response[App]
parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error {
var response Response[App]
if err := json.Unmarshal(line, &response); err != nil {
return err
}
responses = append(responses, response)
return nil
})
if parseErr != nil {
return parseErr
}
// Extract data from responses
var apps []App
var messages []string
for _, response := range responses {
if response.HasData() {
apps = append(apps, response.Data)
}
if response.IsMessage() {
messages = append(messages, response.Data.GetMessage())
}
}
// If we have error messages, return them
if len(messages) > 0 {
return &APIError{
StatusCode: resp.StatusCode,
Messages: messages,
}
}
// Set result based on type
switch v := result.(type) {
case *[]App:
*v = apps
default:
return fmt.Errorf("unsupported result type: %T", result)
}
return nil
}
// Helper functions for parsing streaming responses
// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances
func (c *Client) ParseStreamingAppInstanceResponse(resp *http.Response, result interface{}) error {
var responses []Response[AppInstance]
parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error {
var response Response[AppInstance]
if err := json.Unmarshal(line, &response); err != nil {
return err
}
responses = append(responses, response)
return nil
})
if parseErr != nil {
return parseErr
}
// Extract data from responses
var appInstances []AppInstance
var messages []string
for _, response := range responses {
if response.HasData() {
appInstances = append(appInstances, response.Data)
}
if response.IsMessage() {
messages = append(messages, response.Data.GetMessage())
}
}
// If we have error messages, return them
if len(messages) > 0 {
return &APIError{
StatusCode: resp.StatusCode,
Messages: messages,
}
}
// Set result based on type
switch v := result.(type) {
case *[]AppInstance:
*v = appInstances
default:
return fmt.Errorf("unsupported result type: %T", result)
}
return nil
}
// parseStreamingCloudletResponse parses the EdgeXR streaming JSON response format for cloudlets
func (c *Client) ParseStreamingCloudletResponse(resp *http.Response, result interface{}) error {
var responses []Response[Cloudlet]
parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error {
var response Response[Cloudlet]
if err := json.Unmarshal(line, &response); err != nil {
return err
}
responses = append(responses, response)
return nil
})
if parseErr != nil {
return parseErr
}
// Extract data from responses
var cloudlets []Cloudlet
var messages []string
for _, response := range responses {
if response.HasData() {
cloudlets = append(cloudlets, response.Data)
}
if response.IsMessage() {
messages = append(messages, response.Data.GetMessage())
}
}
// If we have error messages, return them
if len(messages) > 0 {
return &APIError{
StatusCode: resp.StatusCode,
Messages: messages,
}
}
// Set result based on type
switch v := result.(type) {
case *[]Cloudlet:
*v = cloudlets
default:
return fmt.Errorf("unsupported result type: %T", result)
}
return nil
}

View file

@ -6,18 +6,23 @@ import (
"io"
)
// ParseJSONLines reads a response body line by line and calls a handler for each JSON object.
// This is useful for streaming JSON responses.
func ParseJSONLines(body io.Reader, handler func(line []byte) error) error {
// ParseJSONLines parses streaming JSON response line by line
func ParseJSONLines(body io.Reader, callback func([]byte) error) error {
decoder := json.NewDecoder(body)
for decoder.More() {
for {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
return fmt.Errorf("failed to decode JSON stream: %w", err)
if err == io.EOF {
break
}
return fmt.Errorf("failed to decode JSON line: %w", err)
}
if err := handler(raw); err != nil {
return fmt.Errorf("error processing JSON line: %w", err)
if err := callback(raw); err != nil {
return err
}
}
return nil
}