[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758824#comment-17758824 ]
Yunhong Zheng commented on FLINK-32940:
---------------------------------------

Hi, [~vsowrirajan]. I think your idea is good, but the biggest problem currently is how to pass the column pruning condition of the LogicalTableFunctionScan down to the LogicalTableScan and rewrite the LogicalTableFunctionScan. So I think we need to add a rule in the project_rewrite stage.
# First, we need to add the rule CoreRules.PROJECT_CORRELATE_TRANSPOSE to FlinkBatchRuleSets to push the project into the LogicalCorrelate.
# Then, we need to add a rule in the project_rewrite stage to pass this project to the LogicalTableScan side and rewrite the LogicalTableFunctionScan.
# For the NPE problem, you can add an if check to avoid it.

Adding one example to explain step 2. For this DDL:
{code:java}
String ddl =
    "CREATE TABLE NestedItemTable1 (\n"
        + "  `deptno` INT,\n"
        + "  `employees` MAP<varchar, varchar>\n"
        + ") WITH (\n"
        + "  'connector' = 'values',\n"
        + "  'nested-projection-supported' = 'true',"
        + "  'bounded' = 'true'\n"
        + ")";
util.tableEnv().executeSql(ddl);
util.verifyRelPlan(
    "SELECT t1.deptno, k FROM NestedItemTable1 t1, UNNEST(t1.employees) as f(k, v)");
{code}
we will get the plan below after adding CoreRules.PROJECT_CORRELATE_TRANSPOSE:
{code:java}
optimize project_rewrite cost 413675 ms.
optimize result:
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalProject(inputs=[0])
      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)])
{code}
I think for this pattern, we need to add a new rule to match it:
{code:java}
+- LogicalCorrelate
   :- LogicalTableScan
   +- LogicalProject
      +- LogicalTableFunctionScan
{code}
In this rule, we first need to create a new LogicalTableFunctionScan by merging the LogicalProject and the LogicalTableFunctionScan.
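This merge step essentially composes the project's field mapping with the function scan's output row type. A minimal, self-contained sketch of that composition (toy classes and names, not the actual Calcite/Flink APIs):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MergeProjectIntoFunctionScan {

    // Prune the function scan's output row type (modeled here as a list of
    // field names) to the indexes kept by the project; the result is the
    // row type of the rewritten, "merged" LogicalTableFunctionScan.
    static List<String> mergeProjectIntoScan(List<String> scanRowType, int[] projected) {
        return Arrays.stream(projected)
                .mapToObj(scanRowType::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // LogicalTableFunctionScan produces RecordType(f0, f1) for UNNEST(employees);
        // LogicalProject(inputs=[0]) keeps only f0 (the map key `k`).
        List<String> merged = mergeProjectIntoScan(Arrays.asList("f0", "f1"), new int[] {0});
        System.out.println(merged); // prints [f0]
    }
}
```

In the real rule, the same composition would be applied to the RelDataType of the LogicalTableFunctionScan while rewriting its invocation expression.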
Second, we need to add a new LogicalProject on top of the LogicalTableScan, which will be pushed down into the LogicalTableScan in the logical stage. IMO, the new plan after matching this rule will be (just an example, not a correct plan):
{code:java}
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalProject(inputs=[employees.k])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees.k)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
{code}
WDYT, [~vsowrirajan]? Once the solution is determined and you complete the development, you can ping me to review it.

> Support projection pushdown to table source for column projections through UDTF
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of dept_nested rather than
> only _name_ and {_}employees{_}. If the table source supports nested-field
> column projection, ideally it should project only _t1.employees.ename_ from
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]) {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)