From 488fe430fb0a93f5816afeafe7ec1a0b9ecbc0b4 Mon Sep 17 00:00:00 2001 From: Stephan Lo Date: Thu, 9 Oct 2025 17:05:27 +0200 Subject: [PATCH] fix(apply): fixed client logic ... responses are now filtered as before --- .../adapters/driven/edgeconnect/adapter.go | 79 +--------- internal/application/apply/planner_test.go | 50 +++--- .../edgeconnect_client/client.go | 149 ++++++++++++++++++ internal/infrastructure/transport/parser.go | 19 ++- 4 files changed, 193 insertions(+), 104 deletions(-) diff --git a/internal/adapters/driven/edgeconnect/adapter.go b/internal/adapters/driven/edgeconnect/adapter.go index 5226281..274c8a2 100644 --- a/internal/adapters/driven/edgeconnect/adapter.go +++ b/internal/adapters/driven/edgeconnect/adapter.go @@ -2,14 +2,12 @@ package edgeconnect import ( "context" - "encoding/json" "fmt" "net/http" "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/domain" - "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/ports/driven" "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/edgeconnect_client" - "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/transport" + "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/ports/driven" ) // Adapter implements the driven ports for the EdgeConnect API. @@ -147,7 +145,7 @@ func (a *Adapter) ShowApp(ctx context.Context, region string, appKey domain.AppK _ = resp.Body.Close() }() - if err := parseStreamingResponse(resp, &apps); err != nil { + if err := a.client.ParseStreamingResponse(resp, &apps); err != nil { return nil, fmt.Errorf("ShowApp failed to parse response: %w", err) } @@ -180,7 +178,7 @@ func (a *Adapter) ShowApps(ctx context.Context, region string, appKey domain.App _ = resp.Body.Close() }() - if err := parseStreamingResponse(resp, &apiApps); err != nil { + if err := a.client.ParseStreamingResponse(resp, &apiApps); err != nil { return nil, fmt.Errorf("ShowApps failed to parse response: %w", err) } @@ -273,7 +271,7 @@ func (a *Adapter) ShowAppInstance(ctx context.Context, region string, appInstKey _ = resp.Body.Close() }() - if err := parseStreamingAppInstanceResponse(resp, &appInstances); err != nil { + if err := a.client.ParseStreamingAppInstanceResponse(resp, &appInstances); err != nil { return nil, fmt.Errorf("ShowAppInstance failed to parse response: %w", err) } @@ -306,7 +304,7 @@ func (a *Adapter) ShowAppInstances(ctx context.Context, region string, appInstKe _ = resp.Body.Close() }() - if err := parseStreamingAppInstanceResponse(resp, &appInstances); err != nil { + if err := a.client.ParseStreamingAppInstanceResponse(resp, &appInstances); err != nil { return nil, fmt.Errorf("ShowAppInstances failed to parse response: %w", err) } @@ -417,7 +415,7 @@ func (a *Adapter) ShowCloudlet(ctx context.Context, region string, cloudletKey d _ = resp.Body.Close() }() - if err := parseStreamingCloudletResponse(resp, &cloudlets); err != nil { + if err := a.client.ParseStreamingCloudletResponse(resp, &cloudlets); err != nil { return nil, fmt.Errorf("ShowCloudlet failed to parse response: %w", err) } @@ -450,7 +448,7 @@ func (a *Adapter) ShowCloudlets(ctx context.Context, region string, cloudletKey _ = resp.Body.Close() }() - if err := parseStreamingCloudletResponse(resp, &cloudlets); err != nil { + if err := a.client.ParseStreamingCloudletResponse(resp, &cloudlets); err != nil { return nil, fmt.Errorf("ShowCloudlets failed to parse response: %w", err) } @@ -528,69 +526,6 @@ func (a *Adapter) GetCloudletResourceUsage(ctx context.Context, cloudletKey doma return &usage, nil } -// Helper functions for parsing streaming responses - -func parseStreamingResponse(resp *http.Response, result interface{}) error { - var dataItems []json.RawMessage - var messages []string - - parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error { - // Try to unmarshal into a message structure first - var msg struct { - Result struct { - Message string `json:"message"` - } `json:"result"` - } - if err := json.Unmarshal(line, &msg); err == nil && msg.Result.Message != "" { - messages = append(messages, msg.Result.Message) - return nil - } - - // If it's not a message, assume it's a data object - var data struct { - Result json.RawMessage `json:"result"` - } - if err := json.Unmarshal(line, &data); err != nil { - // If we can't even unmarshal it into this basic struct, it's a problem - return fmt.Errorf("failed to unmarshal streaming line: %w", err) - } - dataItems = append(dataItems, data.Result) - return nil - }) - - if parseErr != nil { - return parseErr - } - - if len(messages) > 0 && len(dataItems) == 0 { - // If we only got messages and no data, it's likely an error response - return &edgeconnect_client.APIError{ - StatusCode: resp.StatusCode, - Messages: messages, - } - } - - // Re-marshal the collected data items and unmarshal into the final result slice - dataBytes, err := json.Marshal(dataItems) - if err != nil { - return fmt.Errorf("failed to re-marshal data items: %w", err) - } - - if err := json.Unmarshal(dataBytes, result); err != nil { - return fmt.Errorf("failed to unmarshal data into result: %w", err) - } - - return nil -} - -func parseStreamingAppInstanceResponse(resp *http.Response, result *[]edgeconnect_client.AppInstance) error { - return parseStreamingResponse(resp, result) -} - -func parseStreamingCloudletResponse(resp *http.Response, result *[]edgeconnect_client.Cloudlet) error { - return parseStreamingResponse(resp, result) -} - // Data mapping functions (domain <-> API) func toAPIApp(app *domain.App) *edgeconnect_client.App { diff --git a/internal/application/apply/planner_test.go b/internal/application/apply/planner_test.go index a4df200..9e99844 100644 --- a/internal/application/apply/planner_test.go +++ b/internal/application/apply/planner_test.go @@ -7,8 +7,8 @@ import ( "path/filepath" "testing" - "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/config" "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/domain" + "edp.buildth.ing/DevFW-CICD/edge-connect-client/internal/infrastructure/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -222,7 +222,7 @@ func TestPlanner_Plan_NoChange(t *testing.T) { // Mock instance found mockAppInstRepo.On("ShowAppInstance", mock.Anything, "us-west", domain.AppInstanceKey{ - Organization: "test-org", Name: "test-app-1.0.0-instance", + Organization: "test-org", Name: "test-app-1.0.0-instance", CloudletKey: domain.CloudletKey{ Organization: "cloudlet-org", Name: "cloudlet-name", @@ -694,27 +694,27 @@ func TestPlanner_Plan_AppTypeChange(t *testing.T) { AppVersion: "1.0.0", Organization: "test-org", }, - Spec: config.Spec{ - DockerApp: &config.DockerApp{}, - InfraTemplate: []config.InfraTemplate{ - { - Region: "us-west", - CloudletOrg: "cloudlet-org", - CloudletName: "cloudlet-name", - FlavorName: "m4.small", - }, - }, - }, - } - - existingApp := &domain.App{ - Key: domain.AppKey{ - Organization: "test-org", - Name: "test-app", - Version: "1.0.0", - }, - Deployment: "kubernetes", // Current is kubernetes - } + Spec: config.Spec{ + DockerApp: &config.DockerApp{}, + InfraTemplate: []config.InfraTemplate{ + { + Region: "us-west", + CloudletOrg: "cloudlet-org", + CloudletName: "cloudlet-name", + FlavorName: "m4.small", + }, + }, + }, + } + + existingApp := &domain.App{ + Key: domain.AppKey{ + Organization: "test-org", + Name: "test-app", + Version: "1.0.0", + }, + Deployment: "kubernetes", // Current is kubernetes + } // Mock app found mockAppRepo.On("ShowApp", mock.Anything, "us-west", domain.AppKey{ Organization: "test-org", @@ -752,9 +752,9 @@ func TestPlanner_Plan_AppTypeChange(t *testing.T) { assert.NotNil(t, result) assert.NotNil(t, result.Plan) assert.Equal(t, ActionUpdate, result.Plan.AppAction.Type) - assert.Contains(t, result.Plan.AppAction.Changes, "App type changed: KUBERNETES -> DOCKER") + assert.Contains(t, result.Plan.AppAction.Changes, "App type changed: k8s -> docker") assert.Equal(t, 1, result.Plan.TotalActions) // Only app update, instance is ActionNone mockAppRepo.AssertExpectations(t) mockAppInstRepo.AssertExpectations(t) -} \ No newline at end of file +} diff --git a/internal/infrastructure/edgeconnect_client/client.go b/internal/infrastructure/edgeconnect_client/client.go index 0face70..ef425e1 100644 --- a/internal/infrastructure/edgeconnect_client/client.go +++ b/internal/infrastructure/edgeconnect_client/client.go @@ -202,3 +202,152 @@ func (c *Client) handleErrorResponse(resp *http.Response, operation string) erro Body: bodyBytes, } } + +// parseStreamingResponse parses the EdgeXR streaming JSON response format +func (c *Client) ParseStreamingResponse(resp *http.Response, result interface{}) error { + var responses []Response[App] + + parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error { + var response Response[App] + if err := json.Unmarshal(line, &response); err != nil { + return err + } + responses = append(responses, response) + return nil + }) + + if parseErr != nil { + return parseErr + } + + // Extract data from responses + var apps []App + var messages []string + + for _, response := range responses { + if response.HasData() { + apps = append(apps, response.Data) + } + if response.IsMessage() { + messages = append(messages, response.Data.GetMessage()) + } + } + + // If we have error messages, return them + if len(messages) > 0 { + return &APIError{ + StatusCode: resp.StatusCode, + Messages: messages, + } + } + + // Set result based on type + switch v := result.(type) { + case *[]App: + *v = apps + default: + return fmt.Errorf("unsupported result type: %T", result) + } + + return nil +} + +// Helper functions for parsing streaming responses + +// parseStreamingAppInstanceResponse parses the EdgeXR streaming JSON response format for app instances +func (c *Client) ParseStreamingAppInstanceResponse(resp *http.Response, result interface{}) error { + var responses []Response[AppInstance] + + parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error { + var response Response[AppInstance] + if err := json.Unmarshal(line, &response); err != nil { + return err + } + responses = append(responses, response) + return nil + }) + + if parseErr != nil { + return parseErr + } + + // Extract data from responses + var appInstances []AppInstance + var messages []string + + for _, response := range responses { + if response.HasData() { + appInstances = append(appInstances, response.Data) + } + if response.IsMessage() { + messages = append(messages, response.Data.GetMessage()) + } + } + + // If we have error messages, return them + if len(messages) > 0 { + return &APIError{ + StatusCode: resp.StatusCode, + Messages: messages, + } + } + + // Set result based on type + switch v := result.(type) { + case *[]AppInstance: + *v = appInstances + default: + return fmt.Errorf("unsupported result type: %T", result) + } + + return nil +} + +// parseStreamingCloudletResponse parses the EdgeXR streaming JSON response format for cloudlets +func (c *Client) ParseStreamingCloudletResponse(resp *http.Response, result interface{}) error { + var responses []Response[Cloudlet] + + parseErr := transport.ParseJSONLines(resp.Body, func(line []byte) error { + var response Response[Cloudlet] + if err := json.Unmarshal(line, &response); err != nil { + return err + } + responses = append(responses, response) + return nil + }) + + if parseErr != nil { + return parseErr + } + + // Extract data from responses + var cloudlets []Cloudlet + var messages []string + + for _, response := range responses { + if response.HasData() { + cloudlets = append(cloudlets, response.Data) + } + if response.IsMessage() { + messages = append(messages, response.Data.GetMessage()) + } + } + + // If we have error messages, return them + if len(messages) > 0 { + return &APIError{ + StatusCode: resp.StatusCode, + Messages: messages, + } + } + + // Set result based on type + switch v := result.(type) { + case *[]Cloudlet: + *v = cloudlets + default: + return fmt.Errorf("unsupported result type: %T", result) + } + + return nil +} diff --git a/internal/infrastructure/transport/parser.go b/internal/infrastructure/transport/parser.go index d873e93..0779ee0 100644 --- a/internal/infrastructure/transport/parser.go +++ b/internal/infrastructure/transport/parser.go @@ -6,18 +6,23 @@ import ( "io" ) -// ParseJSONLines reads a response body line by line and calls a handler for each JSON object. -// This is useful for streaming JSON responses. -func ParseJSONLines(body io.Reader, handler func(line []byte) error) error { +// ParseJSONLines parses streaming JSON response line by line +func ParseJSONLines(body io.Reader, callback func([]byte) error) error { decoder := json.NewDecoder(body) - for decoder.More() { + + for { var raw json.RawMessage if err := decoder.Decode(&raw); err != nil { - return fmt.Errorf("failed to decode JSON stream: %w", err) + if err == io.EOF { + break + } + return fmt.Errorf("failed to decode JSON line: %w", err) } - if err := handler(raw); err != nil { - return fmt.Errorf("error processing JSON line: %w", err) + + if err := callback(raw); err != nil { + return err } } + return nil }