[ 
https://issues.apache.org/jira/browse/FLINK-29138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-29138:
--------------------------------
    Attachment: image-2022-08-30-20-33-24-105.png

> Project pushdown not work for lookup source
> -------------------------------------------
>
>                 Key: FLINK-29138
>                 URL: https://issues.apache.org/jira/browse/FLINK-29138
>             Project: Flink
>          Issue Type: Bug
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>         Attachments: image-2022-08-30-20-33-24-105.png
>
>
> Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
> {code:java}
> @Test
> def testJoinTemporalTableWithProjectionPushDown(): Unit = {
>   val sql =
>     """
>       |SELECT T.*, D.id
>       |FROM MyTable AS T
>       |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
>       |ON T.a = D.id
>     """.stripMargin
>   util.verifyExecPlan(sql)
> }
> {code}
> The optimized plan does not print the columns selected from the lookup 
> source. In fact, the projection is not pushed into the lookup source at all 
> (it still selects all columns from the source), which is not the expected 
> behavior:
> {code:java}
> <Resource name="optimized exec plan">
> <![CDATA[
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
>    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> ]]>
> </Resource>
> {code}
>  
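> The intermediate optimization results are incorrect: FlinkLogicalCalc(select=[id]) 
> stays above a FlinkLogicalTableSourceScan that still reads fields=[id, name, age]: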
> {code}
> =========  logical_rewrite ========
>  optimize result: 
> FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
> :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>    +- FlinkLogicalCalc(select=[id])
>       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
> =========  time_indicator ========
>  optimize result: 
> FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
>    :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>    +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>       +- FlinkLogicalCalc(select=[id])
>          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
