gyfora commented on code in PR #968:
URL: https://github.com/apache/flink-kubernetes-operator/pull/968#discussion_r2063647322


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -575,7 +602,7 @@ protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetada
                 && lastSavepointKnown) {
             specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         }
-        restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
+        restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata, false);

Review Comment:
   We could probably leverage the new flag here as well instead of always 
passing false during resubmit. For example, it could also be true when a lost 
deployment or an unhealthy job is restarted. But we can follow up on that in a 
separate ticket.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -202,13 +203,38 @@ protected boolean reconcileSpecChange(
                     currentDeploySpec,
                     deployConfig,
                     // We decide to enforce HA based on how job was previously suspended
-                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
+                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE,
+                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
+                            && wasUpgradeForScalingOnly(ctx));
 
             ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
         }
         return true;
     }
 
+    /**
+     * Determines whether the last upgrade was performed for scaling.
+     *
+     * @param ctx Reconciliation context.
+     * @return {@code true} if the upgrade was for scaling, {@code false} otherwise.
+     */
+    protected boolean wasUpgradeForScalingOnly(FlinkResourceContext<CR> ctx) {
+        var resource = ctx.getResource();
+
+        STATUS status = resource.getStatus();
+        SPEC lastReconciledSpec = status.getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC lastStableSpec = status.getReconciliationStatus().deserializeLastStableSpec();
+        if (lastStableSpec == null) {
+            return false;
+        }
+        lastReconciledSpec.getJob().setState(lastStableSpec.getJob().getState());
+        var specDiff =
+                new ReflectiveDiffBuilder<>(
+                                ctx.getDeploymentMode(), lastReconciledSpec, lastStableSpec)
+                        .build();
+        return specDiff.getType() == DiffType.SCALE;
+    }

Review Comment:
   I don't think this logic will work. We cannot use the lastStableSpec to 
decide what triggered the last reconciliation, because the reconciliation type 
(scaling, upgrade, etc.) is never determined from the lastStableSpec. It is 
always determined by comparing the lastReconciledSpec with the current spec. 
   
   Currently the status may not hold enough information to determine whether 
this was a scale-only change, so we might have to record some extra metadata 
somewhere. We can sync offline.
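
   To make the concern concrete, here is a minimal, self-contained sketch. 
Everything in it is a hypothetical stand-in (`Spec`, `Status`, `DiffType`, 
`diff`, and in particular the `lastUpgradeDiffType` field); none of it is the 
operator's real API. It only illustrates that comparing the lastReconciledSpec 
with the lastStableSpec can report an upgrade even though the most recent 
reconciliation was scale-only, and that recording the diff type at 
reconciliation time would be one possible form of the "extra metadata".

```java
import java.util.Objects;

// Illustrative sketch only: Spec, Status, DiffType and lastUpgradeDiffType are
// simplified, hypothetical stand-ins, not the operator's real classes or fields.
final class ScaleOnlySketch {
    enum DiffType { IGNORE, SCALE, UPGRADE }

    static final class Spec {
        final int parallelism;
        final String image;

        Spec(int parallelism, String image) {
            this.parallelism = parallelism;
            this.image = image;
        }
    }

    static final class Status {
        Spec lastReconciledSpec;
        Spec lastStableSpec;
        // Hypothetical extra metadata: what kind of change the last reconciliation applied.
        DiffType lastUpgradeDiffType = DiffType.IGNORE;
    }

    // Toy diff: an image change counts as an upgrade, a parallelism-only change as scaling.
    static DiffType diff(Spec before, Spec after) {
        if (!Objects.equals(before.image, after.image)) {
            return DiffType.UPGRADE;
        }
        if (before.parallelism != after.parallelism) {
            return DiffType.SCALE;
        }
        return DiffType.IGNORE;
    }

    // The change type is decided from lastReconciledSpec vs. the current spec,
    // and recorded at that moment, before lastReconciledSpec is overwritten.
    static void reconcile(Status status, Spec currentSpec) {
        DiffType type = diff(status.lastReconciledSpec, currentSpec);
        if (type == DiffType.IGNORE) {
            return;
        }
        status.lastUpgradeDiffType = type;
        status.lastReconciledSpec = currentSpec;
    }

    static boolean wasUpgradeForScalingOnly(Status status) {
        return status.lastUpgradeDiffType == DiffType.SCALE;
    }

    public static void main(String[] args) {
        Status status = new Status();
        status.lastStableSpec = new Spec(2, "v1");
        status.lastReconciledSpec = status.lastStableSpec;

        reconcile(status, new Spec(2, "v2")); // image change, never marked stable
        reconcile(status, new Spec(4, "v2")); // scale-only change

        // Comparing against lastStableSpec sees the image change as well: UPGRADE.
        System.out.println(diff(status.lastReconciledSpec, status.lastStableSpec));
        // The recorded metadata reflects the last reconciliation: true.
        System.out.println(wasUpgradeForScalingOnly(status));
    }
}
```

   Where such metadata would actually live (status, reconciliation metadata, 
or elsewhere) is exactly the open question above; the sketch does not answer 
it.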



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
