chenyuzhi459 commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Except
             LOG.info("Stopping failed Flink job...");
             cleanupAfterFailedJob(ctx);
             status.setError(null);
+            ReconciliationUtils.updateStatusForDeployedSpec(
+                    ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:

Assume a Flink deployment is submitted to the flink-kubernetes-operator for the first time with the following settings:

```
spec.job.upgradeMode=savepoint
spec.job.initialSavepointPath=null
spec.flinkConfiguration.execution.checkpointing.interval=60s
```

Here is the startup and failover process of the flink-kubernetes-operator as I understand it:

1. On the first reconcile, [AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224) sets `spec.job.upgradeMode` of the *cloned* deployment to STATELESS, because `spec.job.initialSavepointPath` is empty. This change is not written back to the original deployment in Kubernetes, so the original deployment's `spec.job.upgradeMode` is still `savepoint`. The cloned spec is then serialized into `status.reconciliationStatus.lastReconciledSpec`, and this status *is* synchronously updated on the original deployment in Kubernetes (I haven't yet looked into why this happens).
2. After running for a while, the job may run into a problem and end up in FAILED status. In `SnapshotObserver.observeSavepointStatus`, the operator stores the latest checkpoint in `status.jobStatus.savepointInfo.lastSavepoint` of the original deployment.
3. Then [AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315) reads the original deployment's `lastReconciledSpec` into the `specToRecover` variable and passes it to [AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260). `AbstractJobReconciler.restore` decides whether to recover from `lastSavepoint` based on whether `spec.job.upgradeMode` of `specToRecover` is STATELESS. Before this fix, the upgrade mode here is always STATELESS (it was set that way in step 1), so the job is not restored from `lastSavepoint`.

Therefore, in the failover scenario, I think simply serializing the original deployment's `spec.job.upgradeMode=SAVEPOINT` into `status.reconciliationStatus.lastReconciledSpec` before `resubmitJob` is called can solve this problem.

If anything in my understanding is wrong, please correct me. Thank you.
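To make steps 1 to 3 and the proposed fix easier to follow, here is a heavily simplified Java model of the flow. Everything in it (`FailoverRestoreSketch`, `JobSpec`, `Deployment`, `firstDeployment`, `jobFailed`, `recordDeployedSpec`, `restorePath`) is a hypothetical stand-in for the operator types and methods named above, not their real classes or signatures. `recordDeployedSpec` only models the effect the fix is assumed to have: copying the original spec (still `upgradeMode=SAVEPOINT`) into `lastReconciledSpec` before the failed job is resubmitted.

```java
/**
 * Hypothetical, heavily simplified model of the failover path described above.
 * None of these types are the operator's real classes.
 */
class FailoverRestoreSketch {

    enum UpgradeMode { STATELESS, SAVEPOINT }

    /** Stand-in for spec.job, reduced to the two fields that matter here. */
    static final class JobSpec {
        UpgradeMode upgradeMode;
        String initialSavepointPath;

        JobSpec(UpgradeMode upgradeMode, String initialSavepointPath) {
            this.upgradeMode = upgradeMode;
            this.initialSavepointPath = initialSavepointPath;
        }

        JobSpec copy() {
            return new JobSpec(upgradeMode, initialSavepointPath);
        }
    }

    /** Stand-in for the resource: the user's spec plus operator-managed status fields. */
    static final class Deployment {
        final JobSpec spec;          // original spec as submitted; only ever copied below
        JobSpec lastReconciledSpec;  // models status.reconciliationStatus.lastReconciledSpec
        String lastSavepoint;        // models status.jobStatus.savepointInfo.lastSavepoint

        Deployment(JobSpec spec) {
            this.spec = spec;
        }
    }

    /** Step 1: only a clone is downgraded to STATELESS, and the clone is recorded in status. */
    static void firstDeployment(Deployment d) {
        JobSpec cloned = d.spec.copy();
        if (cloned.initialSavepointPath == null) {
            cloned.upgradeMode = UpgradeMode.STATELESS;
        }
        d.lastReconciledSpec = cloned;
    }

    /** Step 2: after the job fails, the latest checkpoint is recorded in status. */
    static void jobFailed(Deployment d, String latestCheckpoint) {
        d.lastSavepoint = latestCheckpoint;
    }

    /** Assumed effect of the fix: record the original spec as the last reconciled spec. */
    static void recordDeployedSpec(Deployment d) {
        d.lastReconciledSpec = d.spec.copy();
    }

    /** Step 3: restore from lastSavepoint unless the spec to recover is STATELESS. */
    static String restorePath(Deployment d) {
        JobSpec specToRecover = d.lastReconciledSpec;
        return specToRecover.upgradeMode == UpgradeMode.STATELESS ? null : d.lastSavepoint;
    }

    public static void main(String[] args) {
        String checkpoint = "s3://bucket/checkpoints/chk-42"; // made-up path for illustration

        Deployment withoutFix = new Deployment(new JobSpec(UpgradeMode.SAVEPOINT, null));
        firstDeployment(withoutFix);
        jobFailed(withoutFix, checkpoint);
        System.out.println("without fix: " + restorePath(withoutFix));

        Deployment withFix = new Deployment(new JobSpec(UpgradeMode.SAVEPOINT, null));
        firstDeployment(withFix);
        jobFailed(withFix, checkpoint);
        recordDeployedSpec(withFix); // done before the failed job is resubmitted
        System.out.println("with fix:    " + restorePath(withFix));
    }
}
```

In this model, `main` prints `null` for the unfixed path (the job would start without state) and the checkpoint path once the original spec is recorded before resubmission, which is the behavior the comment argues the one-line change restores.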