Hi Fabian, I commented on the issue and attached the program reproducing the bug, But I couldn't find how to re-open it (I think maybe I don't have enough permissions?).
Best, Yassine 2016-10-25 12:49 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: > Hi Yassine, > > I thought I had fixed that bug a few weeks a ago, but apparently the fix > did not catch all cases. > Can you please reopen FLINK-2662 and post the program to reproduce the bug > there? > > Thanks, > Fabian > > [1] https://issues.apache.org/jira/browse/FLINK-2662 > > 2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>: > >> Hi all, >> >> My job fails with the folowing exception : CompilerException: Bug: Plan >> generation for Unions picked a ship strategy between binary plan operators. >> The exception happens when adding partitionByRange(1).sortPartition(1, >> Order.DESCENDING) to the union of datasets. >> >> I made a smaller version that reproduces the bug : >> >> import org.apache.flink.api.common.functions.GroupReduceFunction; >> import org.apache.flink.api.common.operators.Order; >> import org.apache.flink.api.java.DataSet; >> import org.apache.flink.api.java.ExecutionEnvironment; >> import org.apache.flink.api.java.tuple.Tuple2; >> import org.apache.flink.util.Collector; >> import java.util.Iterator; >> >> public class BugReproduce { >> public static void main(String[] args) throws Exception { >> final ExecutionEnvironment env = ExecutionEnvironment.getExecut >> ionEnvironment(); >> DataSet<WC> wc1 = env.fromElements(new WC("first",1), new >> WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2)); >> DataSet<WC> wc2 = env.fromElements(new WC("third",1), new >> WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2)); >> DataSet<WC> wc3 = env.fromElements(new WC("fifth",1), new >> WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2)); >> >> DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1); >> DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2); >> DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3); >> DataSet<Tuple2<String,Integer>> all = >> aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3); >> all.partitionByRange(1).sortPartition(1, >> Order.DESCENDING).print(); >> >> } >> >> public static DataSet<Tuple2<String,Integer>> >> aggregateWC(DataSet<WC> input){ >> return input.groupBy("word").reduceGroup(new >> GroupReduceFunction<WC, Tuple2<String, Integer>>() { >> @Override >> public void reduce(Iterable<WC> iterable, >> Collector<Tuple2<String, Integer>> collector) throws Exception { >> Integer count = 0; >> Iterator<WC> iterator = iterable.iterator(); >> if (iterator.hasNext()) { >> String word= iterator.next().word; >> while (iterator.hasNext()) { >> iterator.next(); >> count += 1; >> } >> collector.collect(Tuple2.of(word,count)); >> } >> } >> }); >> } >> >> public static class WC { >> public String word; >> public int count; >> >> public WC() { >> } >> >> public WC(String word, int count) { >> this.word = word; >> this.count = count; >> } >> >> public String getWord() { >> return word; >> } >> >> public void setWord(String word) { >> this.word = word; >> } >> >> public int getCount() { >> return count; >> } >> >> public void setCount(int count) { >> this.count = count; >> } >> } >> } >> >> Here is the exception stacktrace: >> >> Exception in thread "main" org.apache.flink.optimizer.CompilerException: >> Bug: Plan generation for Unions picked a ship strategy between binary plan >> operators. >> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer. >> collect(BinaryUnionReplacer.java:113) >> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer. >> postVisit(BinaryUnionReplacer.java:72) >> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer. >> postVisit(BinaryUnionReplacer.java:41) >> at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(Dua >> lInputPlanNode.java:170) >> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept( >> SingleInputPlanNode.java:199) >> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept( >> SingleInputPlanNode.java:199) >> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept( >> SingleInputPlanNode.java:199) >> at org.apache.flink.optimizer.plan.OptimizedPlan.accept(Optimiz >> edPlan.java:128) >> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516) >> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398) >> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecu >> tor.java:185) >> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvi >> ronment.java:91) >> at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu >> tionEnvironment.java:896) >> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) >> at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) >> at org.myorg.prod.BugReproduce.main(BugReproduce.java:28) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >> ssorImpl.java:62) >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >> thodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >> >> I'm using Flink v1.1.3. Any help is appreciated. Thank you. >> >> Best, >> Yassine >> > >