yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r940930386

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java:
##########
@@ -216,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures(
         return snapshotInProgress;
     }
 
-    private static OperatorSnapshotFutures checkpointStreamOperator(
+    private OperatorSnapshotFutures checkpointStreamOperator(
             StreamOperator<?> op,
             CheckpointMetaData checkpointMetaData,
             CheckpointOptions checkpointOptions,
             CheckpointStreamFactory storageLocation,
             Supplier<Boolean> isRunning)
             throws Exception {
         try {
-            return op.snapshotState(
-                    checkpointMetaData.getCheckpointId(),
-                    checkpointMetaData.getTimestamp(),
-                    checkpointOptions,
-                    storageLocation);
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.snapshotOperatorEventGatewayState(
+                        op.getOperatorID(), getOperatorStateBackend(op));
+            }
+
+            OperatorSnapshotFutures futures =
+                    op.snapshotState(
+                            checkpointMetaData.getCheckpointId(),
+                            checkpointMetaData.getTimestamp(),
+                            checkpointOptions,
+                            storageLocation);
+
+            if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) {
+                operatorEventDispatcher.notifyOperatorSnapshotStateCompleted(

Review Comment:
   Although the `OperatorSnapshotFutures` have not completed when the method above returns, the state to be saved into the snapshot is guaranteed not to be changed by any subsequent operation on the operator. It is therefore safe to send ACK events from the operator to its OC (operator coordinator) and release the buffered events at this point.
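
   To illustrate the ordering argument above, here is a minimal sketch under stated assumptions. `SnapshotOrderingSketch` and all of its methods are hypothetical stand-ins, not Flink classes or the code in this PR. The assumption is that the synchronous part of `snapshotState` captures a copy of the state, and that the returned future only materializes that copy later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical stand-in for an operator; none of these names are Flink APIs.
final class SnapshotOrderingSketch {
    private final List<String> state = new ArrayList<>();
    private final Deque<String> bufferedEvents = new ArrayDeque<>();
    // Everything the (imaginary) coordinator has received, in arrival order.
    final List<String> deliveredToCoordinator = new ArrayList<>();

    void updateState(String value) {
        state.add(value);
    }

    void bufferEvent(String event) {
        bufferedEvents.add(event);
    }

    // Synchronous phase: copy the state now. The returned future only
    // materializes that copy and stays incomplete until someone runs it.
    RunnableFuture<List<String>> snapshotState() {
        List<String> copy = new ArrayList<>(state);
        return new FutureTask<List<String>>(() -> copy);
    }

    // Assumed to be safe before the future completes, because the copy
    // has already been taken and later updates cannot reach it.
    void ackAndReleaseBufferedEvents() {
        deliveredToCoordinator.add("ACK");
        while (!bufferedEvents.isEmpty()) {
            deliveredToCoordinator.add(bufferedEvents.poll());
        }
    }

    // Runs the asynchronous phase and returns what the snapshot contains.
    static List<String> materialize(RunnableFuture<List<String>> pending) {
        pending.run();
        try {
            return pending.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

   In this sketch, calling `updateState` after `snapshotState()` has returned does not alter the list that `materialize` later yields, which is the property the comment relies on.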

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -66,6 +78,14 @@ void dispatchEventToHandlers(
             throw new FlinkException("Could not deserialize operator event", e);
         }
 
+        if (evt instanceof CloseGatewayEvent) {
+            OperatorEventGatewayImpl gateway = getOperatorEventGateway(operatorID);
+            gateway.sendEventToCoordinator(
+                    new AcknowledgeCloseGatewayEvent((CloseGatewayEvent) evt));

Review Comment:
   As we discussed offline, I'll update the gateway-closing process so that the gateway is closed before `AcknowledgeCloseGatewayEvent` is sent. This ensures that no event reaches the OC after the `AcknowledgeCloseGatewayEvent`.
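
   A rough sketch of the close-then-acknowledge ordering, assuming that a closed gateway buffers outgoing events instead of dropping them. `GatewayCloseSketch`, its methods, and the `"ACK_CLOSE"` marker are hypothetical and only model the idea; they are not the `OperatorEventGatewayImpl` in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a subtask-side gateway; not a Flink class.
final class GatewayCloseSketch {
    // Events the (imaginary) coordinator has received, in arrival order.
    final List<String> coordinatorInbox = new ArrayList<>();
    private final List<String> buffered = new ArrayList<>();
    private boolean closed = false;

    // Regular events pass through only while the gateway is open.
    void sendEventToCoordinator(String event) {
        if (closed) {
            buffered.add(event); // assumption: held back, not dropped
        } else {
            coordinatorInbox.add(event);
        }
    }

    // Close first, then acknowledge. Because the gateway is already
    // closed, no regular event can arrive after the acknowledgement.
    void onCloseGatewayEvent() {
        closed = true;
        coordinatorInbox.add("ACK_CLOSE"); // the ack bypasses the closed gateway
    }

    // Reopen and release whatever was held back while closed.
    void reopenAndFlush() {
        closed = false;
        coordinatorInbox.addAll(buffered);
        buffered.clear();
    }
}
```

   With the opposite order (acknowledge, then close), an event sent between the two steps could reach the coordinator after the acknowledgement, which is what the planned change is meant to rule out.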