Hi Jing,
Thanks very much for your FLIP. I have some points: - How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, shuffle by join key will cause CDC data disorder. Will it be better to use primary key in this case? - If the shuffle keys can be customized when users have the knowledge about distribution of data? - Some connectors such as hive, caches all data in LookupFunction. How to decrease the valid cache data size if data can be shuffled? Best regards, Yuan On 12/28/2021 15:11,Jing Zhang<beyond1...@gmail.com> wrote: Hi everyone, Look up join <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1] is commonly used feature in Flink SQL. We have received many optimization requirements on look up join. For example: 1. Enforces left side of lookup join do a hash partitioner to raise cache hint ratio 2. Solves the data skew problem after introduces hash lookup join 3. Enables mini-batch optimization to reduce RPC call Next we will solve these problems one by one. Firstly, we would focus on point 1, and continue to discuss point 2 and point 3 later. There are many similar requirements from user mail list and JIRA about hash Lookup Join, for example: 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> - Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> - lookupjoin source table for pre-partitioning 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> - Support to send data to lookup table for KeyGroupStreamPartitioner way for SQL. In this FLIP, I would like to start a discussion about Hash Lookup Join. The core idea is introducing a 'USE_HASH' hint in query. This syntax is directly user-oriented and therefore requires careful design. There are two ways about how to propagate this hint to LookupJoin in optimizer. We need further discussion to do final decide. Anyway, the difference between the two solution is only about the internal implementation and has no impact on the user. For more detail on the proposal: https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join Looking forward to your feedback, thanks. Best, Jing Zhang [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join