// ABOUTME: Resource management for EdgeConnect apply command with deployment execution and rollback // ABOUTME: Handles actual deployment operations, manifest processing, and error recovery with parallel execution package apply import ( "context" "fmt" "io" "os" "sync" "time" "edp.buildth.ing/DevFW-CICD/edge-connect-client/sdk/config" "edp.buildth.ing/DevFW-CICD/edge-connect-client/sdk/edgeconnect" ) // ResourceManagerInterface defines the interface for resource management type ResourceManagerInterface interface { // ApplyDeployment executes a deployment plan ApplyDeployment(ctx context.Context, plan *DeploymentPlan, config *config.EdgeConnectConfig) (*ExecutionResult, error) // RollbackDeployment attempts to rollback a failed deployment RollbackDeployment(ctx context.Context, result *ExecutionResult) error // ValidatePrerequisites checks if deployment prerequisites are met ValidatePrerequisites(ctx context.Context, plan *DeploymentPlan) error } // EdgeConnectResourceManager implements resource management for EdgeConnect type EdgeConnectResourceManager struct { client EdgeConnectClientInterface parallelLimit int rollbackOnFail bool logger Logger } // Logger interface for deployment logging type Logger interface { Printf(format string, v ...interface{}) } // ResourceManagerOptions configures the resource manager behavior type ResourceManagerOptions struct { // ParallelLimit controls how many operations run concurrently ParallelLimit int // RollbackOnFail automatically rolls back on deployment failure RollbackOnFail bool // Logger for deployment operations Logger Logger // Timeout for individual operations OperationTimeout time.Duration } // DefaultResourceManagerOptions returns sensible defaults func DefaultResourceManagerOptions() ResourceManagerOptions { return ResourceManagerOptions{ ParallelLimit: 5, // Conservative parallel limit RollbackOnFail: true, OperationTimeout: 2 * time.Minute, } } // NewResourceManager creates a new EdgeConnect resource manager func NewResourceManager(client EdgeConnectClientInterface, opts ...func(*ResourceManagerOptions)) ResourceManagerInterface { options := DefaultResourceManagerOptions() for _, opt := range opts { opt(&options) } return &EdgeConnectResourceManager{ client: client, parallelLimit: options.ParallelLimit, rollbackOnFail: options.RollbackOnFail, logger: options.Logger, } } // WithParallelLimit sets the parallel execution limit func WithParallelLimit(limit int) func(*ResourceManagerOptions) { return func(opts *ResourceManagerOptions) { opts.ParallelLimit = limit } } // WithRollbackOnFail enables/disables automatic rollback func WithRollbackOnFail(rollback bool) func(*ResourceManagerOptions) { return func(opts *ResourceManagerOptions) { opts.RollbackOnFail = rollback } } // WithLogger sets a logger for deployment operations func WithLogger(logger Logger) func(*ResourceManagerOptions) { return func(opts *ResourceManagerOptions) { opts.Logger = logger } } // ApplyDeployment executes a deployment plan with rollback support func (rm *EdgeConnectResourceManager) ApplyDeployment(ctx context.Context, plan *DeploymentPlan, config *config.EdgeConnectConfig) (*ExecutionResult, error) { startTime := time.Now() rm.logf("Starting deployment: %s", plan.ConfigName) result := &ExecutionResult{ Plan: plan, CompletedActions: []ActionResult{}, FailedActions: []ActionResult{}, } // Step 1: Validate prerequisites if err := rm.ValidatePrerequisites(ctx, plan); err != nil { result.Error = fmt.Errorf("prerequisites validation failed: %w", err) result.Duration = time.Since(startTime) return result, err } // Step 2: Execute app action first (apps must exist before instances) if plan.AppAction.Type != ActionNone { appResult := rm.executeAppAction(ctx, plan.AppAction, config) if appResult.Success { result.CompletedActions = append(result.CompletedActions, appResult) rm.logf("App action completed: %s", appResult.Type) } else { result.FailedActions = append(result.FailedActions, appResult) rm.logf("App action failed: %s - %v", appResult.Type, appResult.Error) if rm.rollbackOnFail { rm.logf("Attempting rollback...") if rollbackErr := rm.RollbackDeployment(ctx, result); rollbackErr != nil { rm.logf("Rollback failed: %v", rollbackErr) } else { result.RollbackPerformed = true result.RollbackSuccess = true } } result.Error = appResult.Error result.Duration = time.Since(startTime) return result, appResult.Error } } // Step 3: Execute instance actions in parallel instanceResults := rm.executeInstanceActions(ctx, plan.InstanceActions, config) for _, instanceResult := range instanceResults { if instanceResult.Success { result.CompletedActions = append(result.CompletedActions, instanceResult) } else { result.FailedActions = append(result.FailedActions, instanceResult) } } // Check if deployment succeeded result.Success = len(result.FailedActions) == 0 result.Duration = time.Since(startTime) if !result.Success { result.Error = fmt.Errorf("%d instance actions failed", len(result.FailedActions)) if rm.rollbackOnFail { rm.logf("Deployment failed, attempting rollback...") if rollbackErr := rm.RollbackDeployment(ctx, result); rollbackErr != nil { rm.logf("Rollback failed: %v", rollbackErr) } else { result.RollbackPerformed = true result.RollbackSuccess = true } } } else { rm.logf("Deployment completed successfully in %v", result.Duration) } return result, result.Error } // executeAppAction handles application creation/update operations func (rm *EdgeConnectResourceManager) executeAppAction(ctx context.Context, action AppAction, config *config.EdgeConnectConfig) ActionResult { startTime := time.Now() result := ActionResult{ Type: action.Type, Target: action.Desired.Name, } switch action.Type { case ActionCreate: result.Success, result.Error = rm.createApplication(ctx, action, config) result.Details = fmt.Sprintf("Created application %s version %s", action.Desired.Name, action.Desired.Version) case ActionUpdate: result.Success, result.Error = rm.updateApplication(ctx, action, config) result.Details = fmt.Sprintf("Updated application %s version %s", action.Desired.Name, action.Desired.Version) default: result.Success = true result.Details = "No action required" } result.Duration = time.Since(startTime) return result } // executeInstanceActions handles instance deployment across multiple cloudlets in parallel func (rm *EdgeConnectResourceManager) executeInstanceActions(ctx context.Context, actions []InstanceAction, config *config.EdgeConnectConfig) []ActionResult { if len(actions) == 0 { return []ActionResult{} } // Create semaphore to limit parallel operations semaphore := make(chan struct{}, rm.parallelLimit) results := make([]ActionResult, len(actions)) var wg sync.WaitGroup for i, action := range actions { if action.Type == ActionNone { results[i] = ActionResult{ Type: action.Type, Target: action.InstanceName, Success: true, Details: "No action required", } continue } wg.Add(1) go func(index int, instanceAction InstanceAction) { defer wg.Done() // Acquire semaphore semaphore <- struct{}{} defer func() { <-semaphore }() results[index] = rm.executeInstanceAction(ctx, instanceAction, config) }(i, action) } wg.Wait() return results } // executeInstanceAction handles single instance operations func (rm *EdgeConnectResourceManager) executeInstanceAction(ctx context.Context, action InstanceAction, config *config.EdgeConnectConfig) ActionResult { startTime := time.Now() result := ActionResult{ Type: action.Type, Target: action.InstanceName, } switch action.Type { case ActionCreate: result.Success, result.Error = rm.createInstance(ctx, action, config) result.Details = fmt.Sprintf("Created instance %s on %s:%s", action.InstanceName, action.Target.CloudletOrg, action.Target.CloudletName) case ActionUpdate: result.Success, result.Error = rm.updateInstance(ctx, action, config) result.Details = fmt.Sprintf("Updated instance %s", action.InstanceName) default: result.Success = true result.Details = "No action required" } result.Duration = time.Since(startTime) return result } // createApplication creates a new application with manifest file processing func (rm *EdgeConnectResourceManager) createApplication(ctx context.Context, action AppAction, config *config.EdgeConnectConfig) (bool, error) { // Read and process manifest file manifestContent, err := rm.readManifestFile(config.Spec.GetManifestFile()) if err != nil { return false, fmt.Errorf("failed to read manifest file: %w", err) } // Build the app input appInput := &edgeconnect.NewAppInput{ Region: action.Desired.Region, App: edgeconnect.App{ Key: edgeconnect.AppKey{ Organization: action.Desired.Organization, Name: action.Desired.Name, Version: action.Desired.Version, }, Deployment: rm.getDeploymentType(config), ImageType: "ImageTypeDocker", // Default for EdgeConnect ImagePath: rm.getImagePath(config), AllowServerless: true, // Required for Kubernetes DefaultFlavor: edgeconnect.Flavor{Name: config.Spec.InfraTemplate[0].FlavorName}, ServerlessConfig: struct{}{}, // Required empty struct DeploymentManifest: manifestContent, DeploymentGenerator: "kubernetes-basic", }, } // Add network configuration if specified if config.Spec.Network != nil { appInput.App.RequiredOutboundConnections = rm.convertNetworkRules(config.Spec.Network) } // Create the application if client, ok := rm.client.(interface { CreateApp(ctx context.Context, input *edgeconnect.NewAppInput) error }); ok { if err := client.CreateApp(ctx, appInput); err != nil { return false, fmt.Errorf("failed to create application: %w", err) } } else { return false, fmt.Errorf("client does not support CreateApp operation") } rm.logf("Successfully created application: %s/%s version %s", action.Desired.Organization, action.Desired.Name, action.Desired.Version) return true, nil } // updateApplication updates an existing application func (rm *EdgeConnectResourceManager) updateApplication(ctx context.Context, action AppAction, config *config.EdgeConnectConfig) (bool, error) { // For now, EdgeConnect doesn't support app updates directly // This would be implemented when the API supports app updates rm.logf("Application update not yet supported by EdgeConnect API") return true, nil } // createInstance creates a new application instance func (rm *EdgeConnectResourceManager) createInstance(ctx context.Context, action InstanceAction, config *config.EdgeConnectConfig) (bool, error) { instanceInput := &edgeconnect.NewAppInstanceInput{ Region: action.Target.Region, AppInst: edgeconnect.AppInstance{ Key: edgeconnect.AppInstanceKey{ Organization: action.Target.Organization, Name: action.InstanceName, CloudletKey: edgeconnect.CloudletKey{ Organization: action.Target.CloudletOrg, Name: action.Target.CloudletName, }, }, AppKey: edgeconnect.AppKey{ Organization: action.Target.Organization, Name: config.Spec.GetAppName(), Version: config.Spec.GetAppVersion(), }, Flavor: edgeconnect.Flavor{ Name: action.Target.FlavorName, }, }, } // Create the instance if client, ok := rm.client.(interface { CreateAppInstance(ctx context.Context, input *edgeconnect.NewAppInstanceInput) error }); ok { if err := client.CreateAppInstance(ctx, instanceInput); err != nil { return false, fmt.Errorf("failed to create instance: %w", err) } } else { return false, fmt.Errorf("client does not support CreateAppInstance operation") } rm.logf("Successfully created instance: %s on %s:%s", action.InstanceName, action.Target.CloudletOrg, action.Target.CloudletName) return true, nil } // updateInstance updates an existing application instance func (rm *EdgeConnectResourceManager) updateInstance(ctx context.Context, action InstanceAction, config *config.EdgeConnectConfig) (bool, error) { // For now, instance updates would require delete/recreate // This would be optimized when the API supports direct instance updates rm.logf("Instance update requires recreate - not yet optimized") return true, nil } // readManifestFile reads and returns the contents of a manifest file func (rm *EdgeConnectResourceManager) readManifestFile(manifestPath string) (string, error) { if manifestPath == "" { return "", nil } file, err := os.Open(manifestPath) if err != nil { return "", fmt.Errorf("failed to open manifest file %s: %w", manifestPath, err) } defer file.Close() content, err := io.ReadAll(file) if err != nil { return "", fmt.Errorf("failed to read manifest file %s: %w", manifestPath, err) } return string(content), nil } // getDeploymentType determines the deployment type from config func (rm *EdgeConnectResourceManager) getDeploymentType(config *config.EdgeConnectConfig) string { if config.Spec.IsK8sApp() { return "kubernetes" } return "docker" } // getImagePath gets the image path for the application func (rm *EdgeConnectResourceManager) getImagePath(config *config.EdgeConnectConfig) string { if config.Spec.IsDockerApp() && config.Spec.DockerApp.Image != "" { return config.Spec.DockerApp.Image } // Default for kubernetes apps return "https://registry-1.docker.io/library/nginx:latest" } // convertNetworkRules converts config network rules to EdgeConnect SecurityRules func (rm *EdgeConnectResourceManager) convertNetworkRules(network *config.NetworkConfig) []edgeconnect.SecurityRule { rules := make([]edgeconnect.SecurityRule, len(network.OutboundConnections)) for i, conn := range network.OutboundConnections { rules[i] = edgeconnect.SecurityRule{ Protocol: conn.Protocol, PortRangeMin: conn.PortRangeMin, PortRangeMax: conn.PortRangeMax, RemoteCIDR: conn.RemoteCIDR, } } return rules } // ValidatePrerequisites checks if deployment prerequisites are met func (rm *EdgeConnectResourceManager) ValidatePrerequisites(ctx context.Context, plan *DeploymentPlan) error { rm.logf("Validating deployment prerequisites for: %s", plan.ConfigName) // Check if we have any actions to perform if plan.IsEmpty() { return fmt.Errorf("deployment plan is empty - no actions to perform") } // Validate that we have required client capabilities if rm.client == nil { return fmt.Errorf("EdgeConnect client is not configured") } rm.logf("Prerequisites validation passed") return nil } // RollbackDeployment attempts to rollback a failed deployment func (rm *EdgeConnectResourceManager) RollbackDeployment(ctx context.Context, result *ExecutionResult) error { rm.logf("Starting rollback for deployment: %s", result.Plan.ConfigName) rollbackErrors := []error{} // Rollback completed instances (in reverse order) for i := len(result.CompletedActions) - 1; i >= 0; i-- { action := result.CompletedActions[i] switch action.Type { case ActionCreate: if err := rm.rollbackCreateAction(ctx, action, result.Plan); err != nil { rollbackErrors = append(rollbackErrors, fmt.Errorf("failed to rollback %s: %w", action.Target, err)) } else { rm.logf("Successfully rolled back: %s", action.Target) } } } if len(rollbackErrors) > 0 { return fmt.Errorf("rollback encountered %d errors: %v", len(rollbackErrors), rollbackErrors) } rm.logf("Rollback completed successfully") return nil } // rollbackCreateAction rolls back a CREATE action by deleting the resource func (rm *EdgeConnectResourceManager) rollbackCreateAction(ctx context.Context, action ActionResult, plan *DeploymentPlan) error { if action.Type != ActionCreate { return nil } // Determine if this is an app or instance rollback based on the target name isInstance := false for _, instanceAction := range plan.InstanceActions { if instanceAction.InstanceName == action.Target { isInstance = true break } } if isInstance { return rm.rollbackInstance(ctx, action, plan) } else { return rm.rollbackApp(ctx, action, plan) } } // rollbackApp deletes an application that was created func (rm *EdgeConnectResourceManager) rollbackApp(ctx context.Context, action ActionResult, plan *DeploymentPlan) error { if client, ok := rm.client.(interface { DeleteApp(ctx context.Context, appKey edgeconnect.AppKey, region string) error }); ok { appKey := edgeconnect.AppKey{ Organization: plan.AppAction.Desired.Organization, Name: plan.AppAction.Desired.Name, Version: plan.AppAction.Desired.Version, } return client.DeleteApp(ctx, appKey, plan.AppAction.Desired.Region) } return fmt.Errorf("client does not support DeleteApp operation") } // rollbackInstance deletes an instance that was created func (rm *EdgeConnectResourceManager) rollbackInstance(ctx context.Context, action ActionResult, plan *DeploymentPlan) error { if client, ok := rm.client.(interface { DeleteAppInstance(ctx context.Context, instanceKey edgeconnect.AppInstanceKey, region string) error }); ok { // Find the instance action to get the details for _, instanceAction := range plan.InstanceActions { if instanceAction.InstanceName == action.Target { instanceKey := edgeconnect.AppInstanceKey{ Organization: instanceAction.Target.Organization, Name: instanceAction.InstanceName, CloudletKey: edgeconnect.CloudletKey{ Organization: instanceAction.Target.CloudletOrg, Name: instanceAction.Target.CloudletName, }, } return client.DeleteAppInstance(ctx, instanceKey, instanceAction.Target.Region) } } return fmt.Errorf("instance action not found for rollback: %s", action.Target) } return fmt.Errorf("client does not support DeleteAppInstance operation") } // logf logs a message if a logger is configured func (rm *EdgeConnectResourceManager) logf(format string, v ...interface{}) { if rm.logger != nil { rm.logger.Printf("[ResourceManager] "+format, v...) } }