[ https://issues.apache.org/jira/browse/FLINK-29138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-29138:
--------------------------------
    Description: 
Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
{code:java}
@Test
def testJoinTemporalTableWithProjectionPushDown(): Unit = {
  val sql =
    """
      |SELECT T.*, D.id
      |FROM MyTable AS T
      |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
    """.stripMargin
  util.verifyExecPlan(sql)
}
{code}
The optimized plan does not print the columns selected from the lookup source, and in fact the projection is not pushed into the lookup source at all (all columns are still read from it), which is not the expected behavior:
{code:java}
<Resource name="optimized exec plan">
  <![CDATA[
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
{code}
Incorrect intermediate optimization result:
{code}
========= logical_rewrite ========
optimize result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalCalc(select=[id])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

========= time_indicator ========
optimize result:
FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalCalc(select=[id])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
{code}

  was:
Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
{code}
@Test
def testJoinTemporalTableWithProjectionPushDown(): Unit = {
  val sql =
    """
      |SELECT T.*, D.id
      |FROM MyTable AS T
      |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
    """.stripMargin
  util.verifyExecPlan(sql)
}
{code}
The optimized plan does not print the columns selected from the lookup source, and in fact the projection is not pushed into the lookup source at all (all columns are still read from it), which is not the expected behavior:
{code}
<Resource name="optimized exec plan">
  <![CDATA[
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
{code}
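For context, "pushing the projection into the lookup source" is what Flink's SupportsProjectionPushDown source ability enables. The sketch below is not part of this issue's test setup: the names ProjectableLookupSource and DimLookupFunction are illustrative, and the two-argument applyProjection signature assumes Flink 1.15+ (older releases expose a single-argument variant). For the LookupTable(id, name, age) schema and the query above, a successful pushdown would invoke applyProjection with [[0]], i.e. only the id column.
{code:java}
import org.apache.flink.table.connector.source.{DynamicTableSource, LookupTableSource, TableFunctionProvider}
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.types.DataType

/** Stub lookup function: in a real connector `eval` would query the external system. */
class DimLookupFunction extends TableFunction[RowData] {
  def eval(id: java.lang.Integer): Unit = {
    // Emit a single dummy row for the looked-up key.
    // (A real function would also declare its produced row type, omitted here.)
    collect(GenericRowData.of(id))
  }
}

/**
 * Illustrative lookup source that opts into projection pushdown.
 * When the planner pushes a projection, it calls `applyProjection` and the
 * source can then read only the required columns from the external system.
 */
class ProjectableLookupSource extends LookupTableSource with SupportsProjectionPushDown {

  // Indices of the columns the planner asked for, e.g. Array(Array(0)) for `id` only.
  private var projectedFields: Array[Array[Int]] = Array.empty

  override def getLookupRuntimeProvider(
      context: LookupTableSource.LookupContext): LookupTableSource.LookupRuntimeProvider =
    TableFunctionProvider.of(new DimLookupFunction)

  // Top-level projection only; nested (row-field) projection is not supported here.
  override def supportsNestedProjection(): Boolean = false

  // Flink 1.15+ signature; older releases use applyProjection(int[][]) without the DataType.
  override def applyProjection(projection: Array[Array[Int]], producedDataType: DataType): Unit =
    projectedFields = projection

  override def copy(): DynamicTableSource = {
    val copied = new ProjectableLookupSource
    copied.projectedFields = projectedFields
    copied
  }

  override def asSummaryString(): String = "ProjectableLookupSource"
}
{code}
In the plans reported below, the FlinkLogicalCalc(select=[id]) stays on top of the FlinkLogicalTableSourceScan (which still lists all of id, name, age), indicating that no such pushdown is applied on the lookup side of the join.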
> Project pushdown not work for lookup source
> -------------------------------------------
>
>                 Key: FLINK-29138
>                 URL: https://issues.apache.org/jira/browse/FLINK-29138
>             Project: Flink
>          Issue Type: Bug
>            Reporter: lincoln lee
>            Priority: Major
>
> Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
> {code:java}
> @Test
> def testJoinTemporalTableWithProjectionPushDown(): Unit = {
>   val sql =
>     """
>       |SELECT T.*, D.id
>       |FROM MyTable AS T
>       |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
>       |ON T.a = D.id
>     """.stripMargin
>   util.verifyExecPlan(sql)
> }
> {code}
> The optimized plan does not print the columns selected from the lookup source, and in fact the projection is not pushed into the lookup source at all (all columns are still read from it), which is not the expected behavior:
> {code:java}
> <Resource name="optimized exec plan">
>   <![CDATA[
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
>    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> ]]>
> </Resource>
> {code}
> Incorrect intermediate optimization result:
> {code}
> ========= logical_rewrite ========
> optimize result:
> FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
> :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>    +- FlinkLogicalCalc(select=[id])
>       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
>
> ========= time_indicator ========
> optimize result:
> FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
>    :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>    +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>       +- FlinkLogicalCalc(select=[id])
>          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
> {code}
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)