Hi Martijn,

Thanks for the feedback. Glad to hear that we have reached a consensus on the first and second points.

As for whether to use `use_hash` as the term, I think your concern makes sense. Although the hash lookup join is similar to the hash join in Oracle in that both require hash distribution on the input, there are still some differences between them. On this point, Lincoln and WenLong both prefer the term 'SHUFFLE_HASH'. WDYT?

Best,
Jing Zhang

Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Dec 30, 2021 at 11:21:

> Hi Jing,
> Thanks for your explanation!
>
> 1. For the hint name, +1 for WenLong's proposal. I think the `SHUFFLE`
> keyword is important: in a classic distributed computing system,
> a hash join usually means there is a shuffle stage (including shuffle
> hash-join and broadcast hash-join). Users only need to pass the `build` side
> table (usually the smaller one) into the `SHUFFLE_HASH` join hint, which is more
> concise than `USE_HASH(left_table, right_table)`. Please correct me if my
> understanding is wrong.
> Regarding the `SKEW` hint, I agree with you that it can be used widely, and I
> prefer to treat it as a metadata hint, a new category distinct from join
> hints.
> For your example:
> ```
> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> o.total, c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
> ```
> I would prefer another form:
> ```
> -- provide the skew info to let the engine choose the optimal plan
> SELECT /*+ SKEW('Orders') */ o.order_id, ...
>
> -- or introduce a new hint for the join case, e.g.,
> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> ```
>
> 2. Agree with Martijn on adding the feature to 1.16; we need time to complete
> the change in Calcite and also the upgrade work.
>
> 3. I misunderstood the 'Other Alternatives' part as the 'Rejected' ones in
> the FLIP doc. My point is that we should do our best to avoid the hacky approach.
> The potential issues with Calcite's hint propagation are, e.g., whether join hints
> propagate correctly into the proper join scope, including subqueries or views that
> may contain various SQL operators, so we should check all kinds of operators
> for correct propagation. Hope this helps. Also cc @Shuo Cheng, who may
> offer more help.
>
> Best,
> Lincoln Lee
>
>
> Martijn Visser <mart...@ververica.com> wrote on Wed, Dec 29, 2021 at 22:21:
>
> > Hi Jing,
> >
> > Thanks for explaining this in more detail and also to others
> > participating.
> >
> > > I think using query hints in this case is more natural for users, WDYT?
> >
> > Yes, I agree. As long as we properly explain in our documentation that we
> > support both Query Hints and Table Hints, what the difference between
> > them is and how to use them, I think our users can understand this perfectly.
> >
> > > I admit upgrading from Calcite 1.26 to 1.30 would be a big change.
> > > However we could not always avoid upgrade for the following reason
> >
> > We have to upgrade Calcite. We actually considered putting that in the
> > Flink 1.15 scope but ultimately had to drop it, but I definitely think this
> > needs to be done for 1.16. It's not only because of new features that
> > depend on Calcite upgrades, but also because newer versions have
> > resolved bugs that also hurt our users. That's why we also already have
> > tickets for upgrading to Calcite 1.27 [1] and 1.28 [2].
> >
> > With regards to using `use_hash` as a term, I think the most important part
> > is that if we re-use a term that Oracle is using, the behaviour and
> > outcome should be the same as or comparable to those of (in this case)
> > Oracle. If the behaviour and outcome are not the same or comparable, I
> > would probably introduce our own term to avoid confusing users.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-20873
> > [2] https://issues.apache.org/jira/browse/FLINK-21239
> >
> > On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com> wrote:
> >
> > > Hi Jian gang,
> > > Thanks for the feedback.
> > >
> > > > When it comes to hive, how do you load partial data instead of the
> > > > whole data? Any change related with hive?
> > >
> > > The question is the same as the one Yuan mentioned before.
> > > I prefer to drive another FLIP on this topic for further discussion,
> > > because this point involves many API extensions.
> > > Here I would like to share the implementation in our internal version
> > > first; it may be very different from the final solution that gets merged
> > > into the community.
> > > The core idea is to push the partitioner information down to the lookup
> > > table source.
> > > The Hive connector also needs to be upgraded. When loading data into
> > > caches, each task stores only the records whose lookup keys are sent to
> > > the current task.
> > >
> > > > How to define the cache configuration? For example, the size and the
> > > > ttl.
> > >
> > > I'm afraid there is no unified caching configuration or cache
> > > implementation across the different connectors yet.
> > > You can find the cache size and TTL config for JDBC in doc [1] and for
> > > HBase in doc [2].
> > >
> > > > Will this feature add another shuffle phase compared with the default
> > > > behavior? In what situations will user choose this feature?
> > >
> > > Yes, if the user specifies a hash hint in the query, the optimizer would
> > > prefer to choose Hash Lookup Join, which would add a hash shuffle.
> > > If the lookup table source has a cache inside (for example HBase/JDBC)
> > > and the benefit of increasing the cache hit ratio outweighs the cost of
> > > the extra shuffle, the user could use Hash Lookup Join.
> > >
> > > > For the keys, the default implementation will be ok. But I wonder
> > > > whether we can support more flexible strategies.
> > >
> > > The question is the same as the one Yuan mentioned before.
> > >
> > > I'm afraid there is no plan to support flexible strategies yet because
> > > the feature involves many things, for example:
> > > 1. sql syntax
> > > 2. user defined partitioner API
> > > 3. RelDistribution type extension and Flink RelDistribution extension
> > > 4. FlinkExpandConversionRule
> > > 5. Exchange execNode extension
> > > 6. ....
> > > It needs to be well designed and needs more discussion. If this is a
> > > strong requirement, we would drive another discussion on this point
> > > individually.
> > > In this FLIP, I would first support hash shuffle. WDYT?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
> > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options
> > >
> > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 20:37:
> > >
> > > > Hi Wenlong,
> > > > Thanks for the feedback.
> > > > I've checked similar syntax in other systems; they all differ from
> > > > each other. There seems to be no consensus.
> > > > As mentioned in FLIP-204, Oracle uses a query hint named
> > > > 'use_hash' [1].
> > > > Spark also uses a query hint, named 'SHUFFLE_HASH' [2].
> > > > SQL Server uses the keyword 'HASH' instead of a query hint [3].
> > > > Note that the purposes of hash shuffle in [1][2][3] are a little
> > > > different from the purpose of FLIP-204; we are only discussing syntax here.
> > > >
> > > > I've added this part to the FLIP, pending further discussion.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
> > > > [2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
> > > > [3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
> > > >
> > > >
> > > > wenlong.lwl <wenlong88....@gmail.com> wrote on Wed, Dec 29, 2021 at 17:18:
> > > >
> > > >> Hi, Jing, thanks for driving the discussion.
> > > >>
> > > >> Have you investigated the syntax of join hints?
> > > >> Why did you choose USE_HASH from Oracle instead of Spark's
> > > >> SHUFFLE_HASH style? They are quite different.
> > > >> People in the big data world may be more familiar with Spark/Hive; if we
> > > >> need to choose one, personally, I prefer the Spark style.
> > > >>
> > > >>
> > > >> Best,
> > > >> Wenlong
> > > >>
> > > >> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
> > > >>
> > > >> > Hi Jing,
> > > >> > Thanks for your detailed reply.
> > > >> > 1) In my last suggestion, hashing by primary key was not meant for raising
> > > >> > the cache hit ratio, but for handling skew of the left source. Now that you
> > > >> > have the 'skew' hint and other discussion about it, I'm looking forward to it.
> > > >> > 2) I mean supporting a user-defined partitioner function. We have a case
> > > >> > of joining a data lake source with a special partitioning scheme, and have
> > > >> > implemented it, not elegantly, in our internal version. As you said, it
> > > >> > needs more design.
> > > >> > 3) I think the so-called 'HashPartitionedCache' is useful; otherwise
> > > >> > loading all data, as the Hive lookup table source does, is almost unusable
> > > >> > with big data.
> > > >> >
> > > >> > Best regards,
> > > >> > Yuan
> > > >> >
> > > >> > At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> > > >> > >Hi, Lincoln
> > > >> > >Thanks a lot for the feedback.
> > > >> > >
> > > >> > >> Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > > >> > >> Things are a little different from RDBMS in the distributed world, and we
> > > >> > >> also aim to solve the data skew problem, so all these incoming hints names
> > > >> > >> should be considered together.
> > > >> > >
> > > >> > >About the skew problem, I will discuss it in the next FLIP separately. I
> > > >> > >would like to share the hint proposal for skew here.
> > > >> > >I want to introduce a 'skew' hint, which is a query hint similar to the skew
> > > >> > >hint in Spark [1] and MaxCompute [2].
> > > >> > >The 'skew' hint could contain only the name of the table with skew.
> > > >> > >Besides, the skew hint could accept a table name and column names.
> > > >> > >In addition, the skew hint could accept a table name, column names and skew
> > > >> > >values.
> > > >> > >For example:
> > > >> > >
> > > >> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > > >> > >o.total, c.country, c.zip
> > > >> > >FROM Orders AS o
> > > >> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > >> > >ON o.customer_id = c.id;
> > > >> > >
> > > >> > >The 'skew' hint is not only used for lookup join here, but could also be
> > > >> > >used for other types of join later, for example, batch hash join or
> > > >> > >streaming regular join.
> > > >> > >Going back to the naming problem for hash lookup join: since the 'skew' hint
> > > >> > >is a separate hint, 'use_hash' is still an alternative.
> > > >> > >WDYT?
> > > >> > >I don't have a good idea for a better hint name yet. I would like to
> > > >> > >hear more suggestions about hint names.
> > > >> > >
> > > >> > >> As you mentioned in the flip, this solution depends on future changes to
> > > >> > >> calcite (and also upgrading calcite would be another possible big change:
> > > >> > >> at least calcite-1.30 vs 1.26, are we preparing to accept this big
> > > >> > >> change?).
> > > >> > >
> > > >> > >Indeed, solution 1 depends on the Calcite upgrade.
> > > >> > >I admit upgrading from Calcite 1.26 to 1.30 would be a big change. I still
> > > >> > >remember what we suffered during the last upgrade to Calcite 1.26.
> > > >> > >However, we could not always avoid the upgrade, for the following reasons:
> > > >> > >1. Other features also depend on the Calcite upgrade, for example
> > > >> > >Session Window and Count Window.
> > > >> > >2. If we keep avoiding the Calcite upgrade, the gap with the latest
> > > >> > >version will grow. If one day upgrading becomes something that has to be
> > > >> > >done, the pain will be greater.
> > > >> > >
> > > >> > >WDYT?
> > > >> > >
> > > >> > >> Is there another possible way to minimize the change in calcite?
> > > >> > >
> > > >> > >Did you check the 'Other Alternatives' part of FLIP-204? It gives
> > > >> > >another solution, which does not depend on the Calcite upgrade and does not
> > > >> > >need to worry about the hint being lost during propagation.
> > > >> > >This is also what we have done in our internal version.
> > > >> > >The core idea is propagating the 'use_hash' hint to the TableScan with
> > > >> > >matching table names. However, it is a little hacky.
> > > >> > >
> > > >> > >> As I know there're more limitations than `Correlate`.
> > > >> > >
> > > >> > >As mentioned before, in our internal version, I chose the approach in the
> > > >> > >'Other Alternatives' part of FLIP-204.
> > > >> > >Although I did a POC of solution 1 and listed all the changes I found in
> > > >> > >the FLIP, there may still be something I missed.
> > > >> > >I'm very happy to hear you point out that there are more limitations besides
> > > >> > >`Correlate`; would you please give more details on this part?
> > > >> > >
> > > >> > >Best,
> > > >> > >Jing Zhang
> > > >> > >
> > > >> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> > > >> > >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> > > >> > >
> > > >> > >Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 14:40:
> > > >> > >
> > > >> > >> Hi Yuan and Lincoln,
> > > >> > >> thanks a lot for the attention. I will answer the emails one by one.
> > > >> > >>
> > > >> > >> To Yuan
> > > >> > >> > How shall we deal with CDC data? If there is CDC data in the pipeline,
> > > >> > >> > IMHO, shuffle by join key will cause CDC data disorder. Will it be better
> > > >> > >> > to use primary key in this case?
> > > >> > >>
> > > >> > >> Good question.
> > > >> > >> The problem exists not only for CDC data sources, but also when
> > > >> > >> the input stream is not an insert-only stream (for example, the result of
> > > >> > >> an unbounded aggregate or regular join).
> > > >> > >> I think hashing by primary key is not a good choice. It could not raise
> > > >> > >> the cache hit ratio because the cache key is the lookup key instead of the
> > > >> > >> primary key of the input.
> > > >> > >>
> > > >> > >> To avoid wrong results, hash lookup join requires that the input stream
> > > >> > >> be an insert-only stream or that its upsert keys contain the lookup keys.
> > > >> > >>
> > > >> > >> I've added this limitation to the FLIP, thanks a lot for the reminder.
> > > >> > >>
> > > >> > >> > If the shuffle keys can be customized when users have the knowledge
> > > >> > >> > about distribution of data?
> > > >> > >>
> > > >> > >> I'm not sure I understand your question.
> > > >> > >>
> > > >> > >> Do you mean supporting a user-defined partitioner function on keys, just
> > > >> > >> like flink DataStream sql?
> > > >> > >> If yes, I'm afraid there is no plan to support this feature yet because
> > > >> > >> the feature involves many things, for example:
> > > >> > >> 1. sql syntax
> > > >> > >> 2. user defined partitioner API
> > > >> > >> 3. RelDistribution type extension and Flink RelDistribution extension
> > > >> > >> 4. FlinkExpandConversionRule
> > > >> > >> 5. Exchange execNode extension
> > > >> > >> 6. ....
> > > >> > >> It needs to be well designed and needs more discussion. If this is a strong
> > > >> > >> requirement, we would drive another discussion on this point individually.
> > > >> > >> In this FLIP, I would first support hash shuffle. WDYT?
> > > >> > >>
> > > >> > >> Or do you mean supporting hashing by other keys instead of the lookup key?
> > > >> > >> If yes, would you please tell me a specific use case?
> > > >> > >> We need to fetch the record from the external storage of the dimension
> > > >> > >> table by lookup key, so those dimension table sources use lookup keys as
> > > >> > >> cache keys.
> > > >> > >> We can only increase the cache hit ratio by shuffling on lookup keys.
> > > >> > >> I need more use cases to understand this requirement.
> > > >> > >>
> > > >> > >> > Some connectors such as hive, caches all data in LookupFunction. How to
> > > >> > >> > decrease the valid cache data size if data can be shuffled?
> > > >> > >>
> > > >> > >> Very good idea.
> > > >> > >> There are two types of cache.
> > > >> > >> For key-value storage, such as Redis/HBase, the lookup table source
> > > >> > >> stores the visited lookup keys and their records in the cache lazily.
> > > >> > >> For other storage without keys, such as Hive, each task loads all data
> > > >> > >> into the cache eagerly in the initialization phase.
> > > >> > >> After introducing the hash partitioner, nothing needs to change for
> > > >> > >> key-value storage; for Hive, each task could load only part of the data
> > > >> > >> into the cache instead of loading all of it.
> > > >> > >>
> > > >> > >> We have implemented this optimization in our internal version.
> > > >> > >> The core idea is to push the partitioner information down to the lookup
> > > >> > >> table source. When loading data into caches, each task stores only those
> > > >> > >> records whose lookup keys are sent to the current task.
> > > >> > >> We call this 'HashPartitionedCache'.
> > > >> > >>
> > > >> > >> I have added this point to the Lookup Join requirements list in the
> > > >> > >> motivation of the FLIP, but I would not address it in this FLIP right
> > > >> > >> now.
> > > >> > >> If this is a strong requirement, we need to drive another discussion on
> > > >> > >> this topic individually because this point involves many API extensions.
> > > >> > >>
> > > >> > >> Best,
> > > >> > >> Jing Zhang
> > > >> > >>
> > > >> > >>
> > > >> > >> Lincoln Lee <lincoln.8...@gmail.com> wrote on Wed, Dec 29, 2021 at 10:01:
> > > >> > >>
> > > >> > >>> Hi Jing,
> > > >> > >>> Thanks for bringing up this discussion! Agree that these join hints
> > > >> > >>> should benefit both bounded and unbounded cases, as Martijn mentioned.
> > > >> > >>> I also agree that implementing the query hint is the right way for a more
> > > >> > >>> general purpose, since dynamic table options have a limited scope.
> > > >> > >>> Some points I'd like to share are:
> > > >> > >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > > >> > >>> Things are a little different from RDBMS in the distributed world, and we
> > > >> > >>> also aim to solve the data skew problem, so all these incoming hints names
> > > >> > >>> should be considered together.
> > > >> > >>> 2. As you mentioned in the flip, this solution depends on future changes to
> > > >> > >>> calcite (and also upgrading calcite would be another possible big change:
> > > >> > >>> at least calcite-1.30 vs 1.26, are we preparing to accept this big
> > > >> > >>> change?). Is there another possible way to minimize the change in calcite?
> > > >> > >>> As I know there're more limitations than `Correlate`.
> > > >> > >>>
> > > >> > >>> Best,
> > > >> > >>> Lincoln Lee
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Jing Zhang <beyond1...@gmail.com> wrote on Tue, Dec 28, 2021 at 23:04:
> > > >> > >>>
> > > >> > >>> > Hi Martijn,
> > > >> > >>> > Thanks a lot for your attention.
> > > >> > >>> > I'm sorry I didn't explain the motivation clearly. I would like to explain
> > > >> > >>> > it in detail, and then respond to your questions.
> > > >> > >>> > A lookup join is typically used to enrich a table with data that is queried
> > > >> > >>> > from an external system. Many lookup table sources introduce a cache in
> > > >> > >>> > order to reduce RPC calls, such as the JDBC, CSV, and HBase connectors.
> > > >> > >>> > For those connectors, we could raise the cache hit ratio by routing the same
> > > >> > >>> > lookup keys to the same task instance. This is the purpose of
> > > >> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > > >> > >>> > .
> > > >> > >>> > Other cases might benefit from hash distribution, such as batch hash join,
> > > >> > >>> > as you mentioned. It is a cool idea; however, it is not the purpose of this
> > > >> > >>> > FLIP. We could discuss it in FLINK-20670
> > > >> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
> > > >> > >>> >
> > > >> > >>> > > - When I was reading about this topic [1] I was wondering if this feature
> > > >> > >>> > > would be more beneficial for bounded use cases and not so much for
> > > >> > >>> > > unbounded use cases. What do you think?
> > > >> > >>> >
> > > >> > >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
> > > >> > >>> > cache hit ratio, which is different from Oracle Hash Join. However, we could
> > > >> > >>> > use similar hint syntax.
> > > >> > >>> >
> > > >> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> > > >> > >>> > > notice that all of the hints there are located at the end of the SQL
> > > >> > >>> > > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> > > >> > >>> > > keyword. Can we somehow make this consistent for the user? Or should the
> > > >> > >>> > > user be able to specify hints anywhere in its SQL statement?
> > > >> > >>> >
> > > >> > >>> > Calcite supports hints in two locations [3]:
> > > >> > >>> > Query Hint: right after the SELECT keyword;
> > > >> > >>> > Table Hint: right after the referenced table name.
> > > >> > >>> > Flink already supports dynamic table options based on Calcite's hint
> > > >> > >>> > framework, as mentioned in doc [2].
> > > >> > >>> > Besides, query hints are also important: they can hint the
> > > >> > >>> > optimizer to choose a better plan. Almost all popular databases and
> > > >> > >>> > big-data engines support SQL query hints, such as Oracle, Hive, Spark and
> > > >> > >>> > so on.
> > > >> > >>> > I think using query hints in this case is more natural for users, WDYT?
> > > >> > >>> >
> > > >> > >>> > I have updated the motivation part in the FLIP.
> > > >> > >>> > Thanks for the feedback!
> > > >> > >>> >
> > > >> > >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > > >> > >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > > >> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> > > >> > >>> >
> > > >> > >>> > Best,
> > > >> > >>> > Jing Zhang
> > > >> > >>> >
> > > >> > >>> > Martijn Visser <mart...@ververica.com> wrote on Tue, Dec 28, 2021 at 22:02:
> > > >> > >>> >
> > > >> > >>> > > Hi Jing,
> > > >> > >>> > >
> > > >> > >>> > > Thanks a lot for the explanation and the FLIP. I definitely learned
> > > >> > >>> > > something when reading more about `use_hash`. My interpretation would be
> > > >> > >>> > > that the primary benefit of a hash lookup join would be improved
> > > >> > >>> > > performance by allowing the user to explicitly optimise the planner.
> > > >> > >>> > >
> > > >> > >>> > > I have a couple of questions:
> > > >> > >>> > >
> > > >> > >>> > > - When I was reading about this topic [1] I was wondering if this feature
> > > >> > >>> > > would be more beneficial for bounded use cases and not so much for
> > > >> > >>> > > unbounded use cases. What do you think?
> > > >> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> > > >> > >>> > > notice that all of the hints there are located at the end of the SQL
> > > >> > >>> > > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> > > >> > >>> > > keyword. Can we somehow make this consistent for the user? Or should the
> > > >> > >>> > > user be able to specify hints anywhere in its SQL statement?
> > > >> > >>> > >
> > > >> > >>> > > Best regards,
> > > >> > >>> > >
> > > >> > >>> > > Martijn
> > > >> > >>> > >
> > > >> > >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > > >> > >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > > >> > >>> > >
> > > >> > >>> > >
> > > >> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> > > >> > >>> > >
> > > >> > >>> > > > Hi everyone,
> > > >> > >>> > > > Look up join
> > > >> > >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1]
> > > >> > >>> > > > is a commonly used feature in Flink SQL. We have received many
> > > >> > >>> > > > optimization requirements on lookup join. For example:
> > > >> > >>> > > > 1. Enforce a hash partitioner on the left side of the lookup join to
> > > >> > >>> > > > raise the cache hit ratio
> > > >> > >>> > > > 2. Solve the data skew problem after introducing hash lookup join
> > > >> > >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
> > > >> > >>> > > >
> > > >> > >>> > > > Next we will solve these problems one by one. First, we would focus on
> > > >> > >>> > > > point 1, and continue to discuss points 2 and 3 later.
> > > >> > >>> > > >
> > > >> > >>> > > > There are many similar requirements from the user mailing list and JIRA
> > > >> > >>> > > > about hash lookup join, for example:
> > > >> > >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
> > > >> > >>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to hash
> > > >> > >>> > > > shuffle by lookup keys
> > > >> > >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
> > > >> > >>> > > > lookupjoin source table for pre-partitioning
> > > >> > >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
> > > >> > >>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner way for
> > > >> > >>> > > > SQL.
> > > >> > >>> > > >
> > > >> > >>> > > > In this FLIP, I would like to start a discussion about Hash Lookup Join.
> > > >> > >>> > > > The core idea is introducing a 'USE_HASH' hint in the query.
> > > >> > >>> > > > This syntax is directly user-oriented and therefore requires careful
> > > >> > >>> > > > design.
> > > >> > >>> > > > There are two ways to propagate this hint to LookupJoin in the
> > > >> > >>> > > > optimizer. We need further discussion to make the final decision. Anyway,
> > > >> > >>> > > > the difference between the two solutions is only in the internal
> > > >> > >>> > > > implementation and has no impact on the user.
> > > >> > >>> > > >
> > > >> > >>> > > > For more detail on the proposal:
> > > >> > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > > >> > >>> > > >
> > > >> > >>> > > >
> > > >> > >>> > > > Looking forward to your feedback, thanks.
> > > >> > >>> > > >
> > > >> > >>> > > > Best,
> > > >> > >>> > > > Jing Zhang
> > > >> > >>> > > >
> > > >> > >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join