[ 
https://issues.apache.org/jira/browse/FLINK-5025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643752#comment-15643752
 ] 

Niels Basjes commented on FLINK-5025:
-------------------------------------

A minimal project that reproduces this problem is available here: 
https://github.com/nielsbasjes/Reproduce-FLINK-5025
NOTE: This code is licensed under Apache 2.0, so feel free to copy it into the 
project as a unit test or as documentation.

> Job fails because of Optimizer bug
> ----------------------------------
>
>                 Key: FLINK-5025
>                 URL: https://issues.apache.org/jira/browse/FLINK-5025
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.3
>            Reporter: Niels Basjes
>
> I have a batch job that fails with this error message when I run it:
> {code}
> org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
>       at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>       at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>       at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>       at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
>       at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>       at com.bol.reproduce.flink.Main.run(Main.java:42)
>       at com.bol.reproduce.flink.Main.main(Main.java:21)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> {code}
> The smallest code snippet I have been able to create that reproduces this 
> problem is shown below.
> Note that this error does not happen when only a single union is used.
> {code}
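> import java.io.IOException;
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
>
> public class Main implements Serializable {
>   public static void main(String[] args) throws Exception {
>     System.exit(new Main().run());
>   }
>
>   private int run() throws IOException {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     // Chaining two unions triggers the CompilerException; a single union does not.
>     // The exception is thrown while the plan is compiled (see the stack trace above).
>     final DataSet<String> lines =
>         env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
>             .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
>             .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));
>
>     List<String> allLines = new ArrayList<>();
>     lines
>       .rebalance()
>       .output(new LocalCollectionOutputFormat<>(allLines));
>
>     // execute program
>     try {
>       env.execute("Running");
>     } catch (Exception e) {
>       e.printStackTrace();
>     }
>     return 0;
>   }
> }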
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
