[ https://issues.apache.org/jira/browse/FLINK-23919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435750#comment-17435750 ]
JING ZHANG commented on FLINK-23919:
------------------------------------

[~Yuval.Itzchakov] I'm very sorry for the late response. I will follow up on this JIRA soon and give you a response later today.

> PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for Window TVF
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-23919
>                 URL: https://issues.apache.org/jira/browse/FLINK-23919
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Yuval Itzchakov
>            Priority: Major
>         Attachments: image-2021-08-23-13-31-24-052.png
>
>
> Given the following Window TVF:
> {code:java}
> SELECT window_time,
>        MIN(alert_timestamp) as start_time,
>        MAX(alert_timestamp) as end_time
> FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE))
> WHERE service_source = 'source'
> GROUP BY window_start, window_end, window_time
> {code}
> Where the schema of alert_table is:
> {code:java}
> alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
> service_source: VARCHAR
> {code}
> The following generates an invalid RowType:
> {code:java}
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args
> [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time),
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#355,distribution=single),
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")),
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], size=[3 min]))]
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args
> [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time),
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#355,distribution=single),
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, _UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")),
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], size=[3 min]))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
>     at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
>
>     java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.RuntimeException: Error occurred while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
>     at org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 31 more
> Caused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [alert_timestamp]
>     at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:297)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:289)
>     at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
>     at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
>     at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
>     at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
>     at java.base/java.util.HashMap.hash(HashMap.java:339)
>     at java.base/java.util.HashMap.get(HashMap.java:552)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> {code}
> Looking at the code, it seems that when PullUpWindowTableFunctionIntoWindowAggregateRule is building the new Calc in WindowUtil.buildNewProgramWithoutWindowColumns, it is adding the rowtime column from the input row to the new calc without checking to see if there are any name collisions. Also, TBH I'm not entirely sure yet why the rowtime column of the input table is being added to the projected output row like that?
> !image-2021-08-23-13-31-24-052.png|width=887,height=163!
> [~jark] would appreciate your help with this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
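
The name collision described in the report can be illustrated outside the planner. The sketch below is only an assumption about what a collision check could look like, not the actual Flink code or the eventual fix: the class `UniqueFieldNames` and the helpers `uniquify` and `appendUnique` are hypothetical names invented for this example. It models the Calc output from the error message (which already contains `CAST(alert_timestamp) AS alert_timestamp`) as a list of field names, and appends the rowtime column under a name that does not clash with an existing field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in the Flink planner.
public class UniqueFieldNames {

    // Returns `name` unchanged if it is not yet used; otherwise appends
    // the first numeric suffix that makes it unique.
    static String uniquify(String name, Set<String> used) {
        if (!used.contains(name)) {
            return name;
        }
        int i = 0;
        while (used.contains(name + "_" + i)) {
            i++;
        }
        return name + "_" + i;
    }

    // Copies the projected field names and appends `extra` under a
    // collision-free name, so the resulting row has unique field names.
    static List<String> appendUnique(List<String> projected, String extra) {
        List<String> result = new ArrayList<>(projected);
        Set<String> used = new HashSet<>(projected);
        result.add(uniquify(extra, used));
        return result;
    }

    public static void main(String[] args) {
        // Field names of the Calc output taken from the error message above.
        List<String> calcOutput = Arrays.asList(
                "window_start", "window_end", "window_time", "alert_timestamp");
        // Appending the rowtime column blindly would duplicate "alert_timestamp".
        System.out.println(appendUnique(calcOutput, "alert_timestamp"));
        // prints [window_start, window_end, window_time, alert_timestamp, alert_timestamp_0]
    }
}
```

Whether renaming is the right remedy, as opposed to reusing the already projected column, depends on why the rule adds the rowtime column in the first place, which is the open question raised in the report.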