Hi Yassine,

I thought I had fixed that bug a few weeks ago, but apparently the fix did not catch all cases. Can you please reopen FLINK-2662 [1] and post the program to reproduce the bug there?
Thanks, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-2662

2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
> Hi all,
>
> My job fails with the following exception: CompilerException: Bug: Plan
> generation for Unions picked a ship strategy between binary plan operators.
> The exception happens when adding partitionByRange(1).sortPartition(1,
> Order.DESCENDING) to the union of datasets.
>
> I made a smaller version that reproduces the bug:
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import java.util.Iterator;
>
> public class BugReproduce {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<WC> wc1 = env.fromElements(new WC("first", 1), new WC("second", 2), new WC("first", 1), new WC("first", 1), new WC("second", 2));
>         DataSet<WC> wc2 = env.fromElements(new WC("third", 1), new WC("forth", 2), new WC("forth", 1), new WC("forth", 1), new WC("third", 2));
>         DataSet<WC> wc3 = env.fromElements(new WC("fifth", 1), new WC("fifth", 2), new WC("fifth", 1), new WC("fifth", 1), new WC("fifth", 2));
>
>         DataSet<Tuple2<String, Integer>> aggregatedwc1 = aggregateWC(wc1);
>         DataSet<Tuple2<String, Integer>> aggregatedwc2 = aggregateWC(wc2);
>         DataSet<Tuple2<String, Integer>> aggregatedwc3 = aggregateWC(wc3);
>         DataSet<Tuple2<String, Integer>> all = aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
>         all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print();
>     }
>
>     public static DataSet<Tuple2<String, Integer>> aggregateWC(DataSet<WC> input) {
>         return input.groupBy("word").reduceGroup(new GroupReduceFunction<WC, Tuple2<String, Integer>>() {
>             @Override
>             public void reduce(Iterable<WC> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
>                 Integer count = 0;
>                 Iterator<WC> iterator = iterable.iterator();
>                 if (iterator.hasNext()) {
>                     String word = iterator.next().word;
>                     while (iterator.hasNext()) {
>                         iterator.next();
>                         count += 1;
>                     }
>                     collector.collect(Tuple2.of(word, count));
>                 }
>             }
>         });
>     }
>
>     public static class WC {
>         public String word;
>         public int count;
>
>         public WC() {
>         }
>
>         public WC(String word, int count) {
>             this.word = word;
>             this.count = count;
>         }
>
>         public String getWord() {
>             return word;
>         }
>
>         public void setWord(String word) {
>             this.word = word;
>         }
>
>         public int getCount() {
>             return count;
>         }
>
>         public void setCount(int count) {
>             this.count = count;
>         }
>     }
> }
>
> Here is the exception stacktrace:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>     at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>     at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> I'm using Flink v1.1.3. Any help is appreciated. Thank you.
>
> Best,
> Yassine
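For reference, the plan shape under discussion boils down to the stripped-down sketch below: union several DataSets, then apply partitionByRange followed by sortPartition. The class and variable names here are illustrative and not taken from the original program, and whether this minimal form reproduces the exception on Flink 1.1.3 by itself is untested; it only shows the pattern the report describes.

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Illustrative stand-alone sketch; names are not from the original program.
    public class UnionRangePartitionSketch {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> a = env.fromElements(Tuple2.of("first", 1), Tuple2.of("second", 2));
            DataSet<Tuple2<String, Integer>> b = env.fromElements(Tuple2.of("third", 3), Tuple2.of("fourth", 4));
            DataSet<Tuple2<String, Integer>> c = env.fromElements(Tuple2.of("fifth", 5), Tuple2.of("sixth", 6));

            // Union of multiple inputs followed by a range partition and a
            // partition sort: the plan shape that the report above says fails
            // on Flink 1.1.3 with "Plan generation for Unions picked a ship
            // strategy between binary plan operators."
            a.union(b).union(c)
             .partitionByRange(1)
             .sortPartition(1, Order.DESCENDING)
             .print();
        }
    }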