[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758824#comment-17758824 ]

Yunhong Zheng commented on FLINK-32940:
---------------------------------------

Hi, [~vsowrirajan]. I think your idea is good, but the biggest problem currently 
is how to pass the column pruning information of the LogicalTableFunctionScan 
down to the LogicalTableScan and rewrite the LogicalTableFunctionScan. So I 
think we need to add a rule in the project_rewrite stage.
 # First, we need to add the rule CoreRules.ProjectCorrelateTransposeRule to 
FlinkBatchRuleSets so that the project is pushed into the LogicalCorrelate (a 
minimal registration sketch follows this list).
 # Then, we need to add a rule in the project_rewrite stage to push this project 
down to the LogicalTableScan side and rewrite the LogicalTableFunctionScan.
 # For the NPE problem, you can add an if check to avoid it.
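
For step 1, here is a minimal sketch of how the Calcite rule could be collected 
into a rule set, assuming Calcite's CoreRules and RuleSets APIs. The class and 
method names below are illustrative only; the real change would register the 
rule inside FlinkBatchRuleSets:

{code:java}
// A minimal sketch, assuming Calcite's CoreRules and RuleSets APIs.
// The class and method names are illustrative; the actual change would add the
// rule to FlinkBatchRuleSets in the Flink planner.
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

public class ProjectCorrelateRuleSetSketch {

    // Pushes a LogicalProject below a LogicalCorrelate so that the projection
    // can reach the table scan side of the correlate.
    public static final RelOptRule PROJECT_CORRELATE_TRANSPOSE =
            CoreRules.PROJECT_CORRELATE_TRANSPOSE;

    public static RuleSet projectTransposeRules() {
        return RuleSets.ofList(PROJECT_CORRELATE_TRANSPOSE);
    }
}
{code}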

Here is an example to explain step 2.

For this DDL:

 
{code:java}
String ddl =
        "CREATE TABLE NestedItemTable1 (\n"
                + "  `deptno` INT,\n"
                + "  `employees` MAP<varchar, varchar>\n"
                + ") WITH (\n"
                + " 'connector' = 'values',\n"
                + " 'nested-projection-supported' = 'true',"
                + " 'bounded' = 'true'\n"
                + ")";
util.tableEnv().executeSql(ddl);

util.verifyRelPlan(
        "SELECT t1.deptno, k FROM NestedItemTable1 t1, UNNEST(t1.employees) as 
f(k, v)");{code}
 

we will get the plan below after adding CoreRules.ProjectCorrelateTransposeRule:

 
{code:java}
optimize project_rewrite cost 413675 ms.
optimize result: 
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalProject(inputs=[0])
      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)]){code}
 

I think for this pattern, we need to add a new rule to match it:

 
{code:java}
+- LogicalCorrelate
   :- LogicalTableScan
   +- LogicalProject
      +- LogicalTableFunctionScan{code}
 

In this rule, we first need to create a new LogicalTableFunctionScan by merging 
the LogicalProject and the LogicalTableFunctionScan.

Second, we need to add a new LogicalProject on top of the LogicalTableScan, 
which will be pushed down into the LogicalTableScan in the logical stage.
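
For reference, a rough skeleton of what such a rule could look like. Only the 
matching pattern is concrete; the class name is illustrative, and the onMatch 
body just outlines the two steps above, since the actual rewrite still needs 
to be designed:

{code:java}
// A hedged sketch of the new rule, assuming Calcite's RelOptRule operand API.
// The onMatch body is an outline of the two steps described above, not a
// working implementation.
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rel.logical.LogicalTableScan;

public class ProjectCorrelateTableFunctionScanRule extends RelOptRule {

    public ProjectCorrelateTableFunctionScanRule() {
        // Matches:  LogicalCorrelate
        //           :- LogicalTableScan
        //           +- LogicalProject
        //              +- LogicalTableFunctionScan
        super(
                operand(
                        LogicalCorrelate.class,
                        operand(LogicalTableScan.class, none()),
                        operand(
                                LogicalProject.class,
                                operand(LogicalTableFunctionScan.class, none()))));
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalCorrelate correlate = call.rel(0);
        LogicalTableScan scan = call.rel(1);
        LogicalProject project = call.rel(2);
        LogicalTableFunctionScan functionScan = call.rel(3);

        // Step 1: merge the LogicalProject into the LogicalTableFunctionScan,
        // i.e. rebuild the function scan so that it only produces the projected
        // fields.

        // Step 2: put a new LogicalProject on top of the LogicalTableScan that
        // keeps only the (nested) fields the correlate still needs, so that the
        // normal projection push-down in the logical stage can push it into the
        // scan.

        // Finally, rebuild the LogicalCorrelate on the two new inputs and call
        // call.transformTo(...) with the result.
    }
}
{code}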

IMO, the new plan after matching this rule will be as follows (just an example, 
not the exact plan):

 
{code:java}
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalProject(inputs=[employees.k])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees.k)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]){code}
 

WDYT, [~vsowrirajan]? Once the solution is determined and you complete the 
development, you can ping me to review it.

> Support projection pushdown to table source for column projections through 
> UDTF
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through UDTF like 
> _UNNEST_ to the table source.
> For eg:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS 
> t2{code}
> For the above SQL, Flink projects all the columns for DEPT_NESTED rather than 
> only _name_ and {_}employees{_}. If the table source supports nested fields 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
