[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533238#comment-17533238
 ] 

Yao Zhang commented on FLINK-27519:
-----------------------------------

Hi [~hackergin] and [~TsReaper],

I created a demo using Calcite HepPlanner and ProjectToWindowRule.
{code:java}
SchemaPlus schemaPlus = Frameworks.createRootSchema(true);

schemaPlus.add("T", new ReflectiveSchema(new TestSchema()));
Frameworks.ConfigBuilder configBuilder = Frameworks.newConfigBuilder();

configBuilder.defaultSchema(schemaPlus);

FrameworkConfig frameworkConfig = configBuilder.build();

SqlParser.ConfigBuilder parserConfig = 
SqlParser.configBuilder(frameworkConfig.getParserConfig());

parserConfig.setCaseSensitive(false).setConfig(parserConfig.build());

Planner planner = Frameworks.getPlanner(frameworkConfig);

SqlNode sqlNode;
RelRoot relRoot = null;
try {

    sqlNode = planner.parse(
        "SELECT *, count(distinct(\"z\".\"o\")) over (partition by \"z\".\"s\" 
order by \"z\".\"p\" desc) as uv" +
        "  FROM (" +
        "    SELECT *, count(*) over (partition by \"t\".\"s\", \"t\".\"o\" 
order by \"t\".\"p\" desc) as pv" +
        "    FROM \"T\".\"rdf\" \"t\" " +
        "  ) as \"z\""
    );

    planner.validate(sqlNode);

    relRoot = planner.rel(sqlNode);
} catch (Exception e) {
    e.printStackTrace();
}

RelNode relNode = relRoot.project();

System.out.print(RelOptUtil.toString(relNode));

HepProgram hepProgram = HepProgram.builder()
        .addRuleInstance(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
        .addMatchOrder(HepMatchOrder.BOTTOM_UP)
        .build();
HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(relNode);
RelNode bestExp = hepPlanner.findBestExp();
System.out.println(RelOptUtil.toString(bestExp));
System.out.println(bestExp.getRowType().getFieldList());

{code}
The test schema is:
{code:java}
class Triple {
    public String s;
    public String p;
    public String o;

    public Triple(String s, String p, String o) {
        super();
        this.s = s;
        this.p = p;
        this.o = o;
    }
}
public static class TestSchema {
    public final Triple[] rdf = {new Triple("s", "p", "o")};
}
{code}
Then the field list of bestExp it printed was:
{code:java}
[#0: s JavaType(class java.lang.String), #1: p JavaType(class 
java.lang.String), #2: o JavaType(class java.lang.String), #3: PV BIGINT, #4: 
w0$o0 BIGINT]
{code}
The field names are not duplicated at all. Everything works well.

I guess the problem might be some rules that precedes ProjectToWindowRule 
transform the plan. We might need to check the rule set (especially some nested 
SQL transposition). I am not very faliliar with those rules.

Currenty as a workaround I think we can add the following inside class 
FlinkLogicalOverAggregate:
{code:scala}
  override def deriveRowType: RelDataType = {
    val typeFactory = cluster.getRexBuilder.getTypeFactory
    val typeBuilder = typeFactory.builder()
    input.getRowType.getFieldList.foreach(field => {
      if (typeBuilder.nameExists(field.getName)) {
        val newFieldName = RowTypeUtils.getUniqueName(field.getName, 
input.getRowType.getFieldNames)
        typeBuilder.add(
          new RelDataTypeFieldImpl(
            newFieldName,
            field.getIndex,
            field.getType))
      } else {
        typeBuilder.add(field)
      }
    })
    typeBuilder.build()
  }
{code}
Correct me if I am wrong.

> Fix duplicates names when there are multiple levels of over window aggregate
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-27519
>                 URL: https://issues.apache.org/jira/browse/FLINK-27519
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.0
>            Reporter: jinfeng
>            Priority: Major
>
> A similar  issue like 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> //代码占位符
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
>     """
>       |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>       |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>       |  FROM (
>       |    SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>       |    FROM MyTable
>       |  )
>       |)
>       |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> //代码占位符
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add come logical in  FlinkLogicalOverAggregate  to avoid 
> duplicate names of  output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to