[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758238#comment-17758238 ]
Venkata krishnan Sowrirajan commented on FLINK-32940:
-----------------------------------------------------

[~jark] Added the SQL and query plan output. This happens on the current master branch of Flink.

My observations and attempt at fixing the issue:
* Apply Calcite's {{CoreRules.PROJECT_CORRELATE_TRANSPOSE_RULE}} to push the projection down to the {{Correlate}}'s inputs (a wiring sketch follows this list).
* Fix {{BatchPhysicalCorrelateRule}} to handle the resulting logical plan, in which projects have been pushed through the {{Correlate}}.
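For the first bullet, the change is just registering the existing Calcite rule. A minimal sketch of what I added (the enclosing object here is only illustrative; in Flink the rule is appended to one of the rule-set vals in {{FlinkBatchRuleSets.scala}}, which contain many more rules):
{code:scala}
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

object ProjectCorrelateTransposeWiring {
  // Sketch: only the new entry is shown; the real rule set has many more rules.
  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
    // Pushes a Project below a Correlate so that each Correlate input keeps
    // only the fields the query actually references.
    CoreRules.PROJECT_CORRELATE_TRANSPOSE_RULE
  )
}
{code}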
After adding {{CoreRules.PROJECT_CORRELATE_TRANSPOSE_RULE}} to {{FlinkBatchRuleSets}}, planning fails with the exception below:
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchPhysicalCorrelateRule(in:LOGICAL,out:BATCH_PHYSICAL), args [rel#372:FlinkLogicalCorrelate.LOGICAL.any.[](left=RelSubset#368,right=RelSubset#371,correlation=$cor2,joinType=inner,requiredColumns={1})]
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
	at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
	at scala.collection.immutable.List.foreach(List.scala:388)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
	at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:536)
	at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115)
	at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:696)
	at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
	at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
	at org.apache.flink.table.api.Explainable.printExplain(Explainable.java:57)
	at com.linkedin.tracking3.flink.FlinkTest.runJoinUsingTableAPI(FlinkTest.java:129)
	at com.linkedin.tracking3.flink.FlinkTest.main(FlinkTest.java:316)
Caused by: java.lang.NullPointerException
	at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549)
	at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convertToCorrelate$1(BatchPhysicalCorrelateRule.scala:67)
	at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convert(BatchPhysicalCorrelateRule.scala:80)
	at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:172)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
{code}
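If I am reading the trace correctly, the {{Calc}} that the transpose rule leaves on the correlate's input is projection-only, so {{RexProgram.getCondition}} returns null and {{convertToCorrelate}} passes that null straight into {{expandLocalRef}}. The guard I have been experimenting with looks roughly like this (an untested sketch; {{extractCondition}} is my name for logic that sits inline in {{BatchPhysicalCorrelateRule.convertToCorrelate}}):
{code:scala}
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc

object CorrelateConditionGuard {
  // Hypothetical helper: pull the filter condition out of a Calc while
  // tolerating a projection-only Calc, whose getCondition returns null.
  // Today, expandLocalRef(null) is exactly what throws the NPE above.
  def extractCondition(calc: FlinkLogicalCalc): Option[RexNode] = {
    val program = calc.getProgram
    Option(program.getCondition).map(c => program.expandLocalRef(c))
  }
}
{code}
This only avoids the NPE, though: the projection that the {{Calc}} carries is still dropped instead of being applied, which is the part of the second bullet I have not worked out yet.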
Looking for pointers to fix the above issue. Appreciate your inputs. Thanks.

> Support projection pushdown to table source for column projections through UDTF
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push columns projected through a UDTF such as _UNNEST_ down to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of _dept_nested_ rather than only _deptno_ and _employees_. If the table source supports projection pushdown for nested fields, ideally it should project only _t1.employees.ename_ (along with _deptno_) from the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]) {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)