Hi Jing,

Glad to hear we have agreement on the hint syntax. Let's keep going!
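As a side note on why the hint matters: the rationale quoted further down this thread is that routing the same lookup keys to the same task raises the cache hit ratio of connectors that have a cache. Below is a toy Python simulation of that effect, not Flink code. Everything in it is an assumption made for illustration: the `LruCache` and `hit_ratio` helpers, round-robin routing standing in for the plan without the hint, `key % parallelism` standing in for Flink's hash partitioner, and the sizes (400 keys, 4 tasks, 100 cache entries per task).

```python
import random
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache standing in for a connector-side lookup cache (hypothetical)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def lookup(self, key):
        """Return True on a hit; on a miss, load the key and evict the oldest entry if full."""
        if key in self.entries:
            self.entries.move_to_end(key)
            return True
        self.entries[key] = True
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)
        return False


def hit_ratio(keys, parallelism, capacity, route):
    """Send each record to the task chosen by route(index, key) and count cache hits."""
    caches = [LruCache(capacity) for _ in range(parallelism)]
    hits = sum(
        caches[route(i, key) % parallelism].lookup(key)
        for i, key in enumerate(keys)
    )
    return hits / len(keys)


rng = random.Random(42)
keys = [rng.randrange(400) for _ in range(20_000)]

# Assumed stand-in for "no hint": records reach lookup tasks regardless of their key.
no_shuffle = hit_ratio(keys, parallelism=4, capacity=100, route=lambda i, key: i)
# Assumed stand-in for the hash shuffle: the same key always reaches the same task.
hash_shuffle = hit_ratio(keys, parallelism=4, capacity=100, route=lambda i, key: key)

print(f"no shuffle: {no_shuffle:.2f}, hash shuffle: {hash_shuffle:.2f}")
```

In this setup each task's cache can hold its whole share of the keys once they are hash-routed, so almost every lookup after warm-up is a hit, while with key-agnostic routing every task sees all 400 keys and its 100-entry cache keeps evicting. The cost of the extra shuffle that Jing mentions is not modelled here.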
Best,
Lincoln Lee

Jing Zhang <beyond1...@gmail.com> wrote on Thu, Jan 20, 2022 at 16:52:

> Hi Jingsong,
> Thanks for the feedback.
>
> > Is there a conclusion about naming here? (Maybe I missed something?)
> > Use USE_HASH or some other names. Slightly confusing in the FLIP.
>
> 'SHUFFLE_HASH' is the final hint name; 'USE_HASH' is rejected. I've updated the FLIP.
>
> > And the problem of what to write inside the hint, as mentioned by Lincoln.
>
> I agree with Lincoln to include only the 'build' side table name.
> Besides, Lookup Join only supports the dimension table as the build table; it does
> not support the left input as the build table, because a lookup join is always
> triggered by the left side.
>
> > I think maybe we can list the grammars of other distributed systems,
> > like Hive Spark(Databricks) Snowflake?
>
> I've added the grammars of other systems (Oracle, Spark, Impala, SQL Server) to the FLIP.
>
> [1] Oracle USE_HASH hint
> <https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683>
> SELECT /*+ USE_HASH(l h) */ *
> FROM orders h, order_items l
> WHERE l.order_id = h.order_id
> AND l.order_id > 3500;
>
> [2] Spark SHUFFLE_HASH hint
> <https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-hints.html>
> SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
>
> [3] Impala SHUFFLE hint
> <https://impala.apache.org/docs/build/html/topics/impala_hints.html>
> SELECT straight_join weather.wind_velocity, geospatial.altitude
> FROM weather JOIN /* +SHUFFLE */ geospatial
> ON weather.lat = geospatial.lat AND weather.long = geospatial.long;
>
> [4] SQL Server HASH keyword
> <https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15>
> SELECT p.Name, pr.ProductReviewID
> FROM Production.Product AS p
> LEFT OUTER HASH JOIN Production.ProductReview AS pr
> ON p.ProductID = pr.ProductID
> ORDER BY ProductReviewID DESC;
>
> Hive does not have a similar grammar because shuffle join is Hive's default join
> behavior. It only has a map join hint grammar.
> I haven't found a similar query hint in Snowflake yet.
>
> > About `SHUFFLE_HASH(left_table, right_table)`, one case can be shared:
> >
> > SELECT * FROM left_t
> > JOIN right_1 ON ...
> > JOIN right_2 ON ...
> > JOIN right_3 ON ...
> >
> > What if we want to use shuffle_hash for all three joins?
> >
> > SELECT /*+ SHUFFLE_HASH('left_t', 'right_1', 'right_2', 'right_3') */ ?
> >
> > It does not work, because the left input of the second join is not
> > 'left_t' anymore. It is the output of the first join.
>
> Good point.
> As mentioned before, the SHUFFLE_HASH hint now only requires the build table name.
> So in the above case:
>
> SELECT /*+ SHUFFLE_HASH('right_1', 'right_2', 'right_3') */ *
> FROM left_t
> JOIN right_1 ON ...
> JOIN right_2 ON ...
> JOIN right_3 ON ...
>
> It means a shuffle is required on every lookup join whose dimension table is
> named 'right_1', 'right_2', or 'right_3'.
>
> WDYT?
>
> Best,
> Jing Zhang
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jan 20, 2022 at 14:33:
>
> > Hi Jing,
> >
> > Sorry for the late reply!
> >
> > Is there a conclusion about naming here? (Maybe I missed something?)
> > Use USE_HASH or some other names. Slightly confusing in the FLIP.
> >
> > And the problem of what to write inside the hint, as mentioned by Lincoln.
> >
> > I think maybe we can list the grammars of other distributed systems,
> > like Hive Spark(Databricks) Snowflake?
> >
> > Best,
> > Jingsong
> >
> > On Thu, Jan 20, 2022 at 1:56 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >
> > > Hi, Jing,
> > > Sorry for the late reply! The previous discussion of the hint syntax
> > > left one minor difference: whether to use the table names of both join sides
> > > or only the 'build' side table name. I would prefer the latter.
> > > Users only need to pass the `build` side table (usually the smaller one)
> > > into the `SHUFFLE_HASH(build_table)` join hint, which is more concise than
> > > `SHUFFLE_HASH(left_table, right_table)`. WDYT?
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > > Jing Zhang <beyond1...@gmail.com> wrote on Sat, Jan 15, 2022 at 17:22:
> > >
> > > > Hi all,
> > > > Thanks for all the feedback so far.
> > > > If there are no more suggestions, I would like to start a vote on
> > > > Tuesday next week (18 Jan).
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Jan 5, 2022 at 11:33:
> > > >
> > > > > Hi Francesco,
> > > > > Thanks a lot for the feedback.
> > > > >
> > > > > > does it make sense for a lookup join to use hash distribution whenever
> > > > > > possible by default?
> > > > >
> > > > > I prefer to enable the hash lookup join only when the hint is found in the
> > > > > query, for the following reasons:
> > > > > 1. Plan compatibility
> > > > > Adding a hash shuffle by default would change the plan after
> > > > > users upgrade the Flink version.
> > > > > Besides, lookup join is a commonly used feature in Flink SQL.
> > > > > 2. Not all Flink jobs could benefit from this improvement.
> > > > > It is a trade-off for lookup joins with dimension connectors that
> > > > > have a cache inside.
> > > > > We hope to raise the cache hit ratio with Hash Lookup Join; however, it
> > > > > would introduce an extra shuffle at the same time.
> > > > > It is not always a positive optimization, especially for
> > > > > connectors that do not have a cache inside.
> > > > >
> > > > > > Shouldn't the hint take the table alias as the "table name"? What if
> > > > > > you do two lookup joins in cascade within the same query with the same
> > > > > > table (once on a key, then on another one), where you use two different
> > > > > > aliases for the table?
> > > > > In theory, it's better to support both table names and alias names.
> > > > > But in Calcite, the alias name of a subquery or table is lost in
> > > > > the SQL conversion and SQL optimization phases.
> > > > > So here we only support table names.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > Francesco Guardiani <france...@ververica.com> wrote on Mon, Jan 3, 2022 at 18:38:
> > > > >
> > > > >> Hi Jing,
> > > > >>
> > > > >> Thanks for the FLIP. I'm not very knowledgeable about the topic, but
> > > > >> going through both the FLIP and the discussion here, I wonder: does it
> > > > >> make sense for a lookup join to use hash distribution whenever possible
> > > > >> by default?
> > > > >>
> > > > >> The point you're explaining here:
> > > > >>
> > > > >> > Many Lookup table sources introduce cache in order
> > > > >> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> > > > >> > For those connectors, we could raise cache hit ratio by routing the
> > > > >> > same lookup keys to the same task instance
> > > > >>
> > > > >> seems like something we can infer automatically, rather than manually
> > > > >> asking the user to add this hint to the query. Note that I'm not arguing
> > > > >> against the hint syntax, which might still make sense to introduce, but I
> > > > >> feel like this optimization makes sense in the general case when using the
> > > > >> connectors you have quoted. Perhaps there is some downside I'm not aware of?
> > > > >>
> > > > >> Talking about the hints themselves, taking this example as reference:
> > > > >>
> > > > >> SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total,
> > > > >> c.country, c.zip
> > > > >> FROM Orders AS o
> > > > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > >> ON o.customer_id = c.id;
> > > > >>
> > > > >> Shouldn't the hint take the table alias as the "table name"? What if you
> > > > >> do two lookup joins in cascade within the same query with the same table
> > > > >> (once on a key, then on another one), where you use two different aliases
> > > > >> for the table?
> > > > >>
> > > > >> On Fri, Dec 31, 2021 at 9:56 AM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >>
> > > > >> > Hi Lincoln,
> > > > >> > Thanks for the feedback.
> > > > >> >
> > > > >> > > 1. For the hint name, +1 for WenLong's proposal.
> > > > >> >
> > > > >> > I've added 'SHUFFLE_HASH' to the other alternatives in the FLIP. Let's
> > > > >> > wait for more voices here.
> > > > >> >
> > > > >> > > Regarding the `SKEW` hint, agree with you that it can be used widely, and
> > > > >> > > I prefer to treat it as a metadata hint, a new category that differs
> > > > >> > > from a join hint.
> > > > >> > > For your example:
> > > > >> > > ```
> > > > >> > > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > > > >> > > o.total, c.country, c.zip
> > > > >> > > FROM Orders AS o
> > > > >> > > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > >> > > ON o.customer_id = c.id;
> > > > >> > > ```
> > > > >> > > I would prefer another form:
> > > > >> > > ```
> > > > >> > > -- provide the skew info to let the engine choose the optimal plan
> > > > >> > > SELECT /*+ SKEW('Orders') */ o.order_id, ...
> > > > >> > >
> > > > >> > > -- or introduce a new hint for the join case, e.g.,
> > > > >> > > SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> > > > >> > > ```
> > > > >> >
> > > > >> > Maybe there is a misunderstanding here.
> > > > >> > I was just using syntactic sugar.
> > > > >> >
> > > > >> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > > > >> > ....
> > > > >> >
> > > > >> > is just shorthand for
> > > > >> >
> > > > >> > SELECT /*+ USE_HASH('Orders', 'Customers') */ /*+ SKEW('Orders') */
> > > > >> > o.order_id,
> > > > >> > ....
> > > > >> >
> > > > >> > Although I list the 'USE_HASH' and 'SKEW' hints in one query hint clause, it
> > > > >> > does not mean they must appear together as a whole.
> > > > >> > Based on the Calcite syntax doc [1], you can list more than one hint in
> > > > >> > a '/*+' hint [, hint ]* '*/' clause.
> > > > >> >
> > > > >> > Each hint has a different function.
> > > > >> > The 'USE_HASH' hint suggests that the optimizer use a hash partitioner for
> > > > >> > the Lookup Join of table 'Orders' and table 'Customers', while the 'SKEW'
> > > > >> > hint tells the optimizer the skew metadata of the table 'Orders'.
> > > > >> >
> > > > >> > Best,
> > > > >> > Jing Zhang
> > > > >> >
> > > > >> > [1] https://calcite.apache.org/docs/reference.html#sql-hints
> > > > >> >
> > > > >> > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Dec 31, 2021 at 16:39:
> > > > >> >
> > > > >> > > Hi Martijn,
> > > > >> > > Thanks for the feedback.
> > > > >> > >
> > > > >> > > Glad to hear that we reached a consensus on the first and second
> > > > >> > > points.
> > > > >> > >
> > > > >> > > About whether to use `use_hash` as a term, I think your concern
> > > > >> > > makes sense.
> > > > >> > > Although the hash lookup join is similar to the hash join in Oracle in that
> > > > >> > > both require hash distribution on the input, there is a small difference
> > > > >> > > between them.
> > > > >> > > On this point, Lincoln and WenLong both prefer the term 'SHUFFLE_HASH'.
> > > > >> > > WDYT?
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Jing Zhang
> > > > >> > >
> > > > >> > > Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Dec 30, 2021 at 11:21:
> > > > >> > >
> > > > >> > >> Hi Jing,
> > > > >> > >> Thanks for your explanation!
> > > > >> > >>
> > > > >> > >> 1. For the hint name, +1 for WenLong's proposal. I think the `SHUFFLE`
> > > > >> > >> keyword is important: in a classic distributed computing system,
> > > > >> > >> a hash join usually means there's a shuffle stage (including shuffle
> > > > >> > >> hash join and broadcast hash join). Users only need to pass the `build`
> > > > >> > >> side table (usually the smaller one) into the `SHUFFLE_HASH` join hint,
> > > > >> > >> which is more concise than `USE_HASH(left_table, right_table)`. Please
> > > > >> > >> correct me if my understanding is wrong.
> > > > >> > >> Regarding the `SKEW` hint, I agree with you that it can be used widely,
> > > > >> > >> and I prefer to treat it as a metadata hint, a new category that differs
> > > > >> > >> from a join hint.
> > > > >> > >> For your example:
> > > > >> > >> ```
> > > > >> > >> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > > > >> > >> o.total, c.country, c.zip
> > > > >> > >> FROM Orders AS o
> > > > >> > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > >> > >> ON o.customer_id = c.id;
> > > > >> > >> ```
> > > > >> > >> I would prefer another form:
> > > > >> > >> ```
> > > > >> > >> -- provide the skew info to let the engine choose the optimal plan
> > > > >> > >> SELECT /*+ SKEW('Orders') */ o.order_id, ...
> > > > >> > >>
> > > > >> > >> -- or introduce a new hint for the join case, e.g.,
> > > > >> > >> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> > > > >> > >> ```
> > > > >> > >>
> > > > >> > >> 2. Agree with Martijn on adding the feature to 1.16; we need time to
> > > > >> > >> complete the change in Calcite and also the upgrade work.
> > > > >> > >>
> > > > >> > >> 3. I misunderstood the 'Other Alternatives' part as the 'Rejected' ones in
> > > > >> > >> the FLIP doc. My point is to avoid the hacky way as far as we can.
> > > > >> > >> There are potential issues with Calcite's hint propagation: for example, join
> > > > >> > >> hints must propagate correctly into the proper join scope, including subqueries
> > > > >> > >> or views that may contain various SQL operators, so we should check all kinds
> > > > >> > >> of operators for correct propagation. Hope this helps. Also cc @Shuo Cheng,
> > > > >> > >> who may offer more help.
> > > > >> > >>
> > > > >> > >> Best,
> > > > >> > >> Lincoln Lee
> > > > >> > >>
> > > > >> > >> Martijn Visser <mart...@ververica.com> wrote on Wed, Dec 29, 2021 at 22:21:
> > > > >> > >>
> > > > >> > >> > Hi Jing,
> > > > >> > >> >
> > > > >> > >> > Thanks for explaining this in more detail, and thanks also to the others
> > > > >> > >> > participating.
> > > > >> > >> >
> > > > >> > >> > > I think using query hints in this case is more natural for users, WDYT?
> > > > >> > >> >
> > > > >> > >> > Yes, I agree. As long as we properly explain in our documentation that we
> > > > >> > >> > support both Query Hints and Table Hints, what the difference between
> > > > >> > >> > them is, and how to use them, I think our users can understand this perfectly.
> > > > >> > >> >
> > > > >> > >> > > I admit upgrading from Calcite 1.26 to 1.30 would be a big change.
> > > > >> > >> > > However we could not always avoid upgrade for the following reason
> > > > >> > >> >
> > > > >> > >> > We have to upgrade Calcite. We actually considered putting that in the
> > > > >> > >> > Flink 1.15 scope but ultimately had to drop it; I definitely think this
> > > > >> > >> > needs to be done for 1.16. It's not only because of new features that
> > > > >> > >> > depend on Calcite upgrades, but also because newer versions have
> > > > >> > >> > resolved bugs that also hurt our users. That's why we already have
> > > > >> > >> > tickets for upgrading to Calcite 1.27 [1] and 1.28 [2].
> > > > >> > >> >
> > > > >> > >> > With regard to using `use_hash` as a term, I think the most important
> > > > >> > >> > part is that if we reuse a term that Oracle is using, the behaviour and
> > > > >> > >> > outcome should be the same as, or comparable to, Oracle's. If the
> > > > >> > >> > behaviour and outcome are not the same or comparable, I
> > > > >> > >> > would probably introduce our own term to avoid confusing users.
> > > > >> > >> >
> > > > >> > >> > Best regards,
> > > > >> > >> >
> > > > >> > >> > Martijn
> > > > >> > >> >
> > > > >> > >> > [1] https://issues.apache.org/jira/browse/FLINK-20873
> > > > >> > >> > [2] https://issues.apache.org/jira/browse/FLINK-21239
> > > > >> > >> >
> > > > >> > >> > On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >> > >> >
> > > > >> > >> > > Hi Jian gang,
> > > > >> > >> > > Thanks for the feedback.
> > > > >> > >> > >
> > > > >> > >> > > > When it comes to hive, how do you load partial data instead of the
> > > > >> > >> > > > whole data? Any change related with hive?
> > > > >> > >> > >
> > > > >> > >> > > The question is the same as the one Yuan raised before.
> > > > >> > >> > > I prefer to drive another FLIP on this topic and discuss it
> > > > >> > >> > > separately, because this point involves many API extensions.
> > > > >> > >> > > Here I would like to first share the implementation in our internal
> > > > >> > >> > > version; it may be very different from the final solution that gets merged
> > > > >> > >> > > into the community.
> > > > >> > >> > > The core idea is to push the partitioner information down to the lookup
> > > > >> > >> > > table source.
> > > > >> > >> > > The Hive connector also needs to be upgraded.
> > > > >> > >> > > When loading data into caches, each task only needs to store the
> > > > >> > >> > > records whose lookup keys are sent to that task.
> > > > >> > >> > >
> > > > >> > >> > > > How to define the cache configuration? For example, the size and the ttl.
> > > > >> > >> > >
> > > > >> > >> > > I'm afraid there is no unified caching configuration or cache
> > > > >> > >> > > implementation across the different connectors yet.
> > > > >> > >> > > You can find the cache size and TTL config of JDBC in doc [1] and of
> > > > >> > >> > > HBase in doc [2].
> > > > >> > >> > >
> > > > >> > >> > > > Will this feature add another shuffle phase compared with the default
> > > > >> > >> > > > behavior? In what situations will user choose this feature?
> > > > >> > >> > >
> > > > >> > >> > > Yes, if the user specifies the hash hint in the query, the optimizer would
> > > > >> > >> > > prefer to choose Hash Lookup Join, which adds a hash shuffle.
> > > > >> > >> > > If the lookup table source has a cache inside (for example HBase/JDBC) and
> > > > >> > >> > > the benefit of a higher cache hit ratio outweighs the cost of the extra
> > > > >> > >> > > shuffle, the user could use Hash Lookup Join.
> > > > >> > >> > >
> > > > >> > >> > > > For the keys, the default implementation will be ok. But I wonder
> > > > >> > >> > > > whether we can support more flexible strategies.
> > > > >> > >> > >
> > > > >> > >> > > The question is the same as the one Yuan raised before.
> > > > >> > >> > >
> > > > >> > >> > > I'm afraid there is no plan to support flexible strategies yet, because
> > > > >> > >> > > the feature involves many things, for example:
> > > > >> > >> > > 1. SQL syntax
> > > > >> > >> > > 2. user-defined partitioner API
> > > > >> > >> > > 3. RelDistribution type extension and Flink RelDistribution extension
> > > > >> > >> > > 4. FlinkExpandConversionRule
> > > > >> > >> > > 5. Exchange execNode extension
> > > > >> > >> > > 6. ....
> > > > >> > >> > > It needs careful design and more discussion. If this is a strong
> > > > >> > >> > > requirement, we would drive another discussion on this point
> > > > >> > >> > > separately.
> > > > >> > >> > > In this FLIP, I would first support hash shuffle. WDYT?
> > > > >> > >> > >
> > > > >> > >> > > Best,
> > > > >> > >> > > Jing Zhang
> > > > >> > >> > >
> > > > >> > >> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
> > > > >> > >> > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options
> > > > >> > >> > >
> > > > >> > >> > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 20:37:
> > > > >> > >> > >
> > > > >> > >> > > > Hi Wenlong,
> > > > >> > >> > > > Thanks for the feedback.
> > > > >> > >> > > > I've checked similar syntax in other systems; they all differ from
> > > > >> > >> > > > each other. There seems to be no consensus.
> > > > >> > >> > > > As mentioned in FLIP-204, Oracle uses a query hint named
> > > > >> > >> > > > 'use_hash' [1].
> > > > >> > >> > > > Spark also uses a query hint; its name is 'SHUFFLE_HASH' [2].
> > > > >> > >> > > > SQL Server uses the keyword 'HASH' instead of a query hint [3].
> > > > >> > >> > > > Note that the purposes of the hash shuffle in [1][2][3] differ a little
> > > > >> > >> > > > from the purpose of FLIP-204; we are only discussing syntax here.
> > > > >> > >> > > >
> > > > >> > >> > > > I've added this part to the FLIP, awaiting further discussion.
> > > > >> > >> > > >
> > > > >> > >> > > > Best,
> > > > >> > >> > > > Jing Zhang
> > > > >> > >> > > >
> > > > >> > >> > > > [1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
> > > > >> > >> > > > [2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
> > > > >> > >> > > > [3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
> > > > >> > >> > > >
> > > > >> > >> > > > wenlong.lwl <wenlong88....@gmail.com> wrote on Wed, Dec 29, 2021 at 17:18:
> > > > >> > >> > > >
> > > > >> > >> > > >> Hi, Jing, thanks for driving the discussion.
> > > > >> > >> > > >>
> > > > >> > >> > > >> Have you investigated the syntax of join hints?
> > > > >> > >> > > >> Why do you choose USE_HASH from Oracle instead of the Spark style,
> > > > >> > >> > > >> SHUFFLE_HASH? They are quite different.
> > > > >> > >> > > >> People in the big data world may be more familiar with Spark/Hive. If
> > > > >> > >> > > >> we need to choose one, personally, I prefer the Spark style.
> > > > >> > >> > > >>
> > > > >> > >> > > >> Best,
> > > > >> > >> > > >> Wenlong
> > > > >> > >> > > >>
> > > > >> > >> > > >> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
> > > > >> > >> > > >>
> > > > >> > >> > > >> > Hi Jing,
> > > > >> > >> > > >> > Thanks for your detailed reply.
> > > > >> > >> > > >> > 1) In my last suggestion, hashing by primary key was not meant to
> > > > >> > >> > > >> > raise the cache hit ratio, but to handle skew in the left source. Now
> > > > >> > >> > > >> > that you have the 'skew' hint and other discussion about it, I'm
> > > > >> > >> > > >> > looking forward to it.
> > > > >> > >> > > >> > 2) I meant supporting a user-defined partitioner function. We have a
> > > > >> > >> > > >> > case of joining a data lake source that is partitioned in a special
> > > > >> > >> > > >> > way, and we have implemented it, not elegantly, in our internal
> > > > >> > >> > > >> > version. As you said, it needs more design.
> > > > >> > >> > > >> > 3) I think the so-called 'HashPartitionedCache' is useful; otherwise
> > > > >> > >> > > >> > loading all the data, as the Hive lookup table source does, is almost
> > > > >> > >> > > >> > unusable with big data.
> > > > >> > >> > > >> >
> > > > >> > >> > > >> > Best regards,
> > > > >> > >> > > >> > Yuan
> > > > >> > >> > > >> >
> > > > >> > >> > > >> > At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> > > > >> > >> > > >> > >Hi, Lincoln
> > > > >> > >> > > >> > >Thanks a lot for the feedback.
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >> Regarding the hint name 'USE_HASH', could we consider more candidates?
> > > > >> > >> > > >> > >> Things are a little different from RDBMS in the distributed world, and
> > > > >> > >> > > >> > >> we also aim to solve the data skew problem, so all these incoming hint
> > > > >> > >> > > >> > >> names should be considered together.
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >About the skew problem, I will discuss it separately in the next FLIP.
> > > > >> > >> > > >> > >I would like to share the hint proposal for skew here.
> > > > >> > >> > > >> > >I want to introduce a 'skew' hint, which is a query hint similar to the
> > > > >> > >> > > >> > >skew hints in Spark [1] and MaxCompute [2].
> > > > >> > >> > > >> > >The 'skew' hint can contain only the name of the table with skew.
> > > > >> > >> > > >> > >Alternatively, the skew hint can accept a table name and column names.
> > > > >> > >> > > >> > >In addition, the skew hint can accept a table name, column names, and
> > > > >> > >> > > >> > >skew values.
> > > > >> > >> > > >> > >For example:
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > > > >> > >> > > >> > >o.total, c.country, c.zip
> > > > >> > >> > > >> > >FROM Orders AS o
> > > > >> > >> > > >> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > >> > >> > > >> > >ON o.customer_id = c.id;
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >The 'skew' hint is not only used for lookup join here; it could also be
> > > > >> > >> > > >> > >used for other types of join later, for example batch hash join or
> > > > >> > >> > > >> > >streaming regular join.
> > > > >> > >> > > >> > >Back to the question of a better name for hash lookup join: since the
> > > > >> > >> > > >> > >'skew' hint is a separate hint, 'use_hash' is still an alternative.
> > > > >> > >> > > >> > >WDYT?
> > > > >> > >> > > >> > >I don't have a good idea for a better hint name yet. I would like to
> > > > >> > >> > > >> > >hear more suggestions about hint names.
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >> As you mentioned in the flip, this solution depends on future changes
> > > > >> > >> > > >> > >> to calcite (and also upgrading calcite would be another possible big
> > > > >> > >> > > >> > >> change: at least calcite-1.30 vs 1.26, are we preparing to accept this
> > > > >> > >> > > >> > >> big change?).
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >Indeed, solution 1 depends on the Calcite upgrade.
> > > > >> > >> > > >> > >I admit upgrading from Calcite 1.26 to 1.30 would be a big change. I still
> > > > >> > >> > > >> > >remember what we suffered during the last upgrade to Calcite 1.26.
> > > > >> > >> > > >> > >However, we cannot avoid upgrading forever, for the following reasons:
> > > > >> > >> > > >> > >1. Other features also depend on the Calcite upgrade, for example
> > > > >> > >> > > >> > >Session Window and Count Window.
> > > > >> > >> > > >> > >2. If we always avoid the Calcite upgrade, the gap with the latest
> > > > >> > >> > > >> > >version keeps growing. One day, when upgrading becomes something that
> > > > >> > >> > > >> > >has to be done, the pain will be greater.
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >WDYT?
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >> Is there another possible way to minimize the change in calcite?
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >Did you check the 'Other Alternatives' part in FLIP-204?
It > > > > >> > >> > gives > > > > >> > >> > > >> > >another solution which does not depend on calcite > > upgrade > > > > >> and > > > > >> > do > > > > >> > >> > not > > > > >> > >> > > >> need > > > > >> > >> > > >> > >to worry about the hint would be missed in the > > > > propagation. > > > > >> > >> > > >> > >This is also what we have done in the internal > > version. > > > > >> > >> > > >> > >The core idea is propagating 'use_hash' hint to > > TableScan > > > > >> with > > > > >> > >> > > matched > > > > >> > >> > > >> > >table names. However, it is a little hacky. > > > > >> > >> > > >> > > > > > > >> > >> > > >> > >> As I know there're more limitations than > > `Correlate`. > > > > >> > >> > > >> > > > > > > >> > >> > > >> > >As mentioned before, in our external version, I > > choose > > > > the > > > > >> the > > > > >> > >> > 'Other > > > > >> > >> > > >> > >Alternatives' part in the FLIP-204. > > > > >> > >> > > >> > >Although I do a POC in the solution 1 and lists all > > > > >> changes I > > > > >> > >> found > > > > >> > >> > > in > > > > >> > >> > > >> the > > > > >> > >> > > >> > >FLIP, there may still be something I missed. > > > > >> > >> > > >> > >I'm very happy to hear that you point out there're > > more > > > > >> > >> limitations > > > > >> > >> > > >> except > > > > >> > >> > > >> > >for `Correlate`, would you please give more details > > on > > > > this > > > > >> > >> part? 
>
> Best,
> Jing Zhang
>
> [1] https://docs.databricks.com/delta/join-performance/skew-join.html
> [2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
>
> Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 14:40:
>
>> Hi Yuan and Lincoln,
>> thanks a lot for the attention. I will answer the emails one by one.
>>
>> To Yuan
>> > How shall we deal with CDC data? If there is CDC data in the pipeline,
>> > IMHO, shuffle by join key will cause CDC data disorder. Will it be
>> > better to use primary key in this case?
>>
>> Good question.
>> The problem does not only exist with CDC data sources, it also exists
>> when the input stream is not an insert-only stream (for example, the
>> result of an unbounded aggregate or a regular join).
>> I think hashing by primary key is not a good choice.
>> It would not raise the cache hit ratio, because the cache key is the
>> lookup key instead of the primary key of the input.
>>
>> To avoid wrong results, hash lookup join requires that the input stream
>> is an insert-only stream or that its upsert keys contain the lookup keys.
>>
>> I've added this limitation to the FLIP, thanks a lot for the reminder.
>>
>> > If the shuffle keys can be customized when users have the knowledge
>> > about distribution of data?
>>
>> I'm not sure I understand your question.
>>
>> Do you mean to support a user defined partitioner function on keys just
>> like flink DataStream sql?
>> If yes, I'm afraid there is no plan to support this feature yet because
>> the feature involves many things, for example:
>> 1. sql syntax
>> 2. user defined partitioner API
>> 3. RelDistribution type extension and Flink RelDistribution extension
>> 4. FlinkExpandConversionRule
>> 5. Exchange execNode extension
>> 6. ....
>> It needs a careful design and more discussion.
>> If this is a strong requirement, we would drive another discussion on
>> this point individually.
>> In this FLIP, I would first support hash shuffle. WDYT?
>>
>> Or do you mean to support hashing by other keys instead of the lookup key?
>> If yes, would you please tell me a specific use case?
>> We need to fetch the record from the external storage of the dimension
>> table by lookup key, so those dimension table sources use lookup keys as
>> the cache key.
>> We could only increase the cache hit ratio by shuffling on lookup keys.
>> I need more use cases to understand this requirement.
>>
>> > Some connectors such as hive, caches all data in LookupFunction. How to
>> > decrease the valid cache data size if data can be shuffled?
>>
>> Very good idea.
>> There are two types of cache.
>> For key-value storage, such as Redis/HBase, the lookup table source stores
>> the visited lookup keys and their records into the cache lazily.
>> For other storage without keys, such as hive, each task loads all data
>> into the cache eagerly in the initialization phase.
>> After introducing the hash partitioner, nothing needs to change for
>> key-value storages; for hive, each task could load only part of the data
>> into its cache instead of loading all of it.
>>
>> We have implemented this optimization in our internal version.
>> The core idea is to push the partitioner information down to the lookup
>> table source. When loading data into caches, each task stores only those
>> records whose lookup keys are sent to the current task.
>> We called this 'HashPartitionedCache'.
>>
>> I have added this point into the Lookup Join requirements list in the
>> motivation of the FLIP, but I would not do this point in this FLIP right
>> now.
>> If this is a strong requirement, we need to drive another discussion on
>> this topic individually because this point involves many extensions to
>> the API.
>>
>> Best,
>> Jing Zhang
>>
>> Lincoln Lee <lincoln.8...@gmail.com> wrote on Wed, Dec 29, 2021 at 10:01:
>>
>>> Hi Jing,
>>> Thanks for bringing up this discussion! Agree that these join hints
>>> should benefit both bounded and unbounded cases as Martijn mentioned.
>>> I also agree that implementing the query hint is the right way for a more
>>> general purpose since the dynamic table options have a limited scope.
>>> Some points I'd like to share are:
>>> 1. Regarding the hint name 'USE_HASH', could we consider more candidates?
>>> Things are a little different from RDBMS in the distributed world, and we
>>> also aim to solve the data skew problem, so all these incoming hint names
>>> should be considered together.
>>> 2. As you mentioned in the flip, this solution depends on future changes
>>> to calcite (and also upgrading calcite would be another possible big
>>> change: at least calcite-1.30 vs 1.26, are we preparing to accept this
>>> big change?). Is there another possible way to minimize the change in
>>> calcite?
>>> As I know there're more limitations than `Correlate`.
>>>
>>> Best,
>>> Lincoln Lee
>>>
>>>
>>> Jing Zhang <beyond1...@gmail.com> wrote on Tue, Dec 28, 2021 at 23:04:
>>>
>>> > Hi Martijn,
>>> > Thanks a lot for your attention.
>>> > I'm sorry I didn't explain the motivation clearly. I would like to
>>> > explain it in detail, and then respond to your questions.
>>> > A lookup join is typically used to enrich a table with data that is
>>> > queried from an external system. Many lookup table sources introduce a
>>> > cache in order to reduce RPC calls, such as the JDBC, CSV and HBase
>>> > connectors.
>>> > For those connectors, we could raise the cache hit ratio by routing the
>>> > same lookup keys to the same task instance. This is the purpose of
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>>> > .
>>> > Other cases might benefit from hash distribution, such as batch hash
>>> > join as you mentioned. It is a cool idea, however it is not the purpose
>>> > of this FLIP, we could discuss this in FLINK-20670
>>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
>>> >
>>> > > - When I was reading about this topic [1] I was wondering if this
>>> > > feature would be more beneficial for bounded use cases and not so
>>> > > much for unbounded use cases. What do you think?
>>> >
>>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
>>> > cache hit ratio, which is different from Oracle Hash Join.
>>> > However, we could use a similar hint syntax.
>>> >
>>> > > - If I look at the current documentation for SQL Hints in Flink [2],
>>> > > I notice that all of the hints there are located at the end of the
>>> > > SQL statement. In the FLIP, the use_hash is defined directly after
>>> > > the 'SELECT' keyword. Can we somehow make this consistent for the
>>> > > user? Or should the user be able to specify hints anywhere in its SQL
>>> > > statement?
>>> >
>>> > Calcite supports hints in two locations [3]:
>>> > Query Hint: right after the SELECT keyword;
>>> > Table Hint: right after the referenced table name.
>>> > Flink already supports dynamic table options based on the hint
>>> > framework of Calcite, which is what doc [2] describes.
>>> > Besides, query hints are also important: they give the optimizer a hint
>>> > to choose a better plan.
>>> > Almost all popular databases and big-data engines support sql query
>>> > hints, such as oracle, hive, spark and so on.
>>> > I think using query hints in this case is more natural for users, WDYT?
>>> >
>>> > I have updated the motivation part in the FLIP,
>>> > Thanks for the feedback!
>>> >
>>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
>>> >
>>> > Best,
>>> > Jing Zhang
>>> >
>>> > Martijn Visser <mart...@ververica.com> wrote on Tue, Dec 28, 2021 at 22:02:
>>> >
>>> > > Hi Jing,
>>> > >
>>> > > Thanks a lot for the explanation and the FLIP. I definitely learned
>>> > > something when reading more about `use_hash`.
>>> > > My interpretation would be that the primary benefit of a hash lookup
>>> > > join would be improved performance by allowing the user to explicitly
>>> > > optimise the planner.
>>> > >
>>> > > I have a couple of questions:
>>> > >
>>> > > - When I was reading about this topic [1] I was wondering if this
>>> > > feature would be more beneficial for bounded use cases and not so
>>> > > much for unbounded use cases. What do you think?
>>> > > - If I look at the current documentation for SQL Hints in Flink [2],
>>> > > I notice that all of the hints there are located at the end of the
>>> > > SQL statement. In the FLIP, the use_hash is defined directly after
>>> > > the 'SELECT' keyword. Can we somehow make this consistent for the
>>> > > user? Or should the user be able to specify hints anywhere in its SQL
>>> > > statement?
>>> > >
>>> > > Best regards,
>>> > >
>>> > > Martijn
>>> > >
>>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>>> > >
>>> > >
>>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
>>> > >
>>> > > > Hi everyone,
>>> > > > Lookup join
>>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1]
>>> > > > is a commonly used feature in Flink SQL. We have received many
>>> > > > optimization requirements on lookup join. For example:
>>> > > > 1. Enforce a hash partitioner on the left side of the lookup join
>>> > > > to raise the cache hit ratio
>>> > > > 2. Solve the data skew problem after introducing hash lookup join
>>> > > > 3. Enable mini-batch optimization to reduce RPC calls
>>> > > >
>>> > > > Next we will solve these problems one by one. Firstly, we would
>>> > > > focus on point 1, and continue to discuss point 2 and point 3
>>> > > > later.
>>> > > >
>>> > > > There are many similar requirements from the user mailing list and
>>> > > > JIRA about hash Lookup Join, for example:
>>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
>>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to
>>> > > > hash shuffle by lookup keys
>>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
>>> > > > lookupjoin source table for pre-partitioning
>>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
>>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner
>>> > > > way for SQL.
>>> > > >
>>> > > > In this FLIP, I would like to start a discussion about Hash Lookup
>>> > > > Join.
>>> > > > The core idea is introducing a 'USE_HASH' hint in the query. This
>>> > > > syntax is directly user-oriented and therefore requires careful
>>> > > > design.
>>> > > > There are two ways to propagate this hint to LookupJoin in the
>>> > > > optimizer. We need further discussion to make the final decision.
>>> > > > Anyway, the difference between the two solutions is only about the
>>> > > > internal implementation and has no impact on the user.
>>> > > >
>>> > > > For more detail on the proposal:
>>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>>> > > >
>>> > > > Looking forward to your feedback, thanks.
>>> > > >
>>> > > > Best,
>>> > > > Jing Zhang
>>> > > >
>>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
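
A side note on the motivation quoted above (routing the same lookup keys to the same task instance to raise the cache hit ratio): the effect can be sketched in a few lines of plain Python. This is not Flink code. The function names, the round-robin baseline, the CRC32 hash and the unbounded per-task caches are all assumptions made only for illustration.

```python
from zlib import crc32


def count_cache_misses(keys, num_tasks, route):
    """Replay lookup keys through per-task caches and count the misses."""
    caches = [set() for _ in range(num_tasks)]  # one unbounded cache per task
    misses = 0
    for i, key in enumerate(keys):
        task = route(i, key, num_tasks)
        if key not in caches[task]:
            misses += 1  # a miss stands for one call to the external system
            caches[task].add(key)
    return misses


def round_robin(i, key, num_tasks):
    # Hypothetical baseline: the record position decides the task, not the key.
    return i % num_tasks


def hash_by_lookup_key(i, key, num_tasks):
    # Shuffle by lookup key: the same key always reaches the same task.
    return crc32(key.encode("utf-8")) % num_tasks


# 100 records over 5 distinct lookup keys, processed by 4 parallel tasks.
keys = [f"user_{i % 5}" for i in range(100)]

rr_misses = count_cache_misses(keys, 4, round_robin)
hash_misses = count_cache_misses(keys, 4, hash_by_lookup_key)
print("round-robin misses:", rr_misses)
print("hash-by-key misses:", hash_misses)
```

With hash routing each distinct key is fetched from the external system once in total. Without it, each key may have to be fetched once per task that happens to receive it.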