Hi all,

Thanks for all the feedback so far. If there are no more suggestions, I would like to start a vote on Tuesday next week (18 Jan).

Best,
Jing Zhang

On Wed, 5 Jan 2022 at 11:33, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Francesco,

Thanks a lot for the feedback.

> Does it make sense for a lookup join to use hash distribution whenever possible by default?

I prefer to enable Hash Lookup Join only when the hint is found in the query, for the following reasons:

1. Plan compatibility. Adding a hash shuffle by default would change the plan after users upgrade their Flink version, and lookup join is a commonly used feature in Flink SQL.
2. Not all Flink jobs benefit from this improvement. It is a trade-off for lookup joins against dimension connectors that have a cache inside: we hope to raise the cache hit ratio with Hash Lookup Join, but it adds an extra shuffle at the same time. It is not always a positive optimization, especially for connectors that have no cache inside.

> Shouldn't the hint take the table alias as the "table name"? What if you do two lookup joins in cascade within the same query with the same table (once on a key, then on another one), where you use two different aliases for the table?

In theory, it is better to support both table names and alias names. But in Calcite, the alias name of a subquery or table is lost in the SQL conversion and SQL optimization phases, so here we only support table names.

Best,
Jing Zhang

On Mon, 3 Jan 2022 at 18:38, Francesco Guardiani <france...@ververica.com> wrote:

Hi Jing,

Thanks for the FLIP. I'm not very knowledgeable about the topic, but going through both the FLIP and the discussion here, I wonder: does it make sense for a lookup join to use hash distribution whenever possible by default?

The point you're explaining here:

> Many lookup table sources introduce a cache in order to reduce RPC calls, such as the JDBC, CSV, and HBase connectors.
> For those connectors, we could raise the cache hit ratio by routing the same lookup keys to the same task instance.

This seems like something we could infer automatically, rather than asking the user to add this hint to the query manually. Note that I'm not arguing against the hint syntax, which might still make sense to introduce, but I feel this optimization makes sense in the general case when using the connectors you have quoted. Perhaps there is some downside I'm not aware of?

Regarding the hints themselves, taking this example as reference:

SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

Shouldn't the hint take the table alias as the "table name"? What if you do two lookup joins in cascade within the same query with the same table (once on a key, then on another one), where you use two different aliases for the table?

On Fri, Dec 31, 2021 at 9:56 AM Jing Zhang <beyond1...@gmail.com> wrote:

Hi Lincoln,

Thanks for the feedback.

> 1. For the hint name, +1 for WenLong's proposal.

I've added 'SHUFFLE_HASH' to the 'Other Alternatives' in the FLIP. Let's wait for more voices here.

> Regarding the `SKEW` hint, agree with you that it can be used widely, and I prefer to treat it as a metadata hint, a new category that differs from a join hint.
> For your example:
> ```
> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> o.total, c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
> ```
> I would prefer another form:
> ```
> -- provide the skew info to let the engine choose the optimal plan
> SELECT /*+ SKEW('Orders') */ o.order_id, ...
>
> -- or introduce a new hint for the join case, e.g.,
> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> ```

Maybe there is a misunderstanding here. I just used syntactic sugar:

SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id, ...

is just shorthand for

SELECT /*+ USE_HASH('Orders', 'Customers') */ /*+ SKEW('Orders') */ o.order_id, ...

Although I list the 'USE_HASH' and 'SKEW' hints in one query hint clause, that does not mean they must appear together as a whole. Based on the Calcite syntax doc [1], you can list more than one hint in a `/*+ hint [, hint ]* */` clause.

Each hint has a different function. The 'USE_HASH' hint suggests that the optimizer use a hash partitioner for the lookup join on tables 'Orders' and 'Customers', while the 'SKEW' hint tells the optimizer the skew metadata of table 'Orders'.

Best,
Jing Zhang

[1] https://calcite.apache.org/docs/reference.html#sql-hints

On Fri, 31 Dec 2021 at 16:39, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Martijn,

Thanks for the feedback.

Glad to hear that we reached a consensus on the first and second points.

About whether to use `use_hash` as a term, I think your concern makes sense. Although Hash Lookup Join is similar to Oracle's hash join in that both require hash distribution of the input, there is a small difference between them. On this point, Lincoln and WenLong both prefer the term 'SHUFFLE_HASH'. WDYT?

Best,
Jing Zhang

On Thu, 30 Dec 2021 at 11:21, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jing,

Thanks for your explanation!

1. For the hint name, +1 for WenLong's proposal.
I think the `SHUFFLE` keyword is important in a classic distributed computing system: a hash join usually implies a shuffle stage (including shuffle hash join and broadcast hash join). Users only need to pass the build-side table (usually the smaller one) into the `SHUFFLE_HASH` join hint, which is more concise than `USE_HASH(left_table, right_table)`. Please correct me if my understanding is wrong.
Regarding the `SKEW` hint, I agree with you that it can be used widely, and I prefer to treat it as a metadata hint, a new category that differs from a join hint.
For your example:
```
SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
I would prefer another form:
```
-- provide the skew info to let the engine choose the optimal plan
SELECT /*+ SKEW('Orders') */ o.order_id, ...

-- or introduce a new hint for the join case, e.g.,
SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
```

2. I agree with Martijn about adding the feature in 1.16; we need time to complete the change in Calcite and also the upgrade work.

3. I misunderstood the 'Other Alternatives' part as the 'Rejected' ones in the FLIP doc. My point is to avoid the hacky way as far as we can. The potential issues lie in Calcite's hint propagation: join hints must propagate correctly into the proper join scope, including subqueries or views that may contain various SQL operators, so we should check all kinds of operators for correct propagation. Hope this helps. Also cc @Shuo Cheng, who may be able to offer more help.

Best,
Lincoln Lee

On Wed, 29 Dec 2021 at 22:21, Martijn Visser <mart...@ververica.com> wrote:

Hi Jing,

Thanks for explaining this in more detail, and thanks also to the others participating.

> I think using query hints in this case is more natural for users, WDYT?

Yes, I agree. As long as our documentation properly explains that we support both query hints and table hints, what the difference between them is, and how to use them, I think our users will understand this perfectly.

> I admit upgrading from Calcite 1.26 to 1.30 would be a big change. However, we cannot always avoid upgrading, for the following reasons

We have to upgrade Calcite. We actually considered putting that in the Flink 1.15 scope but ultimately had to drop it, and I definitely think it needs to be done for 1.16. That is not only because new features depend on Calcite upgrades, but also because newer versions have resolved bugs that also hurt our users. That's why we already have tickets for upgrading to Calcite 1.27 [1] and 1.28 [2].

With regard to using `use_hash` as a term, I think the most important point is that if we reuse a term that Oracle uses, the behaviour and outcome should be the same as, or comparable to, Oracle's. If they are not, I would probably introduce our own term so that users don't get confused.

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-20873
[2] https://issues.apache.org/jira/browse/FLINK-21239

On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Jian gang,

Thanks for the feedback.

> When it comes to Hive, how do you load partial data instead of the whole data? Any change related to Hive?

This is the same question Yuan raised before. I prefer to drive a separate FLIP for further discussion of this topic, because it involves many API extensions. Here I would first like to share the implementation in our internal version; it may be very different from the final solution merged into the community. The core idea is to push the partitioner information down to the lookup table source. The Hive connector also needs to be upgraded: when loading data into its cache, each task stores only the records whose lookup keys are sent to that task.

> How do we define the cache configuration? For example, the size and the TTL.

I'm afraid there is no unified caching configuration or cache implementation across connectors yet. You can find the cache size and TTL options for JDBC in doc [1] and for HBase in doc [2].

> Will this feature add another shuffle phase compared with the default behavior? In what situations will users choose this feature?

Yes. If the user specifies the hash hint in the query, the optimizer prefers Hash Lookup Join, which adds a hash shuffle.
If the lookup table source has a cache inside (for example HBase or JDBC) and the benefit of the higher cache hit ratio outweighs the cost of the extra shuffle, the user can choose Hash Lookup Join.

> For the keys, the default implementation will be OK. But I wonder whether we can support more flexible strategies.

This is the same question Yuan raised before.

I'm afraid there is no plan to support flexible strategies yet, because the feature involves many things, for example:
1. SQL syntax
2. a user-defined partitioner API
3. RelDistribution type extension and Flink RelDistribution extension
4. FlinkExpandConversionRule
5. Exchange ExecNode extension
6. ...
It needs a careful design and more discussion. If this is a strong requirement, we can drive a separate discussion on this point. In this FLIP, I would first support hash shuffle. WDYT?

Best,
Jing Zhang

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options

On Wed, 29 Dec 2021 at 20:37, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Wenlong,

Thanks for the feedback.
I've checked similar syntax in other systems; they all differ from each other, and there seems to be no consensus.
As mentioned in FLIP-204, Oracle uses a query hint named 'use_hash' [1].
Spark also uses a query hint, named 'SHUFFLE_HASH' [2].
SQL Server uses the keyword 'HASH' instead of a query hint [3].
Note that the purposes of the hash shuffle in [1][2][3] differ a little from the purpose of FLIP-204; we are only discussing syntax here.

I've added this part to the FLIP for further discussion.

Best,
Jing Zhang

[1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
[2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
[3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15

On Wed, 29 Dec 2021 at 17:18, wenlong.lwl <wenlong88....@gmail.com> wrote:

Hi Jing, thanks for driving the discussion.

Have you done some investigation into join hint syntax? Why did you choose USE_HASH from Oracle instead of the Spark-style SHUFFLE_HASH? They are quite different. People in the big data world may be more familiar with Spark/Hive; if we need to choose one, I personally prefer the Spark style.

Best,
Wenlong

On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:

Hi Jing,

Thanks for your detailed reply.
1) In my last suggestion, hashing by primary key was not meant to raise the cache hit ratio but to handle skew in the left source. Now that you have the 'skew' hint and other discussion about it, I'm looking forward to it.
2) I meant supporting a user-defined partitioner function. We have a case of joining a data lake source that uses a special partitioning scheme, and we implemented it, not very elegantly, in our internal version. As you said, it needs more design.
3) I think the so-called 'HashPartitionedCache' is useful; otherwise, loading all the data, as the Hive lookup table source does, is barely usable at big data scale.

Best regards,
Yuan

On 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:

Hi Lincoln,

Thanks a lot for the feedback.

> Regarding the hint name 'USE_HASH', could we consider more candidates? Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all these incoming hint names should be considered together.

I would discuss the skew problem separately in the next FLIP.
Here I would like to share my hint proposal for skew.
I want to introduce a 'skew' hint, which is a query hint similar to the skew hint in Spark [1] and MaxCompute [2]. The 'skew' hint can contain only the name of the skewed table; it can also accept a table name plus column names, or a table name, column names and skew values.
For example:

SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

The 'skew' hint is not only for lookup join here; it could also be used for other types of join later, for example batch hash join or streaming regular join.
Back to the problem of a better name for hash lookup join: since the 'skew' hint is a separate hint, 'use_hash' is still an option. WDYT?
I don't have a good idea for a better hint name yet. I would like to hear more suggestions about hint names.
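
The skew problem raised in this thread follows from how any hash shuffle works: every record with the same key goes to the same task, so one hot key overloads one task regardless of parallelism. The sketch below counts per-task load under a simplified `hash mod parallelism` routing. The routing function, the hot key 42 and the key distribution are illustrative assumptions, not Flink's partitioner or real data, and the sketch does not implement any of the proposed SKEW hints.

```java
import java.util.Arrays;

// Illustrative only: simplified hash routing, not Flink's actual partitioner.
class HashSkewSketch {
    // Counts how many records each task receives when routing by hash(key) mod parallelism.
    static long[] loadPerTask(int[] keys, int parallelism) {
        long[] load = new long[parallelism];
        for (int key : keys) {
            load[Math.floorMod(Integer.hashCode(key), parallelism)]++;
        }
        return load;
    }

    // Assumed workload: 700 records share one hot key, 300 records have distinct cold keys.
    static int[] skewedKeys() {
        int[] keys = new int[1000];
        for (int i = 0; i < 700; i++) {
            keys[i] = 42; // hot key
        }
        for (int i = 700; i < 1000; i++) {
            keys[i] = 1000 + (i - 700); // cold keys 1000..1299
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(loadPerTask(skewedKeys(), 4)));
    }
}
```

Here `main` prints `[75, 75, 775, 75]`: the task that owns key 42 receives 775 of the 1,000 records, and adding more tasks would not split the 700 hot-key records.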

> As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another potentially big change: at least Calcite 1.30 vs 1.26; are we prepared to accept this big change?).

Indeed, solution 1 depends on a Calcite upgrade.
I admit that upgrading from Calcite 1.26 to 1.30 would be a big change. I still remember what we suffered during the last upgrade to Calcite 1.26. However, we cannot always avoid upgrading, for the following reasons:
1. Other features also depend on the Calcite upgrade, for example Session Window and Count Window.
2. If we keep avoiding Calcite upgrades, the gap with the latest version grows. If upgrading one day becomes unavoidable, the pain will be greater.

WDYT?

> Is there another possible way to minimize the change in Calcite?

Did you check the 'Other Alternatives' part in FLIP-204? It gives another solution that does not depend on a Calcite upgrade and does not need to worry about the hint being lost during propagation. This is also what we have done in our internal version. The core idea is to propagate the 'use_hash' hint to the TableScan nodes with matching table names. However, it is a little hacky.
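
The 'Other Alternatives' approach described above (propagating the 'use_hash' hint to TableScan nodes whose table names match) can be sketched on a toy plan tree. The `PlanNode`, `TableScan`, `Join` and `HintPropagation` types below are hypothetical and are not Calcite's `RelNode`/`RelHint` API; the sketch only shows the name-matching rule. It also makes the alias question raised in this thread concrete: because matching is by table name, two scans of the same table under different aliases both receive the hint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical toy plan model; NOT Calcite's RelNode / RelHint API.
interface PlanNode {
    List<PlanNode> inputs();
}

// A scan of a named table carrying a mutable list of attached hint names.
record TableScan(String tableName, List<String> hints) implements PlanNode {
    public List<PlanNode> inputs() {
        return List.of();
    }
}

// A binary join (e.g. a lookup join) over two inputs.
record Join(PlanNode left, PlanNode right) implements PlanNode {
    public List<PlanNode> inputs() {
        return List.of(left, right);
    }
}

class HintPropagation {
    // Walks the whole tree and attaches hintName to every scan whose table name is listed.
    // Only table names are compared, so aliases cannot tell two scans of one table apart.
    static void propagate(PlanNode node, String hintName, Set<String> hintedTables) {
        if (node instanceof TableScan scan && hintedTables.contains(scan.tableName())) {
            scan.hints().add(hintName);
        }
        for (PlanNode input : node.inputs()) {
            propagate(input, hintName, hintedTables);
        }
    }

    public static void main(String[] args) {
        TableScan orders = new TableScan("Orders", new ArrayList<>());
        TableScan customersOnKey1 = new TableScan("Customers", new ArrayList<>());
        TableScan customersOnKey2 = new TableScan("Customers", new ArrayList<>());
        // Two cascaded lookup joins against the same table.
        PlanNode plan = new Join(new Join(orders, customersOnKey1), customersOnKey2);
        propagate(plan, "USE_HASH", Set.of("Customers"));
        System.out.println(orders.hints());
        System.out.println(customersOnKey1.hints());
        System.out.println(customersOnKey2.hints());
    }
}
```

Both `Customers` scans end up with `USE_HASH`, while `Orders` gets none; distinguishing the two lookups would require alias information that, per this thread, Calcite does not preserve through conversion and optimization.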

> As far as I know, there are more limitations than `Correlate`.

As mentioned before, in our internal version I chose the 'Other Alternatives' approach in FLIP-204. Although I did a POC of solution 1 and listed all the changes I found in the FLIP, there may still be something I missed. I'm very happy that you pointed out there are more limitations besides `Correlate`; could you please give more details on this part?

Best,
Jing Zhang

[1] https://docs.databricks.com/delta/join-performance/skew-join.html
[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669

On Wed, 29 Dec 2021 at 14:40, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Yuan and Lincoln,

Thanks a lot for the attention. I will answer the emails one by one.

To Yuan:
> How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, shuffling by join key will cause CDC data disorder. Would it be better to use the primary key in this case?

Good question.
The problem exists not only with CDC sources but also whenever the input stream is not an insert-only stream (for example, the result of an unbounded aggregate or a regular join).
I don't think hashing by primary key is a good choice: it cannot raise the cache hit ratio, because the cache key is the lookup key rather than the primary key of the input.

To avoid wrong results, Hash Lookup Join requires the input stream to be either an insert-only stream or a stream whose upsert keys contain the lookup keys.

I've added this limitation to the FLIP; thanks a lot for the reminder.

> Can the shuffle keys be customized when users know the distribution of the data?

I'm not sure I understand your question.

Do you mean supporting a user-defined partitioner function on keys, just like the Flink DataStream API?
If so, I'm afraid there is no plan to support this feature yet, because it involves many things, for example:
1. SQL syntax
2. a user-defined partitioner API
3. RelDistribution type extension and Flink RelDistribution extension
4. FlinkExpandConversionRule
5. Exchange ExecNode extension
6. ...
It needs a careful design and more discussion.
If this is a strong requirement, we can drive a separate discussion on this point. In this FLIP, I would first support hash shuffle. WDYT?

Or do you mean hashing by keys other than the lookup keys?
If so, could you please describe a specific use case?
We need to fetch records from the dimension table's external storage by lookup key, so dimension table sources use the lookup keys as the cache key. We can only increase the cache hit ratio by shuffling on the lookup keys. I need more use cases to understand this requirement.

> Some connectors, such as Hive, cache all data in the LookupFunction. How can we decrease the size of the cached data if the data can be shuffled?

Very good idea.
There are two types of cache.
For key-value storage, such as Redis/HBase, the lookup table source lazily stores each visited lookup key and its record in the cache.
For storage without keys, such as Hive, each task eagerly loads all data into the cache in the initialization phase.
After introducing the hash partitioner, key-value storages need no change; for Hive, each task only needs to load part of the data into its cache instead of all of it.

We have implemented this optimization in our internal version.
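
The partial loading described above, in which each task of a keyless source such as Hive caches only the rows that the hash shuffle would route to it, can be sketched as follows. This is a hypothetical illustration, not the internal 'HashPartitionedCache' implementation or a Flink API: `HashPartitionedCacheSketch`, `targetTask` and `loadPartition` are made-up names. The one property it models is that the loader must use the same routing function as the shuffle, so every lookup that reaches a task finds its key in that task's partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-task partial cache loading; not a Flink API.
class HashPartitionedCacheSketch {
    // Assumed routing function; it must be the same one the hash shuffle uses.
    static int targetTask(Object lookupKey, int parallelism) {
        return Math.floorMod(lookupKey.hashCode(), parallelism);
    }

    // Scans the full table but keeps only the rows whose lookup key is routed to taskIndex.
    static <K, V> Map<K, V> loadPartition(Map<K, V> fullTable, int parallelism, int taskIndex) {
        Map<K, V> cache = new HashMap<>();
        for (Map.Entry<K, V> row : fullTable.entrySet()) {
            if (targetTask(row.getKey(), parallelism) == taskIndex) {
                cache.put(row.getKey(), row.getValue());
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        // Stand-in for a keyless dimension table that would otherwise be loaded in full.
        Map<Integer, String> dimTable = new HashMap<>();
        for (int id = 0; id < 1000; id++) {
            dimTable.put(id, "customer-" + id);
        }
        int parallelism = 4;
        for (int task = 0; task < parallelism; task++) {
            int size = loadPartition(dimTable, parallelism, task).size();
            System.out.println("task " + task + " caches " + size + " rows");
        }
    }
}
```

With 1,000 rows and parallelism 4, each task caches 250 rows instead of all 1,000.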
The core idea is to push the partitioner information down to the lookup table source. When loading data into its cache, each task stores only the records whose lookup keys are sent to that task. We call this 'HashPartitionedCache'.

I have added this point to the Lookup Join requirements list in the motivation section of the FLIP, but I will not address it in this FLIP right now. If this is a strong requirement, we need to drive a separate discussion on this topic, because it involves many API extensions.

Best,
Jing Zhang

On Wed, 29 Dec 2021 at 10:01, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jing,

Thanks for bringing up this discussion! I agree that these join hints should benefit both bounded and unbounded cases, as Martijn mentioned.
I also agree that implementing the query hint is the right way for a more general purpose, since the dynamic table options have a limited scope.
Some points I'd like to share are:
1. Regarding the hint name 'USE_HASH', could we consider more candidates?
Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all of these upcoming hint names should be considered together.
2. As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another potentially big change: at least Calcite 1.30 vs 1.26; are we prepared to accept this big change?). Is there another possible way to minimize the change in Calcite? As far as I know, there are more limitations than `Correlate`.

Best,
Lincoln Lee

On Tue, 28 Dec 2021 at 23:04, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Martijn,

Thanks a lot for your attention.
I'm sorry I didn't explain the motivation clearly. I would like to explain it in detail and then respond to your questions.
A lookup join is typically used to enrich a table with data that is queried from an external system. Many lookup table sources introduce a cache in order to reduce RPC calls, such as the JDBC, CSV, and HBase connectors.
For those connectors, we can raise the cache hit ratio by routing the same lookup keys to the same task instance. This is the purpose of https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join.
Other cases might benefit from hash distribution, such as batch hash join as you mentioned. It is a cool idea, but it is not the purpose of this FLIP; we could discuss it in FLINK-20670 <https://issues.apache.org/jira/browse/FLINK-20670>.

> - When I was reading about this topic [1] I was wondering if this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?

As mentioned before, the purpose of Hash Lookup Join is to increase the cache hit ratio, which is different from Oracle's hash join. However, we could use a similar hint syntax.

> - If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement.
> In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?

Calcite supports hints in two locations [3]:
- Query hint: right after the SELECT keyword;
- Table hint: right after the referenced table name.

Flink already supports dynamic table options based on Calcite's hint framework, as described in the documentation [2].
Query hints are also important: they can help the optimizer choose a better plan. Almost all popular databases and big-data engines, such as Oracle, Hive, and Spark, support SQL query hints.
I think a query hint is more natural for users in this case. WDYT?

I have updated the motivation part in the FLIP. Thanks for the feedback!
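The cache-hit argument in this reply (routing the same lookup keys to the same task instance) can be illustrated with a small simulation. This is a minimal Python sketch under stated assumptions, not Flink code: each parallel lookup task is modelled with its own bounded LRU cache (a hypothetical stand-in for a connector cache), a cache miss stands for one RPC to the external system, round-robin routing stands in for any key-agnostic distribution, and Python's built-in `hash` modulo the parallelism stands in for the engine's real hash partitioner. The names `LRUCache`, `round_robin`, `hash_by_key`, `hit_ratio`, and `simulate` are all hypothetical.

```python
import random
from collections import OrderedDict


class LRUCache:
    """Hypothetical per-task lookup cache with least-recently-used eviction."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return self.entries[key]
        self.misses += 1  # a miss stands for one RPC to the external system
        value = f"row-{key}"  # placeholder for the row fetched from the external system
        self.entries[key] = value
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used entry
        return value


def round_robin(index, key, parallelism):
    # Key-agnostic routing: the same key can reach every task.
    return index % parallelism


def hash_by_key(index, key, parallelism):
    # Key-based routing: the same key always reaches the same task.
    return hash(key) % parallelism


def hit_ratio(keys, parallelism, capacity, route):
    caches = [LRUCache(capacity) for _ in range(parallelism)]
    for index, key in enumerate(keys):
        caches[route(index, key, parallelism)].get(key)
    return sum(c.hits for c in caches) / len(keys)


def simulate(seed=42, parallelism=4, capacity=300, distinct_keys=1000, records=20000):
    # Uniformly random integer lookup keys; integers keep Python's hash deterministic.
    rng = random.Random(seed)
    keys = [rng.randrange(distinct_keys) for _ in range(records)]
    return (
        hit_ratio(keys, parallelism, capacity, round_robin),
        hit_ratio(keys, parallelism, capacity, hash_by_key),
    )


if __name__ == "__main__":
    rr, hashed = simulate()
    print(f"round-robin hit ratio: {rr:.2f}, hash-by-key hit ratio: {hashed:.2f}")
```

With these parameters, hash routing gives each task at most 250 of the 1,000 keys, so each 300-entry cache holds its whole working set and nearly every lookup after the first is a hit; under round-robin every task sees the entire key space and keeps evicting. The sketch deliberately ignores the cost of the extra shuffle, which is the trade-off discussed later in this thread.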
[1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
[2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
[3] https://calcite.apache.org/docs/reference.html#sql-hints

Best,
Jing Zhang

Martijn Visser <mart...@ververica.com> wrote on Tue, 28 Dec 2021 at 22:02:

Hi Jing,

Thanks a lot for the explanation and the FLIP. I definitely learned something when reading more about `use_hash`. My interpretation would be that the primary benefit of a hash lookup join would be improved performance by allowing the user to explicitly optimise the planner.

I have a couple of questions:

- When I was reading about this topic [1] I was wondering if this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?
- If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement.
  In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?

Best regards,

Martijn

[1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
[2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/

On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:

Hi everyone,

Lookup join <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join> [1] is a commonly used feature in Flink SQL. We have received many optimization requirements for lookup join, for example:
1. Enforce hash partitioning on the left side of a lookup join to raise the cache hit ratio.
2. Solve the data skew problem after hash lookup join is introduced.
3. Enable mini-batch optimization to reduce RPC calls.

Next we will solve these problems one by one. First, we would focus on point 1, and continue to discuss points 2 and 3 later.

There are many similar requirements about hash lookup join from the user mailing list and JIRA, for example:
1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> - Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys
2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> - lookupjoin source table for pre-partitioning
3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> - Support to send data to lookup table for KeyGroupStreamPartitioner way for SQL.

In this FLIP, I would like to start a discussion about Hash Lookup Join. The core idea is to introduce a 'USE_HASH' hint in the query.
This syntax is directly user-oriented and therefore requires careful design.
There are two ways to propagate this hint to the LookupJoin in the optimizer, and we need further discussion to make a final decision. Either way, the difference between the two solutions concerns only the internal implementation and has no impact on the user.

For more detail on the proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join

Looking forward to your feedback, thanks.

Best,
Jing Zhang

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
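Lincoln's remark about data skew, and item 2 in the original list of requirements, follow from a basic property of hash partitioning: every record with the same lookup key goes to the same subtask, so a single hot key concentrates load on one subtask. The Python below is an illustration under simplifying assumptions, not Flink's partitioner: Python's built-in `hash` modulo the parallelism stands in for the real key-to-subtask mapping, and the workload and the helper names `skewed_keys`, `hash_loads`, `round_robin_loads`, and `skew_factor` are hypothetical.

```python
from collections import Counter


def skewed_keys(hot_key, hot_count, cold_keys):
    """Hypothetical workload: one hot key repeated hot_count times plus keys that appear once each."""
    return [hot_key] * hot_count + list(cold_keys)


def hash_loads(keys, parallelism):
    """Records per subtask when records are hash-partitioned by lookup key."""
    counts = Counter(hash(key) % parallelism for key in keys)
    return [counts.get(task, 0) for task in range(parallelism)]


def round_robin_loads(keys, parallelism):
    """Records per subtask when records are distributed round-robin, ignoring the key."""
    counts = Counter(index % parallelism for index in range(len(keys)))
    return [counts.get(task, 0) for task in range(parallelism)]


def skew_factor(loads):
    """Busiest subtask's load divided by the average load (1.0 means perfectly balanced)."""
    return max(loads) / (sum(loads) / len(loads))


if __name__ == "__main__":
    keys = skewed_keys(hot_key=7, hot_count=9000, cold_keys=range(1000))
    for name, loads in (("hash", hash_loads(keys, 4)), ("round-robin", round_robin_loads(keys, 4))):
        print(f"{name}: loads={loads}, skew factor={skew_factor(loads):.2f}")
```

With one hot key and a parallelism of 4, hash partitioning sends all 9,000 hot records to a single subtask, giving loads of [250, 250, 250, 9250] and a busiest subtask at 3.7 times the average, while round-robin stays perfectly balanced but loses the key locality that helps the cache. The thread defers how skew should be handled, so the sketch only shows the problem and does not propose a fix.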