[ https://issues.apache.org/jira/browse/FLINK-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15998216#comment-15998216 ]
ASF GitHub Bot commented on FLINK-6435: --------------------------------------- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3814 > AsyncWaitOperator does not handle exceptions properly > ----------------------------------------------------- > > Key: FLINK-6435 > URL: https://issues.apache.org/jira/browse/FLINK-6435 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.3.0 > > > A user reported that the {{AsyncWaitOperator}} does not handle exceptions > properly. The following code snipped does not make the job fail. > {code} > public void test() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<Integer> withTimestamps = > env.fromCollection(Arrays.asList(1,2,3,4,5)); > AsyncDataStream.unorderedWait(withTimestamps, > (AsyncFunction<Integer, String>) (input, collector) -> { > if (input == 3){ > collector.collect(new RuntimeException("Test")); > return; > } > collector.collect(Collections.singleton("Ok")); > }, 10, TimeUnit.MILLISECONDS) > .returns(String.class) > .print(); > env.execute("unit-test"); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)