lindong28 commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r941444916
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java:
##########
@@ -217,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures(
         return snapshotInProgress;
     }
 
-    private static OperatorSnapshotFutures checkpointStreamOperator(
+    private OperatorSnapshotFutures checkpointStreamOperator(
             StreamOperator<?> op,
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
             CheckpointStreamFactory storageLocation,
             Supplier<Boolean> isRunning)
             throws Exception {
         try {
-            return op.snapshotState(
-                    checkpointMetaData.getCheckpointId(),
-                    checkpointMetaData.getTimestamp(),
-                    checkpointOptions,
-                    storageLocation);
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.snapshotOperatorEventGatewayState(
+                        op.getOperatorID(), getOperatorStateBackend(op));
+            }
+
+            OperatorSnapshotFutures futures =
+                    op.snapshotState(
+                            checkpointMetaData.getCheckpointId(),
+                            checkpointMetaData.getTimestamp(),
+                            checkpointOptions,
+                            storageLocation);
+
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.notifyOperatorSnapshotStateCompleted(
+                        op.getOperatorID(),
+                        checkpointMetaData.getCheckpointId(),
+                        getSubtaskIndex(op));
+            }
+
+            return futures;
         } catch (Exception ex) {
             if (isRunning.get()) {
                 LOG.info(ex.getMessage(), ex);
             }
             throw ex;
         }
     }
+
+    private int getSubtaskIndex(StreamOperator<?> operator) {
+        int index = -1;
+        if (operator instanceof AbstractStreamOperator) {
+            index =
+                    ((AbstractStreamOperator<?>) operator)
+                            .getRuntimeContext()
+                            .getIndexOfThisSubtask();
+        } else if (operator instanceof AbstractStreamOperatorV2) {
+            index =
+                    ((AbstractStreamOperatorV2<?>) operator)
+                            .getRuntimeContext()
+                            .getIndexOfThisSubtask();
+        }
+
+        if (index < 0) {

Review Comment:
It seems simpler to use `else {...}` here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org