gyfora commented on code in PR #968:
URL: https://github.com/apache/flink-kubernetes-operator/pull/968#discussion_r2063647322

##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -575,7 +602,7 @@ protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetada
                 && lastSavepointKnown) {
             specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         }
-        restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
+        restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata, false);

Review Comment:
   We could probably leverage the new flag here as well and not always set it to false during resubmit. Maybe when a lost deployment/unhealthy job is restarted it should be true as well. But we can follow up on a separate ticket.


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -202,13 +203,38 @@ protected boolean reconcileSpecChange(
                     currentDeploySpec,
                     deployConfig,
                     // We decide to enforce HA based on how job was previously suspended
-                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
+                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE,
+                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
+                            && wasUpgradeForScalingOnly(ctx));

             ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
         }
         return true;
     }

+    /**
+     * Determines whether the last upgrade was performed for scaling.
+     *
+     * @param ctx Reconciliation context.
+     * @return {@code true} if the upgrade was for scaling, {@code false} otherwise.
+     */
+    protected boolean wasUpgradeForScalingOnly(FlinkResourceContext<CR> ctx) {
+        var resource = ctx.getResource();
+
+        STATUS status = resource.getStatus();
+        SPEC lastReconciledSpec = status.getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC lastStableSpec = status.getReconciliationStatus().deserializeLastStableSpec();
+        if (lastStableSpec == null) {
+            return false;
+        }
+        lastReconciledSpec.getJob().setState(lastStableSpec.getJob().getState());
+        var specDiff =
+                new ReflectiveDiffBuilder<>(
+                                ctx.getDeploymentMode(), lastReconciledSpec, lastStableSpec)
+                        .build();
+        return specDiff.getType() == DiffType.SCALE;
+    }

Review Comment:
   I think the logic here will not work. We cannot use the lastStableSpec to decide what triggered the last reconciliation, because reconciliation (scaling/upgrade/etc.) is never determined based on the lastStableSpec but always based on the lastReconciledSpec vs. the current spec. Currently the status may not have enough information to determine whether this was a scale-only change, so we might have to record some extra metadata somewhere. We can sync offline.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
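
Illustration of the second review comment (not part of the PR): a minimal, self-contained sketch of why comparing lastReconciledSpec with lastStableSpec can misreport the trigger of the last upgrade, and of what "recording some extra metadata" might look like. Everything here is a hypothetical stand-in: `Spec`, `Status`, the simplified `diff`, and the `lastUpgradeDiffType` field are assumptions for illustration only and are not the operator's actual classes, fields, or diff logic.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins; not the operator's real classes.
class ScaleOnlySketch {

    enum DiffType { IGNORE, SCALE, UPGRADE }

    // Toy spec: parallelism is "scalable", image is not.
    static final class Spec {
        final int parallelism;
        final String image;

        Spec(int parallelism, String image) {
            this.parallelism = parallelism;
            this.image = image;
        }
    }

    static final class Status {
        Spec lastReconciledSpec;
        Spec lastStableSpec;
        // Assumed extra metadata: the diff type that triggered the last upgrade.
        DiffType lastUpgradeDiffType;
    }

    // Toy diff: an image change is an UPGRADE, a parallelism-only change is a SCALE.
    static DiffType diff(Spec before, Spec after) {
        if (!Objects.equals(before.image, after.image)) {
            return DiffType.UPGRADE;
        }
        if (before.parallelism != after.parallelism) {
            return DiffType.SCALE;
        }
        return DiffType.IGNORE;
    }

    // The diff is computed against lastReconciledSpec and recorded at that moment.
    static void reconcile(Status status, Spec current) {
        DiffType type = diff(status.lastReconciledSpec, current);
        if (type == DiffType.IGNORE) {
            return;
        }
        status.lastUpgradeDiffType = type;
        status.lastReconciledSpec = current;
    }

    // Answer from the recorded metadata.
    static boolean wasUpgradeForScalingOnly(Status status) {
        return status.lastUpgradeDiffType == DiffType.SCALE;
    }

    // The approach questioned in the review: infer the trigger from lastStableSpec.
    static boolean guessFromLastStable(Status status) {
        return status.lastStableSpec != null
                && diff(status.lastReconciledSpec, status.lastStableSpec) == DiffType.SCALE;
    }

    public static void main(String[] args) {
        Status status = new Status();
        Spec stable = new Spec(2, "v1");
        status.lastReconciledSpec = stable;
        status.lastStableSpec = stable;

        reconcile(status, new Spec(4, "v2")); // image + parallelism change, never marked stable
        reconcile(status, new Spec(4, "v1")); // image rolled back: the trigger is an UPGRADE

        System.out.println("recorded metadata says scale-only: " + wasUpgradeForScalingOnly(status));
        System.out.println("lastStableSpec guess says scale-only: " + guessFromLastStable(status));
    }
}
```

In this toy scenario the last change only touched the image, yet the lastStableSpec comparison sees just a parallelism difference and reports a scale-only upgrade, while the recorded value does not. Where such metadata would actually live in the operator's status is left open, as in the review.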