chenyuzhi459 commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Except
             LOG.info("Stopping failed Flink job...");
             cleanupAfterFailedJob(ctx);
             status.setError(null);
+            ReconciliationUtils.updateStatusForDeployedSpec(
+                    ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   Assume a Flink deployment is submitted to the flink-kubernetes-operator for 
the first time with the following settings:
   
   ```
   spec.job.upgradeMode=savepoint
   spec.job.initialSavepointPath=null
   spec.flinkConfiguration.execution.checkpointing.interval=60s
   ```
   
   Here is my understanding of the flink-kubernetes-operator's startup and 
failover process:
   
   1. At the first reconcile, in the method 
[AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224)
, the cloned deployment's `spec.job.upgradeMode` is set to STATELESS because 
`spec.job.initialSavepointPath` is empty. This change is not written back to 
the original deployment in k8s, so the original deployment's 
`spec.job.upgradeMode` remains SAVEPOINT. The cloned spec is, however, 
serialized into `status.reconciliationStatus.lastReconciledSpec`, and that 
status update is synced to the original deployment in k8s (I haven't yet 
studied why this happens).
   
   
   2. After running for a period of time, the deployment may encounter a 
problem and exit in a failed state. The operator then saves the latest 
checkpoint into `status.jobStatus.savepointInfo.lastSavepoint` of the original 
deployment, in the method `SnapshotObserver.observeSavepointStatus`.
   
   3. Then, in the method 
[AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315),
 the original deployment's lastReconciledSpec is read into the `specToRecover` 
variable and passed to the method 
[AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260).
 `restore` decides whether to recover from lastSavepoint based on whether 
`specToRecover`'s `spec.job.upgradeMode` is STATELESS. Before this fix, the 
upgradeMode here is obviously STATELESS.
   
   
   
   Therefore, in the failover scenario, I think serializing the original 
deployment's `spec.job.upgradeMode=SAVEPOINT` into 
`status.reconciliationStatus.lastReconciledSpec` before resubmitJob is enough 
to solve this problem.
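   As a minimal illustration of that proposal (hypothetical classes, not the operator's real types), the idea is to copy the original spec's upgrade mode back into the last reconciled spec before resubmitting:

   ```java
   // Simplified stand-ins for the deployment spec and reconciliation status.
   class JobSpec {
       String upgradeMode;
       JobSpec(String upgradeMode) { this.upgradeMode = upgradeMode; }
   }

   class ReconciliationStatus {
       // Written as STATELESS at the first deployment (step 1 above).
       String lastReconciledUpgradeMode = "STATELESS";
   }

   class FailoverFix {
       // Proposed change: before resubmitJob, re-serialize the original
       // deployment's upgrade mode (still SAVEPOINT in k8s) into
       // lastReconciledSpec, so restore() recovers from lastSavepoint.
       static void updateStatusBeforeResubmit(JobSpec originalSpec, ReconciliationStatus status) {
           status.lastReconciledUpgradeMode = originalSpec.upgradeMode;
       }
   }
   ```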
   
   Please correct me if there is something wrong with my understanding. Thank 
you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
