Hi Jingsong,

Some of the points you mentioned are now clarified in the updated FLIP.
Please check it out.

1. Custom data distribution can be enabled through the LOOKUP SQL hint.
The FLIP provides detailed examples.

2. We will add the isDeterministic method to the `InputDataPartitioner`
interface, which returns true by default. If the `InputDataPartitioner` is
not deterministic, the connector developer needs to override the
isDeterministic method to return false. If the connector developer does not
honor this contract, they are responsible for any correctness issues that
arise.

3. Yes, this feature will work in batch mode as well.

Best regards,
Wencong

At 2024-06-11 23:47:40, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Hi all,
>
>+1 to this FLIP, many thanks to all for your proposal.
>
>isDeterministic looks good to me too.
>
>We can consider stating the following points:
>
>1. How to enable custom data distribution? Is it a dynamic hint? Can
>you provide an SQL example?
>
>2. What impact will it have when the main stream is a changelog? Will it
>cause disorder? This may need to be emphasized.
>
>3. Does this feature work in batch mode too?
>
>Best,
>Jingsong
>
>On Tue, Jun 11, 2024 at 8:22 PM Wencong Liu <liuwencle...@163.com> wrote:
>>
>> Hi Lincoln,
>>
>> Thanks for your reply. Weijie and I discussed these two issues offline,
>> and here are the results of our discussion:
>>
>> 1. When the user utilizes the hash lookup join hint introduced by
>> FLIP-204[1], the `SupportsLookupCustomShuffle` interface should be
>> ignored. This is because the hash lookup join hint is directly specified
>> by the user through a SQL hint, which is more in line with user
>> intuition. WDYT?
>>
>> 2. We agree with the introduction of the `isDeterministic` method.
>> The `SupportsLookupCustomShuffle` interface introduces a custom shuffle,
>> which can cause ADD/UPDATE_AFTER events (+I, +U) to appear after
>> UPDATE_BEFORE/DELETE events (-U, -D), thus violating the current
>> constraints of the Flink Sink Operator[2]. If `isDeterministic` returns
>> false and the changelog event type is not insert-only, the planner should
>> not apply the shuffle provided by `SupportsLookupCustomShuffle`.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> [2]
>> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>>
>> Best,
>> Wencong
>>
>> At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>> >Hi Weijie,
>> >
>> >Thanks for your proposal, this will be a useful advanced optimization for
>> >connector developers!
>> >
>> >I have two questions:
>> >
>> >1. The FLIP-204[1] hash lookup join hint is mentioned in this FLIP. In what
>> >order are the two features applied? For example, if a connector that
>> >implements the `SupportsLookupCustomShuffle` interface also has a
>> >`SHUFFLE_HASH` lookup join hint specified by the user in SQL, what is
>> >the expected behavior?
>> >
>> >2. This FLIP considers the relationship with NDU processing, and I agree
>> >with the current choice to prioritize NDU first. However, we should also
>> >consider another issue: out-of-orderness of the changelog events in
>> >streaming[2]. If the connector developer supplies a non-deterministic
>> >partitioner, e.g., a random partitioner for anti-skew purposes, then it will
>> >break the assumption relied on by current SQL operators in streaming: the
>> >ADD/UPDATE_AFTER events (+I, +U) always occur before their related
>> >UPDATE_BEFORE/DELETE events (-U, -D) and they are always
>> >processed by the same task even if a data shuffle is involved.
>> >So a straightforward approach would be to add a method `isDeterministic` to
>> >the `InputDataPartitioner` interface to explicitly tell the planner whether
>> >the partitioner is deterministic or not (then the planner can reject the
>> >non-deterministic custom partitioner for correctness requirements).
>> >
>> >[1]
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> >[2]
>> >https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>> >
>> >Best,
>> >Lincoln Lee
>> >
>> >On Fri, Jun 7, 2024 at 13:53, Xintong Song <tonysong...@gmail.com> wrote:
>> >
>> >> +1 for this proposal.
>> >>
>> >> This FLIP will make it possible for each lookup join parallel task to only
>> >> access and cache a subset of the data. This will significantly improve the
>> >> performance and reduce the overhead when using Paimon for the dimension
>> >> table. And it's general enough to also be leveraged by other connectors.
>> >>
>> >> Best,
>> >>
>> >> Xintong
>> >>
>> >> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi devs,
>> >> >
>> >> > I'd like to start a discussion about FLIP-462[1]: Support Custom Data
>> >> > Distribution for Input Stream of Lookup Join.
>> >> >
>> >> > Lookup Join is an important feature in Flink. It is typically used to
>> >> > enrich a table with data that is queried from an external system.
>> >> > If we interact with the external systems for each incoming record, we
>> >> > incur significant network IO and RPC overhead.
>> >> >
>> >> > Therefore, most connectors introduce caching to reduce the per-record
>> >> > level query overhead. However, because the data distribution of Lookup
>> >> > Join's input stream is arbitrary, the cache hit rate is sometimes
>> >> > unsatisfactory.
>> >> >
>> >> > We want to introduce a mechanism for the connector to tell the Flink
>> >> > planner its desired input stream data distribution or partitioning
>> >> > strategy. This can significantly reduce the amount of cached data and
>> >> > improve performance of Lookup Join.
>> >> >
>> >> > You can find more details in this FLIP[1]. Looking forward to hearing
>> >> > from you, thanks!
>> >> >
>> >> > Best regards,
>> >> >
>> >> > Weijie
>> >> >
>> >> > [1]
>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
>> >> >
>> >>
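
For reference, the determinism rule discussed in this thread can be sketched
in plain Java as below. This is only an illustrative sketch under stated
assumptions, not the FLIP's actual API: the `partition` method signature, the
`HashPartitioner` and `RandomPartitioner` classes, and the
`shouldApplyCustomShuffle` helper are hypothetical names made up for this
example. Only the interface name `InputDataPartitioner` and the
`isDeterministic` method (returning true by default) come from the thread.

```java
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

public class CustomShuffleSketch {

    /** Hypothetical stand-in for the partitioner interface named in the thread. */
    interface InputDataPartitioner {
        // Assumed signature: maps a join key to one of numPartitions subtasks.
        int partition(Object joinKey, int numPartitions);

        // As proposed in the thread: deterministic unless the connector overrides this.
        default boolean isDeterministic() {
            return true;
        }
    }

    /** Deterministic: the same key always goes to the same partition. */
    static final class HashPartitioner implements InputDataPartitioner {
        @Override
        public int partition(Object joinKey, int numPartitions) {
            return Math.floorMod(Objects.hashCode(joinKey), numPartitions);
        }
    }

    /** Non-deterministic (e.g. for anti-skew), so it must report false. */
    static final class RandomPartitioner implements InputDataPartitioner {
        @Override
        public int partition(Object joinKey, int numPartitions) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }

        @Override
        public boolean isDeterministic() {
            return false;
        }
    }

    /**
     * Hypothetical planner-side check: skip the custom shuffle only when the
     * partitioner is non-deterministic AND the input is not insert-only.
     */
    static boolean shouldApplyCustomShuffle(
            InputDataPartitioner partitioner, boolean inputIsInsertOnly) {
        return partitioner.isDeterministic() || inputIsInsertOnly;
    }

    public static void main(String[] args) {
        InputDataPartitioner hash = new HashPartitioner();
        InputDataPartitioner random = new RandomPartitioner();
        System.out.println("hash, changelog input: " + shouldApplyCustomShuffle(hash, false));
        System.out.println("random, insert-only input: " + shouldApplyCustomShuffle(random, true));
        System.out.println("random, changelog input: " + shouldApplyCustomShuffle(random, false));
    }
}
```

The sketch leaves out the hash lookup join hint from FLIP-204; per the
discussion above, a user-specified hint would take precedence over the
connector's custom shuffle.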