rkhachatryan commented on a change in pull request #17774: URL: https://github.com/apache/flink/pull/17774#discussion_r767792864
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1084,6 +1083,15 @@ public boolean receiveAcknowledgeMessage( final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId); + if (message.getSubtaskState() != null) { + // Register shared state regardless of checkpoint state and task ACK state. + // This way, shared state is + // 1. kept if the message is late or state will be used by the task otherwise + // 2. removed eventually upon checkpoint subsumption (or job cancellation) + message.getSubtaskState() + .registerSharedStates(completedCheckpointStore.getRegistry(), checkpointId); + } + Review comment: No, it isn't. For consistency, most of the changes would be bundled together (except for refactorings and RocksDB) because they all change some part of a bigger mechanism (at least I don't see a way to split them). However, I hope that separating these changes eases the review and will also simplify tracking the changes later (unit tests still pass for every commit though). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org