tweise commented on code in PR #356: URL: https://github.com/apache/flink-kubernetes-operator/pull/356#discussion_r959557420
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ########## @@ -98,12 +99,39 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode( .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(deployConfig) && FlinkUtils.isKubernetesHAActivated(observeConfig) - && flinkService.isHaMetadataAvailable(deployConfig) && !flinkVersionChanged( ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) { - LOG.info( - "Job is not running but HA metadata is available for last state restore, ready for upgrade"); - return Optional.of(UpgradeMode.LAST_STATE); + + if (!flinkService.isHaMetadataAvailable(deployConfig)) { + if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) { + // initial deployment failure, reset to allow for spec change to proceed + flinkService.deleteClusterDeployment( + deployment.getMetadata(), deployment.getStatus(), false); + flinkService.waitForClusterShutdown(deployConfig); + // in case the deployment succeeded between check and delete, fall through to + // the upgrade path + if (!flinkService.isHaMetadataAvailable(deployConfig)) { Review Comment: Indeed, and that also made it easier to move the logic to a separate method. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ########## @@ -98,12 +99,39 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode( .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(deployConfig) && FlinkUtils.isKubernetesHAActivated(observeConfig) - && flinkService.isHaMetadataAvailable(deployConfig) && !flinkVersionChanged( ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) { - LOG.info( - "Job is not running but HA metadata is available for last state restore, ready for upgrade"); - return Optional.of(UpgradeMode.LAST_STATE); + + if (!flinkService.isHaMetadataAvailable(deployConfig)) { + if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org