Niels Basjes created FLINK-5025:
-----------------------------------

             Summary: Job fails because of Optimizer bug
                 Key: FLINK-5025
                 URL: https://issues.apache.org/jira/browse/FLINK-5025
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.1.3
            Reporter: Niels Basjes

I have a batch job that fails with the following error when I run it:

{code}
org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
	at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
	at com.bol.reproduce.flink.Main.run(Main.java:42)
	at com.bol.reproduce.flink.Main.main(Main.java:21)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
{code}

The smallest code snippet I have been able to create that reproduces this problem is shown below.
Note that the error does not occur when only a single union is used.
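{code}
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class Main implements Serializable {
  public static void main(String[] args) throws Exception {
    System.exit(new Main().run());
  }

  private int run() throws IOException {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Two chained unions over three inputs; a single union does not trigger the error.
    final DataSet<String> lines = env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
      .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
      .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));

    List<String> allLines = new ArrayList<>();
    lines
      .rebalance()
      .output(new LocalCollectionOutputFormat<>(allLines));

    // execute program
    try {
      env.execute("Running");
    } catch (Exception e) {
      // The CompilerException from the optimizer is caught and printed here.
      e.printStackTrace();
    }
    return 0;
  }
}
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)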