Hi all,

Thanks for all the feedback and suggestions so far.
If there is no further comment, we will open the voting thread next Monday.
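For anyone catching up on the thread, here is a minimal sketch of how the new 'shuffle' hint option discussed below would be used from the query side. Only the option name 'shuffle' comes from this discussion; the table names, columns, and the option value 'true' are invented for illustration, and the FLIP remains the authoritative reference for the final syntax.

```java
// Minimal sketch: asking the planner to apply the connector-provided (or
// fallback hash) distribution for a lookup join via the LOOKUP hint.
// Assumes Orders (the input stream) and Customers (a dim table whose
// connector supports lookup) were registered beforehand; that setup is
// omitted. The 'shuffle'='true' value is an assumption for illustration.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupShuffleHintExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "SELECT /*+ LOOKUP('table'='Customers', 'shuffle'='true') */ "
                        + "o.order_id, c.country "
                        + "FROM Orders AS o "
                        + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.id");
    }
}
```

Whether the planner then applies the dim table's custom partitioner or falls back to a hash distribution on the lookup keys is exactly the fallback behavior discussed in the quoted messages below.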
Best regards,
Weijie

On Fri, Jun 14, 2024 at 15:49, weijie guo <guoweijieres...@gmail.com> wrote:

> Thanks Lincoln for the quick response.
>
>> Since we've decided to extend a new hint option 'shuffle' to the current
>> `LOOKUP` join hint, do we support hash shuffle as well? (It seems like it
>> shouldn't require a lot of extra work, right?) This will deliver a complete
>> new feature to users, also because FLIP-204 is stale for now and this new
>> extension will give users a simpler way to achieve the goal, WDYT?
>
> Yes, I think this makes more sense.
>
> In a word: if the target dim table does not implement
> SupportsLookupCustomShuffle, the planner will try its best to apply hash
> partitioning to the input stream. Otherwise, the planner will try its best
> to apply the custom partitioning.
>
> As for FLIP-204, I think we can discuss whether it should be discarded or
> refactored in a separate thread. TBH, I think the current approach can
> completely cover it and be much easier to use.
>
>> "upsert mode" should be "updating stream" or "non-insert-only stream".
>
> Thanks, updated the FLIP.
>
> Best regards,
> Weijie
>
> On Thu, Jun 13, 2024 at 23:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
>> Thanks Weijie & Wencong for your update, including the conclusions of the
>> offline discussion.
>>
>> There's one thing that needs to be confirmed in the FLIP:
>>
>>> The hint only provides a suggestion to the optimizer, it is not an
>>> enforcer. As a result, if the target dim table does not implement
>>> SupportsLookupCustomShuffle, the planner will ignore this newly introduced
>>> shuffle option.
>>
>> Since we've decided to extend a new hint option 'shuffle' to the current
>> `LOOKUP` join hint, do we support hash shuffle as well? (It seems like it
>> shouldn't require a lot of extra work, right?) This will deliver a complete
>> new feature to users, also because FLIP-204 is stale for now and this new
>> extension will give users a simpler way to achieve the goal, WDYT?
>>
>> Another small comment for the new interface:
>>
>>> "... planner may not apply this partitioner in upsert mode ..."
>>> default boolean isDeterministic()
>>
>> "upsert mode" should be "updating stream" or "non-insert-only stream".
>>
>> Best,
>> Lincoln Lee
>>
>> On Wed, Jun 12, 2024 at 21:43, Wencong Liu <liuwencle...@163.com> wrote:
>>
>>> Hi Jingsong,
>>>
>>> Some of the points you mentioned are now clarified in the updated FLIP.
>>> Please check it out.
>>>
>>> 1. Enabling custom data distribution can be done through the LOOKUP SQL
>>> hint. There are detailed examples provided in the FLIP.
>>>
>>> 2. We will add the isDeterministic method to the `InputDataPartitioner`
>>> interface, which will return true by default. If the
>>> `InputDataPartitioner` is not deterministic, the connector developer needs
>>> to override the isDeterministic method to return false. If the connector
>>> developer cannot ensure this protocol, they will have to bear the
>>> correctness issues that arise.
>>>
>>> 3. Yes, this feature will work in batch mode as well.
>>>
>>> Best regards,
>>> Wencong
>>>
>>> At 2024-06-11 23:47:40, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> +1 to this FLIP, many thanks to all for your proposal.
>>>>
>>>> isDeterministic looks good to me too.
>>>>
>>>> We can consider stating the following points:
>>>>
>>>> 1. How to enable custom data distribution? Is it a dynamic hint? Can you
>>>> provide an SQL example?
>>>>
>>>> 2. What impact will it have when the main input stream is a changelog?
>>>> Could it cause disorder? This may need to be emphasized.
>>>>
>>>> 3. Does this feature work in batch mode too?
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Tue, Jun 11, 2024 at 8:22 PM Wencong Liu <liuwencle...@163.com> wrote:
>>>>
>>>>> Hi Lincoln,
>>>>>
>>>>> Thanks for your reply. Weijie and I discussed these two issues offline,
>>>>> and here are the results of our discussion:
>>>>>
>>>>> 1. When the user utilizes the hash lookup join hint introduced by
>>>>> FLIP-204[1], the `SupportsLookupCustomShuffle` interface should be
>>>>> ignored. This is because the hash lookup join hint is directly specified
>>>>> by the user through a SQL hint, which is more in line with user
>>>>> intuition. WDYT?
>>>>>
>>>>> 2. We agree with the introduction of the `isDeterministic` method. The
>>>>> `SupportsLookupCustomShuffle` interface introduces a custom shuffle,
>>>>> which can cause ADD/UPDATE_AFTER events (+I, +U) to appear after
>>>>> UPDATE_BEFORE/DELETE events (-U, -D), thus breaking the assumptions
>>>>> currently relied on by the Flink sink operator[2]. If `isDeterministic`
>>>>> returns false and the changelog is not insert-only, the planner should
>>>>> not apply the shuffle provided by `SupportsLookupCustomShuffle`.
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>>>>> [2]
>>>>> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>>>>>
>>>>> Best,
>>>>> Wencong
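As a side note at this point in the discussion: the interface shape being debated in the surrounding messages might look roughly like the sketch below. Only the names `SupportsLookupCustomShuffle`, `InputDataPartitioner`, and `isDeterministic()` (with its default of true) come from this thread; the method signatures, the use of `Optional`, and the nesting are assumptions made for illustration, so FLIP-462 is the authoritative definition.

```java
// Rough sketch only -- not the FLIP's exact API. Everything except the three
// names mentioned in the thread (SupportsLookupCustomShuffle,
// InputDataPartitioner, isDeterministic) is an assumption for illustration.
import java.io.Serializable;
import java.util.Optional;
import org.apache.flink.table.data.RowData;

public interface SupportsLookupCustomShuffle {

    /**
     * The partitioning the dim table would like the planner to apply to the
     * lookup join's input stream; empty means no preference.
     */
    Optional<InputDataPartitioner> getPartitioner();

    /** Decides which subtask should process (and therefore cache) a given join key. */
    interface InputDataPartitioner extends Serializable {

        /** Maps the join key of an input record to a target subtask index. */
        int partition(RowData joinKey, int numPartitions);

        /**
         * Whether the same join key is always routed to the same subtask.
         * As discussed in this thread, a non-deterministic partitioner (e.g. a
         * random anti-skew partitioner) can reorder changelog events, so the
         * planner should only apply it to insert-only input streams.
         */
        default boolean isDeterministic() {
            return true;
        }
    }
}
```

With such a flag, the planner can refuse to apply a non-deterministic partitioner to an updating (non-insert-only) stream, which is the correctness concern raised here.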
>>>>> At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>>>>>
>>>>>> Hi Weijie,
>>>>>>
>>>>>> Thanks for your proposal, this will be a useful advanced optimization
>>>>>> for connector developers!
>>>>>>
>>>>>> I have two questions:
>>>>>>
>>>>>> 1. The FLIP-204[1] hash lookup join hint is mentioned in this FLIP.
>>>>>> What is the ordering in which the two features apply? For example, if a
>>>>>> connector that implements the `SupportsLookupCustomShuffle` interface
>>>>>> also has a `SHUFFLE_HASH` lookup join hint specified by the user in SQL,
>>>>>> what is the expected behavior?
>>>>>>
>>>>>> 2. This FLIP considers the relationship with NDU processing, and I
>>>>>> agree with the current choice to prioritize NDU first. However, we
>>>>>> should also consider another issue: out-of-orderness of the changelog
>>>>>> events in streaming[2]. If the connector developer supplies a
>>>>>> non-deterministic partitioner, e.g., a random partitioner for anti-skew
>>>>>> purposes, then it will break the assumption relied on by the current SQL
>>>>>> operators in streaming: the ADD/UPDATE_AFTER events (+I, +U) always
>>>>>> occur before their related UPDATE_BEFORE/DELETE events (-U, -D), and
>>>>>> they are always processed by the same task even if a data shuffle is
>>>>>> involved. So a straightforward approach would be to add a method
>>>>>> `isDeterministic` to the `InputDataPartitioner` interface to explicitly
>>>>>> tell the planner whether the partitioner is deterministic or not (then
>>>>>> the planner can reject a non-deterministic custom partitioner for
>>>>>> correctness requirements).
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>>>>>> [2]
>>>>>> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>>>>>>
>>>>>> Best,
>>>>>> Lincoln Lee
>>>>>>
>>>>>> On Fri, Jun 7, 2024 at 13:53, Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>
>>>>>>> +1 for this proposal.
>>>>>>>
>>>>>>> This FLIP will make it possible for each lookup join parallel task to
>>>>>>> only access and cache a subset of the data. This will significantly
>>>>>>> improve the performance and reduce the overhead when using Paimon for
>>>>>>> the dimension table. And it's general enough to also be leveraged by
>>>>>>> other connectors.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Xintong
>>>>>>>
>>>>>>> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> I'd like to start a discussion about FLIP-462[1]: Support Custom Data
>>>>>>>> Distribution for Input Stream of Lookup Join.
>>>>>>>>
>>>>>>>> Lookup Join is an important feature in Flink. It is typically used to
>>>>>>>> enrich a table with data that is queried from an external system. If
>>>>>>>> we interact with the external system for each incoming record, we
>>>>>>>> incur significant network IO and RPC overhead.
>>>>>>>>
>>>>>>>> Therefore, most connectors introduce caching to reduce the per-record
>>>>>>>> query overhead. However, because the data distribution of the Lookup
>>>>>>>> Join's input stream is arbitrary, the cache hit rate is sometimes
>>>>>>>> unsatisfactory.
>>>>>>>>
>>>>>>>> We want to introduce a mechanism for the connector to tell the Flink
>>>>>>>> planner its desired input stream data distribution or partitioning
>>>>>>>> strategy. This can significantly reduce the amount of cached data and
>>>>>>>> improve the performance of Lookup Join.
>>>>>>>>
>>>>>>>> You can find more details in this FLIP[1]. Looking forward to hearing
>>>>>>>> from you, thanks!
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> Weijie
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
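To connect the motivation in the original proposal with the interface sketched earlier in this thread, here is a hypothetical connector-side implementation. The class name, the bucket-based routing, and the assumption that the first field of the join key drives the bucketing are all invented for illustration; a real dim-table connector (Paimon, for example) would reuse its own bucketing logic here, and the actual FLIP-462 API may differ.

```java
// Hypothetical dim-table source: it asks the planner to route each input
// record to the subtask that owns the record's storage bucket, so every
// subtask only looks up (and caches) a disjoint slice of the dim table.
// Relies on the interface sketch from earlier in this thread; names and the
// bucket math are invented for illustration.
import java.util.Optional;
import org.apache.flink.table.data.RowData;

public class BucketAwareDimTableSource implements SupportsLookupCustomShuffle {

    private final int numBuckets;

    public BucketAwareDimTableSource(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public Optional<InputDataPartitioner> getPartitioner() {
        return Optional.of(new BucketPartitioner(numBuckets));
    }

    /** Routes a join key to the subtask that owns its bucket. */
    private static final class BucketPartitioner implements InputDataPartitioner {

        private final int numBuckets;

        BucketPartitioner(int numBuckets) {
            this.numBuckets = numBuckets;
        }

        @Override
        public int partition(RowData joinKey, int numPartitions) {
            // Mirror the external storage's bucketing (here: hash of the first
            // key field) so all keys of one bucket land on a single subtask.
            int bucket = Math.floorMod(joinKey.getString(0).hashCode(), numBuckets);
            return bucket % numPartitions;
        }

        // Pure function of the join key, so it is deterministic and safe to
        // apply to updating (non-insert-only) input streams as well.
        @Override
        public boolean isDeterministic() {
            return true;
        }
    }
}
```

Because each subtask then only ever sees keys from its own buckets, a per-subtask lookup cache holds roughly a 1/parallelism slice of the dim table instead of (potentially) the whole table, which is the cache-hit-rate improvement the proposal describes.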