[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683300#comment-15683300 ]
ASF GitHub Bot commented on FLINK-4391: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88720498 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -116,7 +116,12 @@ public OperatorChain(StreamTask<OUT, OP> containingTask) { // add head operator to end of chain allOps.add(headOperator); - + + // reverse the order of all operators so that head operator is at the first place. + // for chained operator with async wait operator, operators after wait operator have to + // wait for while until all data in the buffer in wait operator has done snapshot. + Collections.reverse(allOps); --- End diff -- This won't work with emitting elements in the open method of `AsyncWaitOperator`, because then the downstream operators are potentially not yet opened when the first stream element arrives there. > Provide support for asynchronous operations over streams > -------------------------------------------------------- > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Jamie Grier > Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)