gyfora commented on code in PR #969:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/969#discussion_r2265308904


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenStateHandlerRegistry.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller.bluegreen;
+
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveBlueStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveGreenStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.InitializingBlueStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningToBlueStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningToGreenStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
+
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
+
+/** Registry for Blue/Green deployment state handlers. */
+public class BlueGreenStateHandlerRegistry {
+
+    private final Map<FlinkBlueGreenDeploymentState, BlueGreenStateHandler> 
handlers;
+
+    public BlueGreenStateHandlerRegistry() {
+        // Create consolidated service
+        BlueGreenDeploymentService deploymentService = new 
BlueGreenDeploymentService();
+
+        // Create handlers
+        this.handlers =
+                Map.of(
+                        INITIALIZING_BLUE, new 
InitializingBlueStateHandler(deploymentService),
+                        ACTIVE_BLUE, new 
ActiveBlueStateHandler(deploymentService),
+                        ACTIVE_GREEN, new 
ActiveGreenStateHandler(deploymentService),
+                        TRANSITIONING_TO_BLUE,
+                                new 
TransitioningToBlueStateHandler(deploymentService),
+                        TRANSITIONING_TO_GREEN,
+                                new 
TransitioningToGreenStateHandler(deploymentService));
+    }
+
+    /**
+     * Gets the appropriate handler for the given state.
+     *
+     * @param state the Blue/Green deployment state
+     * @return the corresponding state handler
+     * @throws IllegalStateException if no handler is found for the state
+     */
+    public BlueGreenStateHandler getHandler(FlinkBlueGreenDeploymentState 
state) {
+        BlueGreenStateHandler handler = handlers.get(state);
+        if (handler == null) {
+            throw new IllegalStateException("No handler found for state: " + 
state);
+        }

Review Comment:
   This feels like unnecessary logic: a handler is required for every state, 
otherwise we would hit errors left and right in unit tests.



##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.bluegreen;
+
+/**
+ * Enum representing different types of differences found in Blue/Green 
deployment specifications.
+ */
+public enum BlueGreenDiffType {
+    /** No differences found between specifications. */
+    IGNORE,
+
+    /** Only top-level properties (metadata, configuration) have differences. 
*/
+    PATCH_TOP_LEVEL,
+
+    /** Only the nested FlinkDeploymentSpec has differences. */
+    PATCH_CHILD,
+
+    /** Both top-level and nested specifications have differences. */
+    PATCH_BOTH,

Review Comment:
   What's the functional difference between IGNORE, the different PATCH diffs, 
and TRANSITION? It feels like we should have only 3:
   
   NO_CHANGE
   OPERATOR_CONFIG_ONLY -> Non-upgrade/scale deployment changes
   UPGRADE
   
   



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+
+import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+
+/** Controller that runs the main reconcile loop for Flink Blue/Green 
deployments. */
+@ControllerConfiguration
+public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueGreenDeployment> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDeploymentController.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+    private final BlueGreenStateHandlerRegistry handlerRegistry;
+
+    public static long minimumAbortGracePeriodMs = 
ABORT_GRACE_PERIOD.defaultValue().toMillis();
+
+    public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory 
ctxFactory) {
+        this.ctxFactory = ctxFactory;
+        this.handlerRegistry = new BlueGreenStateHandlerRegistry();
+    }
+
+    @Override
+    public List<EventSource<?, FlinkBlueGreenDeployment>> prepareEventSources(
+            EventSourceContext<FlinkBlueGreenDeployment> context) {
+        List<EventSource<?, FlinkBlueGreenDeployment>> eventSources = new 
ArrayList<>();
+
+        InformerEventSourceConfiguration<FlinkDeployment> config =
+                InformerEventSourceConfiguration.from(
+                                FlinkDeployment.class, 
FlinkBlueGreenDeployment.class)
+                        .withSecondaryToPrimaryMapper(
+                                
Mappers.fromOwnerReferences(context.getPrimaryResourceClass()))
+                        .withNamespacesInheritedFromController()
+                        .withFollowControllerNamespacesChanges(true)
+                        .build();
+
+        eventSources.add(new InformerEventSource<>(config, context));
+
+        return eventSources;
+    }
+
+    @Override
+    public UpdateControl<FlinkBlueGreenDeployment> reconcile(
+            FlinkBlueGreenDeployment bgDeployment, 
Context<FlinkBlueGreenDeployment> josdkContext)
+            throws Exception {
+
+        FlinkBlueGreenDeploymentStatus deploymentStatus = 
bgDeployment.getStatus();
+
+        if (deploymentStatus == null) {
+            var context =
+                    new BlueGreenContext(
+                            bgDeployment,
+                            new FlinkBlueGreenDeploymentStatus(),
+                            josdkContext,
+                            null,
+                            ctxFactory);
+            return BlueGreenDeploymentService
+                    .patchStatusUpdateControl(context, INITIALIZING_BLUE, null)
+                    .rescheduleAfter(100);
+        } else {
+            FlinkBlueGreenDeploymentState currentState = 
deploymentStatus.getBlueGreenState();
+            var context =
+                    new BlueGreenContext(
+                            bgDeployment,
+                            deploymentStatus,
+                            josdkContext,
+                            currentState == INITIALIZING_BLUE
+                                    ? null
+                                    : 
FlinkBlueGreenDeployments.fromSecondaryResources(
+                                            josdkContext),
+                            ctxFactory);
+
+            LOG.debug(
+                    "Processing state: {} for deployment: {}",
+                    currentState,
+                    context.getDeploymentName());
+
+            BlueGreenStateHandler handler = 
handlerRegistry.getHandler(currentState);
+            return handler.handle(context);
+//            return stateMachine.processState(context);

Review Comment:
   Remove the commented-out code.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller.bluegreen;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils;
+import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+
+import static 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady;
+import static 
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.configureSavepoint;
+import static 
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
+import static 
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
+
+/** Consolidated service for all Blue/Green deployment operations. */
+public class BlueGreenDeploymentService {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlueGreenDeploymentService.class);
+    private static final long RETRY_DELAY_MS = 500;
+
+    // ==================== Deployment Initiation Methods ====================
+
+    /**
+     * Initiates a new Blue/Green deployment.
+     *
+     * @param context the transition context
+     * @param nextDeploymentType the type of deployment to create
+     * @param nextState the next state to transition to
+     * @param lastCheckpoint the checkpoint to restore from (can be null)
+     * @param isFirstDeployment whether this is the first deployment
+     * @return UpdateControl for the deployment
+     */
+    public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
+            BlueGreenContext context,
+            DeploymentType nextDeploymentType,
+            FlinkBlueGreenDeploymentState nextState,
+            Savepoint lastCheckpoint,
+            boolean isFirstDeployment) {
+
+        BlueGreenKubernetesService.deployCluster(
+                context, nextDeploymentType, lastCheckpoint, 
isFirstDeployment);
+
+        BlueGreenUtils.setAbortTimestamp(context);
+
+        return patchStatusUpdateControl(context, nextState, 
JobStatus.RECONCILING)
+                
.rescheduleAfter(BlueGreenUtils.getReconciliationReschedInterval(context));
+    }
+
+    /**
+     * Checks if a deployment can be initiated and initiates it if conditions 
are met.
+     *
+     * @param context the transition context
+     * @param currentDeploymentType the current deployment type
+     * @return UpdateControl for the deployment
+     */
+    public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
+            BlueGreenContext context, DeploymentType currentDeploymentType) {
+
+        if (BlueGreenSpecUtils.hasSpecChanged(context)) {
+            BlueGreenSpecUtils.setLastReconciledSpec(context);
+
+            FlinkDeployment currentFlinkDeployment =
+                    context.getDeploymentByType(currentDeploymentType);
+
+            if 
(BlueGreenKubernetesService.isFlinkDeploymentReady(currentFlinkDeployment)) {
+                DeploymentTransition transition = 
calculateTransition(currentDeploymentType);
+
+                FlinkResourceContext<FlinkDeployment> resourceContext =
+                        context.getCtxFactory()
+                                .getResourceContext(
+                                        currentFlinkDeployment, 
context.getJosdkContext());
+
+                Savepoint lastCheckpoint = configureSavepoint(resourceContext);
+
+                return initiateDeployment(
+                        context,
+                        transition.nextDeploymentType,
+                        transition.nextState,
+                        lastCheckpoint,
+                        false);
+            } else {
+                if (context.getDeploymentStatus().getJobStatus().getState()
+                        != JobStatus.FAILING) {
+                    return patchStatusUpdateControl(context, null, 
JobStatus.FAILING);
+                }
+            }
+        }
+
+        return UpdateControl.noUpdate();
+    }
+
+    private DeploymentTransition calculateTransition(DeploymentType 
currentType) {
+        if (DeploymentType.BLUE == currentType) {
+            return new DeploymentTransition(
+                    DeploymentType.GREEN, 
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN);
+        } else {
+            return new DeploymentTransition(
+                    DeploymentType.BLUE, 
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE);
+        }
+    }
+
+    // ==================== Transition Monitoring Methods ====================
+
+    /**
+     * Monitors an ongoing Blue/Green deployment transition.
+     *
+     * @param context the transition context
+     * @param currentDeploymentType the current deployment type being 
transitioned from
+     * @return UpdateControl for the transition
+     */
+    public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
+            BlueGreenContext context, DeploymentType currentDeploymentType) {
+
+        handleSpecChangesDuringTransition(context);
+
+        TransitionState transitionState = determineTransitionState(context, 
currentDeploymentType);
+
+        if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
+            return shouldWeDelete(
+                    context,
+                    transitionState.currentDeployment,
+                    transitionState.nextDeployment,
+                    transitionState.nextState);
+        } else {
+            return shouldWeAbort(
+                    context, transitionState.nextDeployment, 
transitionState.nextState);
+        }
+    }
+
+    private void handleSpecChangesDuringTransition(BlueGreenContext context) {
+        if (BlueGreenSpecUtils.hasSpecChanged(context)) {
+            BlueGreenSpecUtils.revertToLastSpec(context);
+            LOG.warn(
+                    "Blue/Green Spec change detected during transition, 
ignored and reverted to the last reconciled spec");
+        }
+    }
+
+    private TransitionState determineTransitionState(
+            BlueGreenContext context, DeploymentType currentDeploymentType) {
+        TransitionState transitionState;
+
+        if (DeploymentType.BLUE == currentDeploymentType) {
+            transitionState = new TransitionState(
+                    context.getBlueDeployment(),                    // 
currentDeployment
+                    context.getGreenDeployment(),                   // 
nextDeployment
+                    FlinkBlueGreenDeploymentState.ACTIVE_GREEN);    // next 
State
+        } else {
+            transitionState = new TransitionState(
+                    context.getGreenDeployment(),               // 
currentDeployment
+                    context.getBlueDeployment(),                // 
nextDeployment
+                    FlinkBlueGreenDeploymentState.ACTIVE_BLUE); // next State
+        }
+
+        Preconditions.checkNotNull(
+                transitionState.nextDeployment,
+                "Target Dependent Deployment resource not found. Blue/Green 
deployment name: "
+                        + context.getDeploymentName()
+                        + ", current deployment type: "
+                        + currentDeploymentType);
+
+        return transitionState;
+    }
+
+    // ==================== Deployment Deletion Methods ====================
+
+    private UpdateControl<FlinkBlueGreenDeployment> shouldWeDelete(
+            BlueGreenContext context,
+            FlinkDeployment currentDeployment,
+            FlinkDeployment nextDeployment,
+            FlinkBlueGreenDeploymentState nextState) {
+
+        var deploymentStatus = context.getDeploymentStatus();
+
+        if (currentDeployment == null) {
+            
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
+            return finalizeBlueGreenDeployment(context, nextState);
+        }
+
+        long deploymentDeletionDelayMs = 
BlueGreenUtils.getDeploymentDeletionDelay(context);
+        long deploymentReadyTimestamp =
+                
instantStrToMillis(deploymentStatus.getDeploymentReadyTimestamp());
+
+        if (deploymentReadyTimestamp == 0) {
+            LOG.info(
+                    "FlinkDeployment '{}' marked ready, rescheduling 
reconciliation in {} seconds.",
+                    nextDeployment.getMetadata().getName(),
+                    deploymentDeletionDelayMs / 1000);
+
+            
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
+            return patchStatusUpdateControl(context, null, null)
+                    .rescheduleAfter(deploymentDeletionDelayMs);
+        }
+
+        long deletionTimestamp = deploymentReadyTimestamp + 
deploymentDeletionDelayMs;
+
+        if (deletionTimestamp < System.currentTimeMillis()) {
+            return deleteDeployment(currentDeployment, context);
+        } else {
+            return waitBeforeDeleting(currentDeployment, deletionTimestamp);
+        }
+    }
+
+    private UpdateControl<FlinkBlueGreenDeployment> waitBeforeDeleting(
+            FlinkDeployment currentDeployment, long deletionTimestamp) {
+
+        long delay = deletionTimestamp - System.currentTimeMillis();
+        LOG.info(
+                "Awaiting deletion delay for FlinkDeployment '{}', 
rescheduling reconciliation in {} seconds.",
+                currentDeployment.getMetadata().getName(),
+                delay / 1000);
+
+        return 
UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(delay);
+    }
+
+    private UpdateControl<FlinkBlueGreenDeployment> deleteDeployment(
+            FlinkDeployment currentDeployment, BlueGreenContext context) {
+
+        boolean deleted =
+                
BlueGreenKubernetesService.deleteFlinkDeployment(currentDeployment, context);
+
+        if (!deleted) {
+            LOG.info("FlinkDeployment '{}' not deleted, will retry", 
currentDeployment);
+        } else {
+            LOG.info("FlinkDeployment '{}' deleted!", currentDeployment);
+        }
+
+        return 
UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(RETRY_DELAY_MS);
+    }
+
+    // ==================== Abort and Retry Methods ====================
+
+    private UpdateControl<FlinkBlueGreenDeployment> shouldWeAbort(
+            BlueGreenContext context,
+            FlinkDeployment nextDeployment,
+            FlinkBlueGreenDeploymentState nextState) {
+
+        String deploymentName = nextDeployment.getMetadata().getName();
+        long abortTimestamp =
+                
instantStrToMillis(context.getDeploymentStatus().getAbortTimestamp());
+
+        if (abortTimestamp == 0) {
+            throw new IllegalStateException("Unexpected abortTimestamp == 0");
+        }
+
+        if (abortTimestamp < System.currentTimeMillis()) {
+            return abortDeployment(context, nextDeployment, nextState, 
deploymentName);
+        } else {
+            return retryDeployment(context, abortTimestamp, deploymentName);
+        }
+    }
+
+    private UpdateControl<FlinkBlueGreenDeployment> retryDeployment(
+            BlueGreenContext context, long abortTimestamp, String 
deploymentName) {
+
+        long delay = abortTimestamp - System.currentTimeMillis();
+        LOG.info(
+                "FlinkDeployment '{}' not ready yet, retrying in {} seconds.",
+                deploymentName,
+                delay / 1000);
+
+        return patchStatusUpdateControl(context, null, 
null).rescheduleAfter(delay);
+    }
+
+    private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
+            BlueGreenContext context,
+            FlinkDeployment nextDeployment,
+            FlinkBlueGreenDeploymentState nextState,
+            String deploymentName) {
+
+        BlueGreenKubernetesService.suspendFlinkDeployment(context, 
nextDeployment);
+
+        FlinkBlueGreenDeploymentState previousState =
+                getPreviousState(nextState, context.getDeployments());
+        context.getDeploymentStatus().setBlueGreenState(previousState);
+
+        LOG.warn(
+                "Aborting deployment '{}', rolling B/G deployment back to {}",
+                deploymentName,
+                previousState);
+
+        return patchStatusUpdateControl(context, null, JobStatus.FAILING);
+    }
+
+    private static FlinkBlueGreenDeploymentState getPreviousState(
+            FlinkBlueGreenDeploymentState nextState, FlinkBlueGreenDeployments 
deployments) {
+        FlinkBlueGreenDeploymentState previousState;
+        if (deployments.getNumberOfDeployments() == 1) {
+            previousState = FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
+        } else if (deployments.getNumberOfDeployments() == 2) {
+            previousState =
+                    nextState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
+                            ? FlinkBlueGreenDeploymentState.ACTIVE_GREEN
+                            : FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+        } else {
+            throw new IllegalStateException("No blue/green FlinkDeployments 
found!");
+        }
+        return previousState;
+    }
+
+    // ==================== Finalization Methods ====================
+
+    /**
+     * Finalizes a Blue/Green deployment transition.
+     *
+     * @param context the transition context
+     * @param nextState the next state to transition to
+     * @return UpdateControl for finalization
+     */
+    public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
+            BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) 
{
+
+        LOG.info(
+                "Finalizing deployment '{}' to {} state",
+                context.getDeploymentName(),
+                nextState);
+
+        
context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0));
+        context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
+
+        return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING);
+    }
+
+    // ==================== Common Utility Methods ====================
+
+    public static UpdateControl<FlinkBlueGreenDeployment> 
patchStatusUpdateControl(
+            BlueGreenContext context,
+            FlinkBlueGreenDeploymentState deploymentState,
+            JobStatus jobState) {
+
+        var deploymentStatus = context.getDeploymentStatus();
+        var flinkBlueGreenDeployment = context.getBgDeployment();
+
+        if (deploymentState != null) {
+            deploymentStatus.setBlueGreenState(deploymentState);
+        }
+
+        if (jobState != null) {
+            deploymentStatus.getJobStatus().setState(jobState);
+        }
+
+        
deploymentStatus.setLastReconciledTimestamp(java.time.Instant.now().toString());
+        flinkBlueGreenDeployment.setStatus(deploymentStatus);
+        return UpdateControl.patchStatus(flinkBlueGreenDeployment);
+    }
+
+    // ==================== Inner Classes ====================
+
+    private static class DeploymentTransition {
+        final DeploymentType nextDeploymentType;
+        final FlinkBlueGreenDeploymentState nextState;
+
+        DeploymentTransition(
+                DeploymentType nextDeploymentType, 
FlinkBlueGreenDeploymentState nextState) {
+            this.nextDeploymentType = nextDeploymentType;
+            this.nextState = nextState;
+        }
+    }
+
+    private static class TransitionState {

Review Comment:
   Could use Lombok's `@Value` annotation to remove the boilerplate.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/AbstractBlueGreenStateHandler.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
+
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Abstract base class providing common functionality for Blue/Green state 
handlers. */
+public abstract class AbstractBlueGreenStateHandler implements 
BlueGreenStateHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractBlueGreenStateHandler.class);
+
+    private final FlinkBlueGreenDeploymentState supportedState;
+
+    protected final BlueGreenDeploymentService deploymentService;
+
+    protected AbstractBlueGreenStateHandler(
+            FlinkBlueGreenDeploymentState supportedState,
+            BlueGreenDeploymentService deploymentService) {
+        this.supportedState = supportedState;
+        this.deploymentService = deploymentService;
+    }
+
+    @Override
+    public FlinkBlueGreenDeploymentState getSupportedState() {
+        return supportedState;
+    }
+
+    protected Logger getLogger() {
+        return LOG;
+    }

Review Comment:
   If we need access, just make the logger field protected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to