[ https://issues.apache.org/jira/browse/FLINK-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994782#comment-15994782 ]
ASF GitHub Bot commented on FLINK-6435:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3814

    [FLINK-6435] [async] React to exceptionally completed StreamElementQueueEntry

    The AsyncWaitOperator should not only react to orderly completed
    StreamElementQueueEntries but also to those completed with a user
    exception or those which timed out. This PR fixes the problem by calling
    the onComplete function passed to StreamElementQueueEntry#onComplete also
    in the exceptional case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixAsyncWaitOperator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3814.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3814

----
commit 18ede824016e8714dd1f82f7ac4574eeeee85f70
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-05-03T12:40:46Z

    [FLINK-6435] [async] React to exceptionally completed StreamElementQueueEntry

    The AsyncWaitOperator should not only react to orderly completed
    StreamElementQueueEntries but also to those completed with a user
    exception or those which timed out. This PR fixes the problem by calling
    the onComplete function passed to StreamElementQueueEntry#onComplete also
    in the exceptional case.

----

> AsyncWaitOperator does not handle exceptions properly
> -----------------------------------------------------
>
>                 Key: FLINK-6435
>                 URL: https://issues.apache.org/jira/browse/FLINK-6435
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> A user reported that the {{AsyncWaitOperator}} does not handle exceptions
> properly. The following code snippet does not make the job fail.
> {code}
> public void test() throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Integer> withTimestamps =
>         env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
>     AsyncDataStream.unorderedWait(withTimestamps,
>             (AsyncFunction<Integer, String>) (input, collector) -> {
>                 if (input == 3) {
>                     collector.collect(new RuntimeException("Test"));
>                     return;
>                 }
>                 collector.collect(Collections.singleton("Ok"));
>             }, 10, TimeUnit.MILLISECONDS)
>         .returns(String.class)
>         .print();
>     env.execute("unit-test");
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
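
For readers of the archive, the idea behind the fix ("calling the onComplete function ... also in the exceptional case") can be illustrated with a minimal, self-contained sketch. It uses only java.util.concurrent.CompletableFuture and is not Flink's implementation: the Entry class, both registration methods, and run() are hypothetical names made up for this illustration. It only shows that a callback registered for normal completion never sees a failed entry, while one registered for any completion does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class QueueEntrySketch {

    /** Hypothetical stand-in for a queue entry; NOT Flink's StreamElementQueueEntry. */
    static final class Entry<T> {
        private final CompletableFuture<T> result = new CompletableFuture<>();

        void complete(T value) {
            result.complete(value);
        }

        void completeExceptionally(Throwable error) {
            result.completeExceptionally(error);
        }

        boolean failed() {
            return result.isCompletedExceptionally();
        }

        // Runs the callback only on normal completion, so an entry that
        // failed (user exception, timeout) is never reported to the caller.
        void onCompleteNormalOnly(Consumer<Entry<T>> callback) {
            result.thenAccept(value -> callback.accept(this));
        }

        // Runs the callback on normal AND exceptional completion.
        void onComplete(Consumer<Entry<T>> callback) {
            result.whenComplete((value, error) -> callback.accept(this));
        }
    }

    /** Completes one entry normally and one exceptionally; returns what the callback saw. */
    static List<String> run(boolean notifyOnFailure) {
        List<String> seen = new ArrayList<>();
        Consumer<Entry<String>> callback =
            entry -> seen.add(entry.failed() ? "failed" : "ok");

        Entry<String> ok = new Entry<>();
        Entry<String> bad = new Entry<>();
        for (Entry<String> entry : Arrays.asList(ok, bad)) {
            if (notifyOnFailure) {
                entry.onComplete(callback);
            } else {
                entry.onCompleteNormalOnly(callback);
            }
        }

        ok.complete("Ok");
        bad.completeExceptionally(new RuntimeException("Test"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [ok]
        System.out.println(run(true));  // prints [ok, failed]
    }
}
```

In the first run the failed entry is silently dropped, which mirrors the reported symptom of the job not failing. In the second run the callback observes the failure and could propagate it.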