Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629

Hi @tillrohrmann ,

Thanks for your review ;D I will go through each of your comments and update the PR later.

On the first part of the review: the first comment is about `UNORDERED` mode combined with `Watermark`. This combination is meaningless, of course. Maybe the graph generator could print an error and stop compiling the graph if `UNORDERED` mode and `Watermark` are enabled at the same time?

Both of these modes are guaranteed by `AsyncWaitOperator`. When a checkpoint is taken for the chained operators and the snapshot for the `AsyncWaitOperator` is made, it first collects all elements in the `AsyncCollectorBuffer` by calling `getStreamElementsInBuffer()`. That method acquires the lock to block the `Emitter` thread and sets a flag named `isCheckpointing` to keep the `Emitter` thread idle, so no finished `AsyncCollector` is passed on to the next operator. `snapshotState()` is called from the head operator to the tail operator, which ensures that all state is captured correctly, because the `Emitter` threads in the parent operators have already stopped working.

I had considered using the checkpoint lock in the `Emitter` thread. But when I tested a case that chains multiple `AsyncWaitOperator`s together, the `Emitter` threads could not fully utilize the parallelism, since they all have to acquire the same lock while collecting outputs.

One way to optimize this is to add a conditional statement in `performCheckpoint()`: if there is an `AsyncWaitOperator` in the operator chain, the barriers are broadcast after `checkpointState()`; otherwise, the original design is used.

Finally, I will add more test cases based on the `OneInputStreamTaskTestHarness`.
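To make the locking scheme described above easier to discuss, here is a minimal sketch of the idea, not the code in the PR. The class `AsyncBufferSketch`, its `Entry` type, and the methods `add`, `complete`, `emitReady`, and `checkpointFinished` are hypothetical names made up for illustration; only `getStreamElementsInBuffer()` and the `isCheckpointing` flag are taken from the comment. How the flag is cleared again is an assumption. Threads are left out so the behavior stays deterministic: `emitReady()` stands for one pass of the `Emitter` thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the buffer described above; not Flink's actual class. */
class AsyncBufferSketch {

    /** A buffered input element plus a flag telling whether its async result has arrived. */
    static final class Entry {
        final String value;
        boolean done;

        Entry(String value) {
            this.value = value;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Entry> buffer = new ArrayDeque<>();
    private boolean isCheckpointing = false;

    /** Called by the task thread for every incoming element. */
    Entry add(String value) {
        lock.lock();
        try {
            Entry entry = new Entry(value);
            buffer.addLast(entry);
            return entry;
        } finally {
            lock.unlock();
        }
    }

    /** Called when the asynchronous request for this entry has finished. */
    void complete(Entry entry) {
        lock.lock();
        try {
            entry.done = true;
        } finally {
            lock.unlock();
        }
    }

    /** One emitter pass: emits finished entries from the head, unless a checkpoint is running. */
    List<String> emitReady() {
        lock.lock();
        try {
            List<String> out = new ArrayList<>();
            if (isCheckpointing) {
                return out; // emitter stays idle while the snapshot is being taken
            }
            while (!buffer.isEmpty() && buffer.peekFirst().done) {
                out.add(buffer.pollFirst().value);
            }
            return out;
        } finally {
            lock.unlock();
        }
    }

    /** Snapshot path: take the lock, pause the emitter, copy everything still buffered. */
    List<String> getStreamElementsInBuffer() {
        lock.lock();
        try {
            isCheckpointing = true;
            List<String> snapshot = new ArrayList<>();
            for (Entry entry : buffer) {
                snapshot.add(entry.value);
            }
            return snapshot;
        } finally {
            lock.unlock();
        }
    }

    /** Assumed hook that lets the emitter resume once the checkpoint is complete. */
    void checkpointFinished() {
        lock.lock();
        try {
            isCheckpointing = false;
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch a finished element that is still in the buffer when the snapshot starts is included in the snapshot and is only emitted after `checkpointFinished()` is called, which is the property the comment relies on. Each buffer has its own lock, so chained operators do not contend on a shared checkpoint lock.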