[ 
https://issues.apache.org/jira/browse/FLINK-21145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272052#comment-17272052
 ] 

Jark Wu commented on FLINK-21145:
---------------------------------

I don't think we should provide such a filter connector option. However, this 
can be done by supporting filter pushdown for the lookup table source, i.e. 
FLINK-18779.
For example, if you have a temporal join query:

{code}
SELECT * FROM T1 JOIN hive_dim FOR SYSTEM_TIME AS OF T1.proctime
ON T1.id = hive_dim.id
WHERE hive_dim.f1 > 1000;
{code}

Then the filter {{hive_dim.f1 > 1000}} will be pushed down into the Hive 
lookup source, which means the cached Hive partition data is filtered. 
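
As a rough illustration of why this would reduce memory, here is a conceptual 
Python sketch. It is not the Hive connector's code and not the FLINK-18779 
design; {{build_lookup_cache}}, {{cached_rows}} and the sample rows are 
hypothetical names and data made up for this example. It only shows that a 
predicate applied while the cache is loaded keeps non-matching rows out of 
memory:

```python
# Conceptual sketch only: NOT Flink's or the Hive connector's implementation.
# All names and sample rows below are hypothetical.
from typing import Callable, Dict, Iterable, List, Optional

Row = Dict[str, int]


def build_lookup_cache(
    rows: Iterable[Row],
    key: str,
    pushed_filter: Optional[Callable[[Row], bool]] = None,
) -> Dict[int, List[Row]]:
    """Load rows into an in-memory cache keyed by the join key.

    If a filter has been pushed down, rows that fail it are never cached.
    """
    cache: Dict[int, List[Row]] = {}
    for row in rows:
        if pushed_filter is not None and not pushed_filter(row):
            continue  # filtered out before it occupies any cache memory
        cache.setdefault(row[key], []).append(row)
    return cache


def cached_rows(cache: Dict[int, List[Row]]) -> int:
    """Count the rows currently held in the cache."""
    return sum(len(bucket) for bucket in cache.values())


# Stand-in for the latest partition of hive_dim.
hive_partition = [
    {"id": 1, "f1": 500},
    {"id": 2, "f1": 1500},
    {"id": 3, "f1": 2000},
]

# Without pushdown, the whole partition is cached.
unfiltered = build_lookup_cache(hive_partition, "id")

# With the predicate f1 > 1000 pushed down, only matching rows are cached.
filtered = build_lookup_cache(hive_partition, "id", lambda r: r["f1"] > 1000)
```

In this sketch a lookup for {{id = 1}} finds nothing in the filtered cache, 
which matches the query result because the {{WHERE}} clause would have dropped 
that row anyway.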

If the filter pushdown optimization can address your problem, I will close this 
issue as a duplicate of FLINK-18779.

> Flink Temporal Join Hive optimization
> -------------------------------------
>
>                 Key: FLINK-21145
>                 URL: https://issues.apache.org/jira/browse/FLINK-21145
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Hive
>    Affects Versions: 1.12.0
>            Reporter: HideOnBush
>            Priority: Major
>
> When Flink performs a temporal join against a Hive dimension table, the 
> latest partition data is loaded into task memory in full, which leads to 
> high memory overhead. In fact, the full latest data is sometimes not 
> required. Could a future version add an option, similar to the existing 
> options, to filter the dimension table data?
> For example, select * from dim /*'streaming-source.partition.include' 
> ='latest' condition='fild1=ab'*/ would keep only the rows of the latest 
> partition where fild1=ab.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
