[ https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533238#comment-17533238 ]
Yao Zhang commented on FLINK-27519:
-----------------------------------

Hi [~hackergin] and [~TsReaper],

I created a demo using Calcite HepPlanner and ProjectToWindowRule.

{code:java}
SchemaPlus schemaPlus = Frameworks.createRootSchema(true);
schemaPlus.add("T", new ReflectiveSchema(new TestSchema()));

Frameworks.ConfigBuilder configBuilder = Frameworks.newConfigBuilder();
configBuilder.defaultSchema(schemaPlus);
FrameworkConfig frameworkConfig = configBuilder.build();

// Note: this parser config is never passed back into frameworkConfig,
// so it does not affect the planner created below.
SqlParser.ConfigBuilder parserConfig =
        SqlParser.configBuilder(frameworkConfig.getParserConfig());
parserConfig.setCaseSensitive(false).setConfig(parserConfig.build());

Planner planner = Frameworks.getPlanner(frameworkConfig);
SqlNode sqlNode;
RelRoot relRoot = null;
try {
    // Two nested levels of OVER window aggregates, as in the reported query.
    sqlNode = planner.parse(
            "SELECT *, count(distinct(\"z\".\"o\")) over (partition by \"z\".\"s\" order by \"z\".\"p\" desc) as uv"
                    + " FROM ("
                    + "   SELECT *, count(*) over (partition by \"t\".\"s\", \"t\".\"o\" order by \"t\".\"p\" desc) as pv"
                    + "   FROM \"T\".\"rdf\" \"t\" "
                    + " ) as \"z\"");
    planner.validate(sqlNode);
    relRoot = planner.rel(sqlNode);
} catch (Exception e) {
    e.printStackTrace();
}

RelNode relNode = relRoot.project();
System.out.print(RelOptUtil.toString(relNode));

// Apply only ProjectToWindowRule.
HepProgram hepProgram = HepProgram.builder()
        .addRuleInstance(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
        .addMatchOrder(HepMatchOrder.BOTTOM_UP)
        .build();
HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(relNode);
RelNode bestExp = hepPlanner.findBestExp();

System.out.println(RelOptUtil.toString(bestExp));
System.out.println(bestExp.getRowType().getFieldList());
{code}

The test schema is:

{code:java}
class Triple {
    public String s;
    public String p;
    public String o;

    public Triple(String s, String p, String o) {
        super();
        this.s = s;
        this.p = p;
        this.o = o;
    }
}

public static class TestSchema {
    public final Triple[] rdf = {new Triple("s", "p", "o")};
}
{code}

The field list of bestExp that it printed was:

{code:java}
[#0: s JavaType(class java.lang.String), #1: p JavaType(class java.lang.String), #2: o JavaType(class java.lang.String), #3: PV BIGINT, #4: w0$o0 BIGINT]
{code}

The field names are not duplicated at all, so ProjectToWindowRule on its own works well. I guess the problem is caused by some rule that runs before ProjectToWindowRule and transforms the plan. We might need to check the rule set (especially the transposition rules that apply to nested SQL). I am not very familiar with those rules.

Currently, as a workaround, I think we can add the following inside class FlinkLogicalOverAggregate:

{code:scala}
override def deriveRowType: RelDataType = {
  val typeFactory = cluster.getRexBuilder.getTypeFactory
  val typeBuilder = typeFactory.builder()
  input.getRowType.getFieldList.foreach(field => {
    if (typeBuilder.nameExists(field.getName)) {
      // The name was already added: register the field under a unique name.
      val newFieldName =
        RowTypeUtils.getUniqueName(field.getName, input.getRowType.getFieldNames)
      typeBuilder.add(
        new RelDataTypeFieldImpl(newFieldName, field.getIndex, field.getType))
    } else {
      typeBuilder.add(field)
    }
  })
  typeBuilder.build()
}
{code}

Correct me if I am wrong.

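To make the renaming idea behind the workaround concrete, here is a minimal standalone sketch in plain Java. It does not use Calcite or Flink. The class UniqueFieldNames, the method dedupe and the "_N" suffix scheme are hypothetical names chosen for illustration, and they are not meant to describe what RowTypeUtils.getUniqueName actually does. The input list is only an assumed example of a row type in which w0$o0 appears twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class UniqueFieldNames {

    /**
     * Returns a copy of {@code names} in which the first occurrence of each
     * name is kept and every later duplicate gets a numeric suffix.
     * The suffix scheme (name_0, name_1, ...) is an assumption of this sketch.
     */
    static List<String> dedupe(List<String> names) {
        // Reserve all original names up front, so that a generated name can
        // never collide with a field that appears later in the row.
        Set<String> taken = new HashSet<>(names);
        // Names that have already been written to the result.
        Set<String> emitted = new HashSet<>();
        List<String> result = new ArrayList<>(names.size());

        for (String name : names) {
            String candidate = name;
            if (!emitted.add(candidate)) {
                // Duplicate: try suffixes until the name is unused.
                int suffix = 0;
                do {
                    candidate = name + "_" + suffix++;
                } while (taken.contains(candidate));
                taken.add(candidate);
                emitted.add(candidate);
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "w0$o0", "w0$o0");
        System.out.println(dedupe(input)); // prints [a, b, c, w0$o0, w0$o0_0]
    }
}
```

Checking candidates against all input names, not only the names emitted so far, is the same reason the workaround passes input.getRowType.getFieldNames to the unique-name helper: a renamed field must not clash with a field that comes later in the row.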
> Fix duplicates names when there are multiple levels of over window aggregate
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-27519
>                 URL: https://issues.apache.org/jira/browse/FLINK-27519
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.0
>            Reporter: jinfeng
>            Priority: Major
>
> This issue is similar to
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121].
> It can be reproduced by adding this unit test:
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
>     """
>       |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>       |  SELECT *, count(distinct(c)) over (partition by a order by b desc) AS uv
>       |  FROM (
>       |    SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>       |    FROM MyTable
>       |  )
>       |)
>       |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message:
>
> {code:java}
> org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [w0$o0]
>     at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
> {code}
>
> I think we can add some logic in FlinkLogicalOverAggregate to avoid
> duplicate names in the output rowType.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)