[ https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sujun updated FLINK-20660: -------------------------- Description: {{Time window operator with computed column triggers an exception in batch mode.}} {{my test code:}} {code:java} public class WindowAggWithBigintTest { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.registerFunction("longToTimestamp",new LongToTimestamp()); String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = 'orc', 'path' = '/path/to/orc')"; tEnv.executeSql(ddl); Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime, INTERVAL '1' HOUR)"); DiscardingOutputFormat<String> outputFormat = new DiscardingOutputFormat(); TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat); tEnv.registerTableSink("sink",tableResultSink); table.insertInto("sink"); tEnv.execute("test"); } private static class TableResultSink implements StreamTableSink<String> { private final TableSchema schema; private final DataType rowType; private final OutputFormat<String> outputFormat; TableResultSink(TableSchema schema, OutputFormat<String> outputFormat) { this.schema = schema; this.rowType = schema.toRowDataType(); this.outputFormat = outputFormat; } @Override public DataType getConsumedDataType() { return rowType; } @Override public TableSchema getTableSchema() { return schema; } @Override public TableSink<String> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { throw new UnsupportedOperationException( "This sink is configured by passing a static schema when initiating"); } @Override public DataStreamSink<?> consumeDataStream(DataStream<String> dataStream) { return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); } } } {code} Exception: {code:java} Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchExecWindowAggregateRule, args [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, $f0, 3600000),properties=w$start, w$end, w$rowtime), rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, source, source: [FileSystemTableSource(occur_time, rowtime)]],fields=occur_time, rowtime)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34) Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input fields are: [occur_time] at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402) at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720) at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161) at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) ... 27 more {code} was: {{Time window operator with computed column triggers an exception in batch mode.}} {{my test code:}} {code:java} public class WindowAggWithBigintTest { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.registerFunction("longToTimestamp",new LongToTimestamp()); String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = 'orc', 'path' = '/path/to/orc')"; tEnv.executeSql(ddl); Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime, INTERVAL '1' HOUR)"); DiscardingOutputFormat<String> outputFormat = new DiscardingOutputFormat(); TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat); tEnv.registerTableSink("sink",tableResultSink); table.insertInto("sink"); tEnv.execute("test"); } private static class TableResultSink implements StreamTableSink<String> { private final TableSchema schema; private final DataType rowType; private final OutputFormat<String> outputFormat; TableResultSink(TableSchema schema, OutputFormat<String> outputFormat) { this.schema = schema; this.rowType = schema.toRowDataType(); this.outputFormat = outputFormat; } @Override public DataType getConsumedDataType() { return rowType; } @Override public TableSchema getTableSchema() { return schema; } @Override public TableSink<String> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { throw new UnsupportedOperationException( "This sink is configured by passing a static schema when initiating"); } @Override public DataStreamSink<?> consumeDataStream(DataStream<String> dataStream) { return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); } } } {code} Exception: {code:java} Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchExecWindowAggregateRule, args [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, $f0, 3600000),properties=w$start, w$end, w$rowtime), rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, source, source: [FileSystemTableSource(occur_time, rowtime)]],fields=occur_time, rowtime)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34) Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input fields are: [occur_time] at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402) at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720) at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161) at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) ... 27 more {code} > Time window operator with computed column triggers an exception in batch mode > ----------------------------------------------------------------------------- > > Key: FLINK-20660 > URL: https://issues.apache.org/jira/browse/FLINK-20660 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.11.0 > Reporter: sujun > Priority: Minor > > {{Time window operator with computed column triggers an exception in batch > mode.}} > {{my test code:}} > {code:java} > public class WindowAggWithBigintTest { > public static void main(String[] args) throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tEnv = TableEnvironment.create(settings); > > tEnv.registerFunction("longToTimestamp",new LongToTimestamp()); > > String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS > longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = > 'orc', 'path' = '/path/to/orc')"; > > tEnv.executeSql(ddl); > > Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, > INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime, > INTERVAL '1' HOUR)"); > > DiscardingOutputFormat<String> outputFormat = new > DiscardingOutputFormat(); > TableResultSink tableResultSink = new > TableResultSink(table.getSchema(), outputFormat); > tEnv.registerTableSink("sink",tableResultSink); > table.insertInto("sink"); > tEnv.execute("test"); > } > > private static class TableResultSink implements StreamTableSink<String> > { > private final TableSchema schema; > private final DataType rowType; > private final OutputFormat<String> outputFormat; > > TableResultSink(TableSchema schema, OutputFormat<String> > outputFormat) { > this.schema = schema; > this.rowType = schema.toRowDataType(); > this.outputFormat = outputFormat; > } > @Override > public DataType getConsumedDataType() { > return rowType; > } > @Override > public TableSchema getTableSchema() { > return schema; > } > @Override > public TableSink<String> configure(String[] fieldNames, > TypeInformation<?>[] fieldTypes) { > throw new UnsupportedOperationException( > "This sink is configured by passing a static > schema when initiating"); > } > @Override > public DataStreamSink<?> consumeDataStream(DataStream<String> > dataStream) { > return > dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); > } > } > } > {code} > > Exception: > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule BatchExecWindowAggregateRule, args > [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, > $f0, 3600000),properties=w$start, w$end, w$rowtime), > rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, > default_database, source, source: [FileSystemTableSource(occur_time, > rowtime)]],fields=occur_time, rowtime)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34) > Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input > fields are: [occur_time] > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402) > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720) > at > org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161) > at > org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) > ... 27 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)