yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r940930386


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java:
##########
@@ -216,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures(
         return snapshotInProgress;
     }
 
-    private static OperatorSnapshotFutures checkpointStreamOperator(
+    private OperatorSnapshotFutures checkpointStreamOperator(
             StreamOperator<?> op,
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
             CheckpointStreamFactory storageLocation,
             Supplier<Boolean> isRunning)
             throws Exception {
         try {
-            return op.snapshotState(
-                    checkpointMetaData.getCheckpointId(),
-                    checkpointMetaData.getTimestamp(),
-                    checkpointOptions,
-                    storageLocation);
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.snapshotOperatorEventGatewayState(
+                        op.getOperatorID(), getOperatorStateBackend(op));
+            }
+
+            OperatorSnapshotFutures futures =
+                    op.snapshotState(
+                            checkpointMetaData.getCheckpointId(),
+                            checkpointMetaData.getTimestamp(),
+                            checkpointOptions,
+                            storageLocation);
+
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.notifyOperatorSnapshotStateCompleted(

Review Comment:
   Although the `OperatorSnapshotFutures` have not completed when the method 
above returns, the state to be written into the snapshot is guaranteed not to 
be changed by any subsequent operation on the operator. It is therefore safe 
to send ACK events from the operator to its operator coordinator (OC) and 
release the buffered events.
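
One way to picture this guarantee is a snapshot with a synchronous copy phase followed by an asynchronous write phase. The sketch below is a simplified model, not Flink's implementation: `SyncSnapshotSketch`, `process`, and `snapshot` are hypothetical names, and a `CompletableFuture` stands in for `OperatorSnapshotFutures`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy operator; none of these names are Flink APIs.
final class SyncSnapshotSketch {
    private final List<String> state = new ArrayList<>();

    // Mutates the live operator state.
    void process(String record) {
        state.add(record);
    }

    CompletableFuture<List<String>> snapshot() {
        // Synchronous phase: copy the state before returning to the caller.
        final List<String> frozen =
                Collections.unmodifiableList(new ArrayList<>(state));
        // Asynchronous phase: stands in for writing the frozen copy to storage.
        return CompletableFuture.supplyAsync(() -> frozen);
    }

    public static void main(String[] args) {
        SyncSnapshotSketch op = new SyncSnapshotSketch();
        op.process("a");
        CompletableFuture<List<String>> future = op.snapshot();
        // The future may still be pending here, but this later change
        // cannot reach the copy taken in the synchronous phase.
        op.process("b");
        System.out.println(future.join().equals(Arrays.asList("a")));
    }
}
```

Under this model, anything that happens after `snapshot()` returns, such as sending an ACK or releasing buffered events, cannot alter what the pending future will eventually persist.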



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -66,6 +78,14 @@ void dispatchEventToHandlers(
             throw new FlinkException("Could not deserialize operator event", e);
         }
 
+        if (evt instanceof CloseGatewayEvent) {
+            OperatorEventGatewayImpl gateway = getOperatorEventGateway(operatorID);
+            gateway.sendEventToCoordinator(
+                    new AcknowledgeCloseGatewayEvent((CloseGatewayEvent) evt));

Review Comment:
   As discussed offline, I'll update the gateway-closing process to close the 
gateway before sending `AcknowledgeCloseGatewayEvent`, so that no event can 
reach the OC after the `AcknowledgeCloseGatewayEvent`.
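
The intended ordering could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: `CloseBeforeAckSketch` and its members are hypothetical, events are modeled as plain strings, and buffering events that arrive while the gateway is closed is an assumed policy (dropping or rejecting them would preserve the same ordering guarantee).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy gateway; none of these names are Flink APIs.
final class CloseBeforeAckSketch {
    // Events that actually reached the coordinator, in order.
    final List<String> deliveredToCoordinator = new ArrayList<>();
    // Events held back locally because the gateway was closed (assumed policy).
    final List<String> bufferedWhileClosed = new ArrayList<>();
    private boolean closed = false;

    // Regular send path used by the operator.
    void sendEvent(String event) {
        if (closed) {
            bufferedWhileClosed.add(event);
        } else {
            deliveredToCoordinator.add(event);
        }
    }

    // Handles a close request: close first, then acknowledge.
    void onCloseGatewayEvent() {
        closed = true;
        // The ACK bypasses the closed check, so it is the last event delivered.
        deliveredToCoordinator.add("ACK_CLOSE");
    }

    public static void main(String[] args) {
        CloseBeforeAckSketch gateway = new CloseBeforeAckSketch();
        gateway.sendEvent("e1");
        gateway.onCloseGatewayEvent();
        gateway.sendEvent("e2"); // arrives after the close, so it is held back
        System.out.println(gateway.deliveredToCoordinator);
        System.out.println(gateway.bufferedWhileClosed);
    }
}
```

Acknowledging first and closing second would leave a window in which an event such as `e2` could be delivered after the ACK, which is the case the reordering is meant to rule out.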


