lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r941444916


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java:
##########
@@ -217,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures(
         return snapshotInProgress;
     }
 
-    private static OperatorSnapshotFutures checkpointStreamOperator(
+    private OperatorSnapshotFutures checkpointStreamOperator(
             StreamOperator<?> op,
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
             CheckpointStreamFactory storageLocation,
             Supplier<Boolean> isRunning)
             throws Exception {
         try {
-            return op.snapshotState(
-                    checkpointMetaData.getCheckpointId(),
-                    checkpointMetaData.getTimestamp(),
-                    checkpointOptions,
-                    storageLocation);
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.snapshotOperatorEventGatewayState(
+                        op.getOperatorID(), getOperatorStateBackend(op));
+            }
+
+            OperatorSnapshotFutures futures =
+                    op.snapshotState(
+                            checkpointMetaData.getCheckpointId(),
+                            checkpointMetaData.getTimestamp(),
+                            checkpointOptions,
+                            storageLocation);
+
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.notifyOperatorSnapshotStateCompleted(
+                        op.getOperatorID(),
+                        checkpointMetaData.getCheckpointId(),
+                        getSubtaskIndex(op));
+            }
+
+            return futures;
         } catch (Exception ex) {
             if (isRunning.get()) {
                 LOG.info(ex.getMessage(), ex);
             }
             throw ex;
         }
     }
+
+    private int getSubtaskIndex(StreamOperator<?> operator) {
+        int index = -1;
+        if (operator instanceof AbstractStreamOperator) {
+            index =
+                    ((AbstractStreamOperator<?>) operator)
+                            .getRuntimeContext()
+                            .getIndexOfThisSubtask();
+        } else if (operator instanceof AbstractStreamOperatorV2) {
+            index =
+                    ((AbstractStreamOperatorV2<?>) operator)
+                            .getRuntimeContext()
+                            .getIndexOfThisSubtask();
+        }
+
+        if (index < 0) {

Review Comment:
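   It seems simpler to use `else {...}` here, i.e. handle the unsupported-operator case as the final `else` branch of the `if`/`else if` chain above instead of a separate `if (index < 0)` check.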



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to