Hi Yassine,

I thought I had fixed that bug a few weeks ago, but apparently the fix
did not catch all cases.
Can you please reopen FLINK-2662 [1] and post the program to reproduce the bug
there?

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-2662

2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> My job fails with the following exception: CompilerException: Bug: Plan
> generation for Unions picked a ship strategy between binary plan operators.
> The exception happens when adding partitionByRange(1).sortPartition(1,
> Order.DESCENDING) to the union of datasets.
>
> I made a smaller version that reproduces the bug:
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import java.util.Iterator;
>
> public class BugReproduce {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<WC> wc1 = env.fromElements(new WC("first", 1), new WC("second", 2),
>                 new WC("first", 1), new WC("first", 1), new WC("second", 2));
>         DataSet<WC> wc2 = env.fromElements(new WC("third", 1), new WC("forth", 2),
>                 new WC("forth", 1), new WC("forth", 1), new WC("third", 2));
>         DataSet<WC> wc3 = env.fromElements(new WC("fifth", 1), new WC("fifth", 2),
>                 new WC("fifth", 1), new WC("fifth", 1), new WC("fifth", 2));
>
>         DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1);
>         DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2);
>         DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3);
>         DataSet<Tuple2<String,Integer>> all = aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
>         all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print();
>
>     }
>
>     public static DataSet<Tuple2<String,Integer>> aggregateWC(DataSet<WC> input){
>         return input.groupBy("word").reduceGroup(new GroupReduceFunction<WC, Tuple2<String, Integer>>() {
>             @Override
>             public void reduce(Iterable<WC> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
>                 Integer count = 0;
>                 Iterator<WC> iterator = iterable.iterator();
>                 if (iterator.hasNext()) {
>                     String word= iterator.next().word;
>                     while (iterator.hasNext()) {
>                         iterator.next();
>                         count += 1;
>                     }
>                     collector.collect(Tuple2.of(word,count));
>                 }
>             }
>         });
>     }
>
>     public static class WC {
>         public String word;
>         public int count;
>
>         public WC() {
>         }
>
>         public WC(String word, int count) {
>             this.word = word;
>             this.count = count;
>         }
>
>         public String getWord() {
>             return word;
>         }
>
>         public void setWord(String word) {
>             this.word = word;
>         }
>
>         public int getCount() {
>             return count;
>         }
>
>         public void setCount(int count) {
>             this.count = count;
>         }
>     }
> }
>
> Here is the exception stacktrace:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
> at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> I'm using Flink v1.1.3. Any help is appreciated. Thank you.
>
> Best,
> Yassine
>
