[ https://issues.apache.org/jira/browse/FLINK-29138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-29138:
--------------------------------
    Attachment: image-2022-08-30-20-33-24-105.png

Project pushdown does not work for lookup source
------------------------------------------------

                Key: FLINK-29138
                URL: https://issues.apache.org/jira/browse/FLINK-29138
            Project: Flink
         Issue Type: Bug
           Reporter: lincoln lee
           Assignee: lincoln lee
           Priority: Major
        Attachments: image-2022-08-30-20-33-24-105.png

Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown

{code:java}
@Test
def testJoinTemporalTableWithProjectionPushDown(): Unit = {
  val sql =
    """
      |SELECT T.*, D.id
      |FROM MyTable AS T
      |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
    """.stripMargin
  util.verifyExecPlan(sql)
}
{code}

The optimized plan does not print which columns are selected from the lookup source, and the projection is in fact not pushed into the lookup source (all columns are still selected from the source). This is not the expected behavior:

{code:java}
<Resource name="optimized exec plan">
  <![CDATA[
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
{code}

Incorrect intermediate optimization result:

{code}
========= logical_rewrite ========
optimize result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalCalc(select=[id])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

========= time_indicator ========
optimize result:
FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalCalc(select=[id])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
{code}
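
For context on where the missing push-down would land on the connector side: a lookup source opts into projection push-down by implementing the SupportsProjectionPushDown ability, which is the hook the planner should call when it merges FlinkLogicalCalc(select=[id]) into the lookup table scan. The sketch below is minimal and hypothetical, not the LookupTable registered by the planner tests; the class and function names are made up, and the exact applyProjection signature varies across Flink versions (newer releases also provide a variant that receives the produced DataType).

{code:scala}
// Minimal sketch of a lookup source that declares projection push-down support.
// Assumptions: Flink 1.15/1.16-style DynamicTableSource API; the class and
// function names here are hypothetical and not part of the planner test suite.
import org.apache.flink.table.connector.source.{DynamicTableSource, LookupTableSource, TableFunctionProvider}
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.TableFunction

/** Stub lookup function; a real connector would query the external system in eval(). */
class NoopLookupFunction extends TableFunction[RowData] {
  def eval(id: Integer): Unit = {
    // Look up rows by key and collect(...) only the projected columns here.
  }
}

class ProjectableLookupSource extends LookupTableSource with SupportsProjectionPushDown {

  // Index paths of the columns requested by the planner; null means "all columns".
  private var projectedFields: Array[Array[Int]] = _

  override def supportsNestedProjection(): Boolean = false

  // The planner is expected to call this when it pushes Calc(select=[id]) into the
  // scan; per this issue, the call never happens on the lookup join path.
  override def applyProjection(projectedFields: Array[Array[Int]]): Unit =
    this.projectedFields = projectedFields

  override def getLookupRuntimeProvider(
      context: LookupTableSource.LookupContext): LookupTableSource.LookupRuntimeProvider =
    TableFunctionProvider.of(new NoopLookupFunction)

  override def copy(): DynamicTableSource = {
    val copied = new ProjectableLookupSource
    copied.projectedFields = projectedFields
    copied
  }

  override def asSummaryString(): String = "ProjectableLookupSource"
}
{code}

If the push-down were applied on the lookup path, applyProjection would presumably receive the index paths of the required columns (here [[0]] for id in the [id, name, age] schema), and the FlinkLogicalCalc(select=[id]) above the FlinkLogicalTableSourceScan could be merged into the scan instead of being left in place as shown in the logical_rewrite result above.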