[ 
https://issues.apache.org/jira/browse/FLINK-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15727375#comment-15727375
 ] 

Kurt Young commented on FLINK-5266:
-----------------------------------

Hi [~fhueske], I have checked your PR #2926, and it doesn't solve this case.
I used this query:
{code}
groupBy('name).select('acctbal.sum, 'name)
{code}

Ideally, we only need two fields from the source and can project them 
immediately after the scan node. Furthermore, we can push the projection into 
the scan if the source supports it. Currently, the AST and optimized plan look 
like this:
{code}
== Abstract Syntax Tree ==
LogicalProject(TMP_1=[$1], name=[$0])
  LogicalAggregate(group=[{1}], TMP_0=[SUM($5)])
    LogicalTableScan(table=[[supplier]])

== Optimized Logical Plan ==
DataSetCalc(select=[TMP_0 AS TMP_1, name])
  DataSetAggregate(groupBy=[name], select=[name, SUM(acctbal) AS TMP_0])
    BatchTableSourceScan(table=[[supplier]])
{code}
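To make the idea concrete, here is a minimal Scala sketch of those two steps. It first derives which input fields a grouped aggregation actually reads (its grouping keys plus its aggregate arguments). It then hands only those fields to a source that can project. `InMemorySource`, `projectFields` and `requiredFields` are hypothetical names used for illustration, not Flink's TableSource interfaces. The supplier column layout is also assumed; only `name` = `$1` and `acctbal` = `$5` come from the plan above.

```scala
// Hypothetical in-memory source used only to illustrate projection push-down;
// it is NOT Flink's TableSource / ProjectableTableSource API.
final case class InMemorySource(fieldNames: Seq[String], rows: Seq[Seq[Any]]) {
  // Return a source that only produces the given columns, so unused columns are
  // never read or shipped to downstream operators.
  def projectFields(fields: Seq[Int]): InMemorySource =
    InMemorySource(fields.map(fieldNames), rows.map(row => fields.map(row)))
}

object ProjectionPushDown {
  // Fields a grouped aggregation reads: grouping keys plus aggregate arguments,
  // de-duplicated and kept in ascending (original) order.
  def requiredFields(groupKeys: Seq[Int], aggArgs: Seq[Int]): Seq[Int] =
    (groupKeys ++ aggArgs).distinct.sorted

  def main(args: Array[String]): Unit = {
    // Assumed supplier schema: only name = $1 and acctbal = $5 are taken from the plan.
    val supplier = InMemorySource(
      Seq("suppkey", "name", "address", "nationkey", "phone", "acctbal", "comment"),
      Seq(Seq(1, "s1", "addr1", 7, "555-0001", 10.0, "c1"),
          Seq(2, "s2", "addr2", 7, "555-0002", 20.0, "c2")))

    // groupBy('name).select('acctbal.sum, 'name): group on $1, SUM($5)
    val needed = requiredFields(groupKeys = Seq(1), aggArgs = Seq(5))
    val pruned = supplier.projectFields(needed)
    println(pruned.fieldNames) // List(name, acctbal)
    println(pruned.rows)       // List(List(s1, 10.0), List(s2, 20.0))
  }
}
```

Whether the pruning happens in a separate projection operator or inside the source itself, the set of required fields is the same; push-down only changes who does the pruning.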

It seems we need an optimization rule like "push project past aggregate" or 
"extract project from aggregate" for the case where there is no projection 
below the aggregate. I think either of these is non-trivial to implement. 
However, when I use a SQL query, it seems Calcite has already extracted the 
projection node while translating the query. The query and plan look like this:
{code}
select sum(acctbal), name from supplier group by name

== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[$1], name=[$0])
  LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
    LogicalProject(name=[$1], acctbal=[$5])
      LogicalTableScan(table=[[supplier]])

== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, name])
  DataSetAggregate(groupBy=[name], select=[name, SUM(acctbal) AS EXPR$0])
    DataSetCalc(select=[name, acctbal])
      BatchTableSourceScan(table=[[supplier]])
{code}

I think we could follow the same approach by eagerly generating a projection 
operator when selecting aggregation fields from a Table or GroupedTable.
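A minimal Scala sketch of that eager projection follows. It uses a simplified plan model: `Scan`, `Project`, `Aggregate` and `AggCall` here are hypothetical case classes, not Calcite's `RelNode` types and not the code in the upcoming PR. The sketch collects the fields the aggregate reads and inserts a `Project` of just those fields below it. It then remaps the group keys and aggregate arguments to their new positions.

```scala
// Hypothetical plan model for illustration only; these are not Flink or Calcite classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Project(fields: Seq[Int], input: Plan) extends Plan
final case class AggCall(func: String, arg: Int)
final case class Aggregate(groupKeys: Seq[Int], calls: Seq[AggCall], input: Plan) extends Plan

object EagerProjection {
  // Insert a projection below the aggregate that keeps only the referenced fields,
  // then rewrite group keys and aggregate arguments to the projected positions.
  def eagerProject(agg: Aggregate): Aggregate = {
    val used = (agg.groupKeys ++ agg.calls.map(_.arg)).distinct.sorted
    val newIndex: Map[Int, Int] = used.zipWithIndex.toMap // old index -> new index
    Aggregate(
      agg.groupKeys.map(newIndex),
      agg.calls.map(c => c.copy(arg = newIndex(c.arg))),
      Project(used, agg.input))
  }

  def main(args: Array[String]): Unit = {
    // groupBy('name).select('acctbal.sum, 'name): group on $1 (name), SUM($5) (acctbal)
    val before = Aggregate(Seq(1), Seq(AggCall("SUM", 5)), Scan("supplier"))
    // Yields group on $0 and SUM($1) over Project($1, $5) over the scan.
    println(eagerProject(before))
  }
}
```

Keeping the projected fields in ascending order mirrors the SQL plan above, where `LogicalProject(name=[$1], acctbal=[$5])` feeds `LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])`. The sketch produces the same index mapping for this query. Aggregates that share an argument, such as `'a.sum` and `'a.max` in the quoted issue description, end up reading a single projected field.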

I have almost finished the code and will open a PR soon.

> Eagerly project unused fields when selecting aggregation fields
> ---------------------------------------------------------------
>
>                 Key: FLINK-5266
>                 URL: https://issues.apache.org/jira/browse/FLINK-5266
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>
> When we call a table's {{select}} method and it contains aggregations, we 
> currently project fields only after the aggregation. It would be better to 
> prune unused fields before the aggregation, which also leaves the 
> opportunity to push the projection into the scan.
> For example, the current logical plan of a simple query:
> {code}
> table.select('a.sum as 's, 'a.max)
> {code}
> is
> {code}
> LogicalProject(s=[$0], TMP_2=[$1])
>   LogicalAggregate(group=[{}], TMP_0=[SUM($5)], TMP_1=[MAX($5)])
>     LogicalTableScan(table=[[supplier]])
> {code}
> It would be better to prune unused fields right after the scan, so that the 
> plan looks like this:
> {code}
> LogicalProject(s=[$0], EXPR$1=[$0])
>   LogicalAggregate(group=[{}], EXPR$1=[SUM($0)])
>     LogicalProject(a=[$5])
>       LogicalTableScan(table=[[supplier]])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
