[ 
https://issues.apache.org/jira/browse/FLINK-1986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14538328#comment-14538328
 ] 

Stephan Ewen commented on FLINK-1986:
-------------------------------------

[~gyfora] and [~senorcarbone], are you using co-location constraints to make 
sure that the head and tail of an iteration are co-located? Otherwise co-location 
is not guaranteed, but the backchannel broker requires it.
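To make the co-location requirement concrete, here is a minimal Java sketch of why a backchannel only works when head and tail share a process. It is a hypothetical model, not Flink's implementation: `BackchannelBroker`, `IterationHead`, `IterationTail` and `BackchannelDemo` are invented names, and the sketch assumes the broker is a plain in-memory registry with one instance per TaskManager. Under that assumption, the tail can only obtain the head's feedback queue if it looks it up in the same broker the head registered with.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model: each TaskManager process owns exactly one broker instance. */
class BackchannelBroker {
    private final Map<String, BlockingQueue<Long>> queues = new ConcurrentHashMap<>();

    /** Iteration head: publish its feedback queue under the iteration ID. */
    void handIn(String iterationId, BlockingQueue<Long> queue) {
        queues.put(iterationId, queue);
    }

    /** Iteration tail: fetch the head's queue, or null if no head registered here. */
    BlockingQueue<Long> lookup(String iterationId) {
        return queues.get(iterationId);
    }
}

/** Hypothetical head: registers its in-memory feedback queue with the local broker. */
class IterationHead {
    final BlockingQueue<Long> feedback = new LinkedBlockingQueue<>();

    IterationHead(BackchannelBroker localBroker, String iterationId) {
        localBroker.handIn(iterationId, feedback);
    }
}

/** Hypothetical tail: can only feed records back if it finds the head's queue locally. */
class IterationTail {
    private final BlockingQueue<Long> feedback;

    IterationTail(BackchannelBroker localBroker, String iterationId) {
        BlockingQueue<Long> q = localBroker.lookup(iterationId);
        if (q == null) {
            // The head registered with a different broker (another process).
            throw new IllegalStateException(
                "No iteration head registered for " + iterationId
                + " in this process; head and tail are not co-located");
        }
        this.feedback = q;
    }

    /** Send a record back to the head through the shared in-memory queue. */
    void feedBack(Long value) {
        feedback.add(value);
    }
}

public class BackchannelDemo {
    public static void main(String[] args) {
        BackchannelBroker tm1 = new BackchannelBroker(); // broker of TaskManager 1
        BackchannelBroker tm2 = new BackchannelBroker(); // broker of TaskManager 2

        // Co-located: head and tail use the same broker, so the handover works.
        IterationHead head = new IterationHead(tm1, "iteration-1");
        IterationTail tail = new IterationTail(tm1, "iteration-1");
        tail.feedBack(42L);
        System.out.println(head.feedback.poll()); // prints 42

        // Not co-located: a tail on TaskManager 2 cannot find the head's queue.
        try {
            new IterationTail(tm2, "iteration-1");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Because the queue is handed over by reference inside one process, this model has no network channel between head and tail at all; that is why, under these assumptions, placement cannot be left to chance and both tasks must be pinned to the same TaskManager, for example with a co-location constraint.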


> Group by fails on iterative data streams
> ----------------------------------------
>
>                 Key: FLINK-1986
>                 URL: https://issues.apache.org/jira/browse/FLINK-1986
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Daniel Bali
>              Labels: iteration, streaming
>
> Hello!
> When I try to run a `groupBy` on an IterativeDataStream, I get a 
> NullPointerException. Here is the code that reproduces the issue:
> {code}
> public Test() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>     DataStream<Tuple2<Long, Long>> edges = env
>             .generateSequence(0, 7)
>             .map(new MapFunction<Long, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Long v) throws Exception {
>                     return new Tuple2<>(v, (v + 1));
>                 }
>             });
>     IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
>     SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
>             .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>                     return tuple;
>                 }
>             })
>             .split(new OutputSelector<Tuple2<Long, Long>>() {
>                 @Override
>                 public Iterable<String> select(Tuple2<Long, Long> tuple) {
>                     List<String> output = new ArrayList<>();
>                     output.add("iterate");
>                     return output;
>                 }
>             });
>     iteration.closeWith(step.select("iterate"));
>     env.execute("Sandbox");
> }
> {code}
> Moving the groupBy before the iteration solves the issue. For example, this works:
> {code}
> ... iteration = edges.groupBy(1).iterate();
> iteration.map(...)
> {code}
> Here is the stack trace:
> {code}
> Exception in thread "main" java.lang.NullPointerException
>       at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
>       at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
>       at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
>       at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:601)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
