[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635019#comment-15635019 ]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on the issue:

    https://github.com/apache/flink/pull/2629

Hi @tillrohrmann ,

Thanks for your review ;D I will go through each of your comments and update the PR later.

On the first part of the review: the first point is about `UNORDERED` mode combined with `Watermark`. This combination is meaningless, of course. Maybe the graph generator could print an error and stop compiling the graph if `UNORDERED` mode and `Watermark` are enabled at the same time?

Both of these modes are guaranteed by `AsyncWaitOperator`. When checkpointing the chained operator and taking the snapshot for the `AsyncWaitOperator`, it first tries to get all elements in the `AsyncCollectorBuffer` by calling `getStreamElementsInBuffer()`. That method acquires the lock to block the `Emitter` thread and sets a flag named `isCheckpointing` to idle the `Emitter` thread, so no finished `AsyncCollector` is transferred to the next operator. `snapshotState()` is called from the head operator to the tail operator, which ensures that all state is captured correctly, since the `Emitter` threads in parent operators have already stopped working.

I previously considered using the checkpoint lock in the `Emitter` thread, but when testing a case that chains multiple `AsyncWaitOperator`s together, the `Emitter` threads could not fully utilize the parallelism, since they all have to acquire the same lock while collecting outputs.

One way to optimize this is to add a conditional statement in `performCheckpoint()`: if there is an `AsyncWaitOperator` in the operator chain, broadcast the barriers after `checkpointState()`; otherwise, keep the original design.

Finally, I will add more test cases based on the `OneInputStreamTaskTestHarness`.
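To make the lock-plus-flag idea above concrete, here is a minimal sketch of how a snapshot call might pause emission. It is not the code from the PR: the class `BufferSketch` and the methods `addFinished`, `emitOne`, `checkpointDone`, and `emittedElements` are hypothetical names for illustration, and only `getStreamElementsInBuffer()` and `isCheckpointing` are taken from the description. When the flag is cleared again is also an assumption, since the comment does not say.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the buffer described above; not Flink's AsyncCollectorBuffer. */
class BufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<String> finished = new ArrayDeque<>(); // completed, not yet emitted
    private final List<String> emitted = new ArrayList<>();    // stands in for the next operator
    private boolean isCheckpointing = false;                   // guarded by lock

    /** Called when an async operation completes (hypothetical). */
    void addFinished(String element) {
        lock.lock();
        try {
            finished.add(element);
        } finally {
            lock.unlock();
        }
    }

    /** One step of the emitter loop: emits at most one element, and nothing while checkpointing. */
    boolean emitOne() {
        lock.lock();
        try {
            if (isCheckpointing || finished.isEmpty()) {
                return false;
            }
            emitted.add(finished.poll());
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Takes the lock, idles the emitter via the flag, and returns a copy of the buffered elements. */
    List<String> getStreamElementsInBuffer() {
        lock.lock();
        try {
            isCheckpointing = true;
            return new ArrayList<>(finished);
        } finally {
            lock.unlock();
        }
    }

    /** Assumption: something clears the flag once the snapshot has been taken. */
    void checkpointDone() {
        lock.lock();
        try {
            isCheckpointing = false;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a copy of everything emitted so far. */
    List<String> emittedElements() {
        lock.lock();
        try {
            return new ArrayList<>(emitted);
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the lock is private to each buffer, so chained operators would not contend on one shared checkpoint lock, which is the concern raised about the `Emitter` threads.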
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)