Hi Wenlong,

Thanks for the feedback.
I've checked similar syntax in other systems, and they all differ from each other; there seems to be no consensus.
As mentioned in FLIP-204, Oracle uses a query hint named 'use_hash' [1]. Spark also uses a query hint, named 'SHUFFLE_HASH' [2]. SQL Server uses the keyword 'HASH' instead of a query hint [3].
Note that the purposes of the hash shuffle in [1][2][3] are a little different from the purpose of FLIP-204; we are only discussing syntax here.
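As a rough illustration of those differences, the same join might be written as follows in each style. This is a sketch based on the referenced documentation, not tested output; the `Orders`/`Customers` tables and their columns are borrowed from the FLIP-204 example and are assumptions for the other systems. Note that Oracle's USE_HASH takes table aliases, while the FLIP-204 proposal quotes table names.

```sql
-- Oracle [1]: query hint right after SELECT; arguments are table aliases
SELECT /*+ USE_HASH(o c) */ o.order_id, c.country
FROM Orders o JOIN Customers c ON o.customer_id = c.id;

-- Spark [2]: query hint right after SELECT; suggests a shuffle hash join
-- involving the named relation
SELECT /*+ SHUFFLE_HASH(c) */ o.order_id, c.country
FROM Orders o JOIN Customers c ON o.customer_id = c.id;

-- SQL Server [3]: no comment-style hint; HASH is a keyword inside the join operator
SELECT o.order_id, c.country
FROM Orders AS o INNER HASH JOIN Customers AS c ON o.customer_id = c.id;

-- FLIP-204 proposal: Oracle-style query hint naming the tables of a lookup join
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

The main syntactic split is where the hint lives: Oracle, Spark and the FLIP-204 proposal put it in a comment right after SELECT, whereas SQL Server makes it part of the join operator itself.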
I've added this part to the FLIP for further discussion.

Best,
Jing Zhang

[1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
[2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
[3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15

wenlong.lwl <wenlong88....@gmail.com> wrote on Wed, Dec 29, 2021 at 17:18:
> Hi, Jing, thanks for driving the discussion.
>
> Have you done some investigation into the syntax of join hints?
> Why did you choose USE_HASH from Oracle instead of the Spark style, SHUFFLE_HASH? They are quite different.
> People in the big data world may be more familiar with Spark/Hive; if we need to choose one, personally, I prefer the Spark style.
>
> Best,
> Wenlong
>
> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
>> Hi Jing,
>> Thanks for your detailed reply.
>> 1) In my last suggestion, hashing by primary key was not meant to raise the cache hit ratio, but to handle skew in the left source. Now that you have a 'skew' hint and other discussion about it, I'm looking forward to it.
>> 2) I meant supporting a user-defined partitioner function. We have a case of joining a data lake source that uses a special partitioning scheme, and we have implemented it, though not elegantly, in our internal version. As you said, it needs more design.
>> 3) I think the so-called 'HashPartitionedCache' is useful; otherwise, loading all data, as the Hive lookup table source does, is hardly feasible for big data.
>>
>> Best regards,
>> Yuan
>>
>> On 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
>>> Hi, Lincoln
>>> Thanks a lot for the feedback.
>>>
>>>> Regarding the hint name 'USE_HASH', could we consider more candidates?
>>>> Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all these incoming hint names should be considered together.
>>>
>>> I will discuss the skew problem separately in the next FLIP, but I would like to share the hint proposal for skew here.
>>> I want to introduce a 'skew' hint, which is a query hint similar to the skew hint in Spark [1] and MaxCompute [2].
>>> The 'skew' hint could contain only the name of the table with skew.
>>> Besides, the skew hint could accept a table name and column names.
>>> In addition, the skew hint could accept a table name, column names and skew values.
>>> For example:
>>>
>>> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id, o.total, c.country, c.zip
>>> FROM Orders AS o
>>> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>>> ON o.customer_id = c.id;
>>>
>>> The 'skew' hint is not only used for lookup join here; it could also be used for other types of join later, for example, batch hash join or streaming regular join.
>>> Back to the question of a better name for hash lookup join: since 'skew' is a separate hint, 'use_hash' is still an option.
>>> WDYT?
>>> I don't have a good idea for a better hint name yet. I would like to hear more suggestions about hint names.
>>>
>>>> As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another potentially big change: at least Calcite 1.30 vs 1.26. Are we prepared to accept this big change?).
>>>
>>> Indeed, solution 1 depends on a Calcite upgrade.
>>> I admit that upgrading from Calcite 1.26 to 1.30 would be a big change. I still remember how much we suffered during the last upgrade to Calcite 1.26.
>>> However, we cannot always avoid upgrading, for the following reasons:
>>> 1. Other features also depend on the Calcite upgrade, for example, Session Window and Count Window.
>>> 2. If we always avoid Calcite upgrades, the gap with the latest version keeps growing. If one day upgrading becomes unavoidable, the pain will be greater.
>>>
>>> WDYT?
>>>
>>>> Is there another possible way to minimize the change in Calcite?
>>>
>>> Have you checked the 'Other Alternatives' part in FLIP-204? It gives another solution, which does not depend on a Calcite upgrade and does not need to worry about the hint being lost during propagation.
>>> This is also what we have done in our internal version.
>>> The core idea is to propagate the 'use_hash' hint to the TableScan nodes with matching table names. However, it is a little hacky.
>>>
>>>> As far as I know, there are more limitations than `Correlate`.
>>>
>>> As mentioned before, in our internal version I chose the 'Other Alternatives' approach in FLIP-204.
>>> Although I did a POC of solution 1 and listed all the changes I found in the FLIP, there may still be something I missed.
>>> I'm very happy to hear that you have found more limitations besides `Correlate`; would you please give more details on this part?
>>>
>>> Best,
>>> Jing Zhang
>>>
>>> [1] https://docs.databricks.com/delta/join-performance/skew-join.html
>>> [2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
>>>
>>> Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 14:40:
>>>> Hi Yuan and Lincoln,
>>>> thanks a lot for the attention. I will answer the emails one by one.
>>>>
>>>> To Yuan
>>>>> How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, shuffling by join key will cause CDC data disorder. Would it be better to use the primary key in this case?
>>>>
>>>> Good question.
>>>> The problem exists not only with CDC sources, but also whenever the input stream is not an insert-only stream (for example, the result of an unbounded aggregate or a regular join).
>>>> I think hashing by primary key is not a good choice. It cannot raise the cache hit ratio, because the cache key is the lookup key rather than the primary key of the input.
>>>>
>>>> To avoid wrong results, hash lookup join requires that the input stream is either an insert-only stream or has upsert keys that contain the lookup keys.
>>>>
>>>> I've added this limitation to the FLIP; thanks a lot for the reminder.
>>>>
>>>>> Can the shuffle keys be customized when users know the distribution of the data?
>>>>
>>>> I'm not sure I understand your question.
>>>>
>>>> Do you mean supporting a user-defined partitioner function on keys, just like in the Flink DataStream API?
>>>> If yes, I'm afraid there is no plan to support this feature yet, because it involves many things, for example:
>>>> 1. SQL syntax
>>>> 2. a user-defined partitioner API
>>>> 3. RelDistribution type extension and Flink RelDistribution extension
>>>> 4. FlinkExpandConversionRule
>>>> 5. Exchange execNode extension
>>>> 6. ...
>>>> It needs a careful design and more discussion. If this is a strong requirement, we could drive a separate discussion on this point.
>>>> In this FLIP, I would first support hash shuffle. WDYT?
>>>>
>>>> Or do you mean hashing by keys other than the lookup key?
>>>> If yes, could you please describe a specific use case?
>>>> We need to fetch records from the external storage of the dimension table by lookup key, so dimension table sources use the lookup keys as cache keys.
>>>> We can only increase the cache hit ratio by shuffling on the lookup keys.
>>>> I need more use cases to understand this requirement.
>>>>
>>>>> Some connectors, such as Hive, cache all data in the LookupFunction. How can we decrease the valid cache data size if the data can be shuffled?
>>>>
>>>> Very good idea.
>>>> There are two types of cache.
>>>> For key-value storage, such as Redis/HBase, the lookup table source lazily stores the visited lookup keys and their records in the cache.
>>>> For storage without keys, such as Hive, each task eagerly loads all data into the cache during initialization.
>>>> After introducing the hash partitioner, nothing needs to change for key-value storage; for Hive, each task could load only part of the data into its cache instead of all of it.
>>>>
>>>> We have implemented this optimization in our internal version.
>>>> The core idea is to push the partitioner information down to the lookup table source. When loading data into the cache, each task stores only those records whose lookup keys are sent to the current task.
>>>> We call this 'HashPartitionedCache'.
>>>>
>>>> I have added this point to the Lookup Join requirements list in the motivation of the FLIP, but I will not address it in this FLIP right now.
>>>> If this is a strong requirement, we need to drive a separate discussion on this topic, because it involves many API extensions.
>>>>
>>>> Best,
>>>> Jing Zhang
>>>>
>>>>
>>>> Lincoln Lee <lincoln.8...@gmail.com> wrote on Wed, Dec 29, 2021 at 10:01:
>>>>> Hi Jing,
>>>>> Thanks for bringing up this discussion! I agree that these join hints should benefit both bounded and unbounded cases, as Martijn mentioned.
>>>>> I also agree that implementing the query hint is the right way for a more general purpose, since dynamic table options have a limited scope.
>>>>> Some points I'd like to share are:
>>>>> 1. Regarding the hint name 'USE_HASH', could we consider more candidates?
>>>>> Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all these incoming hint names should be considered together.
>>>>> 2. As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another potentially big change: at least Calcite 1.30 vs 1.26. Are we prepared to accept this big change?). Is there another possible way to minimize the change in Calcite? As far as I know, there are more limitations than `Correlate`.
>>>>>
>>>>> Best,
>>>>> Lincoln Lee
>>>>>
>>>>>
>>>>> Jing Zhang <beyond1...@gmail.com> wrote on Tue, Dec 28, 2021 at 23:04:
>>>>>> Hi Martijn,
>>>>>> Thanks a lot for your attention.
>>>>>> I'm sorry I didn't explain the motivation clearly. I would like to explain it in detail, and then respond to your questions.
>>>>>> A lookup join is typically used to enrich a table with data that is queried from an external system. Many lookup table sources, such as the JDBC, CSV and HBase connectors, introduce a cache in order to reduce RPC calls.
>>>>>> For those connectors, we can raise the cache hit ratio by routing the same lookup keys to the same task instance. This is the purpose of https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join.
>>>>>> Other cases, such as the batch hash join you mentioned, might also benefit from hash distribution. It is a cool idea; however, it is not the purpose of this FLIP. We could discuss it in FLINK-20670 <https://issues.apache.org/jira/browse/FLINK-20670>.
>>>>>>
>>>>>>> - When I was reading about this topic [1], I was wondering whether this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?
>>>>>>
>>>>>> As mentioned before, the purpose of Hash Lookup Join is to increase the cache hit ratio, which is different from Oracle's hash join. However, we could use a similar hint syntax.
>>>>>>
>>>>>>> - If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement. In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?
>>>>>>
>>>>>> Calcite supports hints in two locations [3]:
>>>>>> Query Hint: right after the SELECT keyword;
>>>>>> Table Hint: right after the referenced table name.
>>>>>> Flink already supports dynamic table options based on Calcite's hint framework, as described in doc [2].
>>>>>> Besides, query hints are also important: they can help the optimizer choose a better plan. Almost all popular databases and big-data engines, such as Oracle, Hive and Spark, support SQL query hints.
>>>>>> I think using query hints in this case is more natural for users. WDYT?
>>>>>>
>>>>>> I have updated the motivation part in the FLIP.
>>>>>> Thanks for the feedback!
>>>>>>
>>>>>> [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>>>>>> [3] https://calcite.apache.org/docs/reference.html#sql-hints
>>>>>>
>>>>>> Best,
>>>>>> Jing Zhang
>>>>>>
>>>>>> Martijn Visser <mart...@ververica.com> wrote on Tue, Dec 28, 2021 at 22:02:
>>>>>>> Hi Jing,
>>>>>>>
>>>>>>> Thanks a lot for the explanation and the FLIP. I definitely learned something when reading more about `use_hash`. My interpretation would be that the primary benefit of a hash lookup join would be improved performance by allowing the user to explicitly optimise the planner.
>>>>>>>
>>>>>>> I have a couple of questions:
>>>>>>>
>>>>>>> - When I was reading about this topic [1], I was wondering whether this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?
>>>>>>> - If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement. In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Martijn
>>>>>>>
>>>>>>> [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
>>>>>>>> Hi everyone,
>>>>>>>> Lookup join <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join> [1] is a commonly used feature in Flink SQL. We have received many optimization requirements for lookup join. For example:
>>>>>>>> 1. Enforce a hash partitioner on the left side of the lookup join to raise the cache hit ratio
>>>>>>>> 2. Solve the data skew problem that arises once hash lookup join is introduced
>>>>>>>> 3. Enable mini-batch optimization to reduce RPC calls
>>>>>>>>
>>>>>>>> We will solve these problems one by one. First, we will focus on point 1, and continue to discuss points 2 and 3 later.
>>>>>>>>
>>>>>>>> There are many similar requirements for hash lookup join from the user mailing list and JIRA, for example:
>>>>>>>> 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> - Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys
>>>>>>>> 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> - lookupjoin source table for pre-partitioning
>>>>>>>> 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> - Support to send data to lookup table for KeyGroupStreamPartitioner way for SQL.
>>>>>>>>
>>>>>>>> In this FLIP, I would like to start a discussion about Hash Lookup Join. The core idea is to introduce a 'USE_HASH' hint in the query. This syntax is directly user-facing and therefore requires careful design.
>>>>>>>> There are two ways to propagate this hint to LookupJoin in the optimizer, and we need further discussion to make a final decision. Either way, the difference between the two solutions is only in the internal implementation and has no impact on users.
>>>>>>>>
>>>>>>>> For more detail on the proposal:
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>>>>>>>>
>>>>>>>> Looking forward to your feedback, thanks.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jing Zhang
>>>>>>>>
>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
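To make the two hint locations from Martijn's question and Jing's answer concrete, the sketch below contrasts a table hint, which Flink already supports as dynamic table options, with the query hint proposed in FLIP-204. The `Orders`/`Customers` tables and the Kafka option `scan.startup.mode` are illustrative assumptions; depending on the Flink version, dynamic table options may first need to be enabled via `table.dynamic-table-options.enabled`, and `USE_HASH` is only a proposal at this point, not an existing Flink hint.

```sql
-- Table hint (already supported by Flink as dynamic table options):
-- placed right after the referenced table name, it only affects that table.
SELECT order_id, total
FROM Orders /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- Query hint (proposed in FLIP-204): placed right after SELECT,
-- it advises the optimizer about the join as a whole.
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

The table hints in the current docs only appear "at the end" because the examples reference a single table at the end of the statement; both forms are anchored to a syntactic element (the table name or the SELECT keyword) rather than to the statement's end.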