Hi Lincoln,

Thanks for your reply. Weijie and I discussed these two issues offline,
and here are our conclusions:
1. When the user specifies the hash lookup join hint introduced by
FLIP-204[1], the `SupportsLookupCustomShuffle` interface should be
ignored. The hint is given explicitly by the user through a SQL hint,
so letting it take precedence is more in line with user intuition. WDYT?
2. We agree with introducing the `isDeterministic` method. The custom
shuffle introduced by `SupportsLookupCustomShuffle` can cause
ADD/UPDATE_AFTER events (+I, +U) to appear after their related
UPDATE_BEFORE/DELETE events (-U, -D), which breaks the ordering
assumption that the Flink sink operator currently relies on[2]. If
`isDeterministic` returns false and the changelog is not insert-only,
the planner should not apply the shuffle provided by
`SupportsLookupCustomShuffle`.
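
To make sure we are describing the same behavior, here is a rough sketch
of the decision logic from the two points above. It is only an
illustration under our assumptions, not the actual planner code: the
class `CustomShuffleDecision`, the `Shuffle` enum, the `decide` method,
and the two example partitioners are hypothetical names, and
`InputDataPartitioner` is reduced to a simplified stand-in.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: none of these names come from Flink's planner code.
final class CustomShuffleDecision {

    /** Distribution the planner ends up applying to the lookup join input. */
    enum Shuffle { NONE, HASH_FROM_HINT, CUSTOM_FROM_CONNECTOR }

    /** Simplified stand-in for the partitioner a connector would supply. */
    interface InputDataPartitioner {
        int partition(Object joinKey, int numPartitions);

        /** Proposed method: true if equal keys always map to the same partition. */
        default boolean isDeterministic() {
            return true;
        }
    }

    /** Deterministic example: partition by the hash code of the join key. */
    static InputDataPartitioner hashPartitioner() {
        return (joinKey, numPartitions) ->
                Math.floorMod(joinKey.hashCode(), numPartitions);
    }

    /** Non-deterministic example: a random partition, e.g. to counter skew. */
    static InputDataPartitioner randomPartitioner() {
        return new InputDataPartitioner() {
            @Override
            public int partition(Object joinKey, int numPartitions) {
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }

            @Override
            public boolean isDeterministic() {
                return false;
            }
        };
    }

    /**
     * @param hasShuffleHashHint whether the user gave a SHUFFLE_HASH lookup join hint
     * @param custom partitioner from the connector, or null if it supplies none
     * @param inputIsInsertOnly whether the input changelog contains only +I events
     */
    static Shuffle decide(
            boolean hasShuffleHashHint,
            InputDataPartitioner custom,
            boolean inputIsInsertOnly) {
        // Point 1: an explicit user hint overrides the connector's preference.
        if (hasShuffleHashHint) {
            return Shuffle.HASH_FROM_HINT;
        }
        if (custom == null) {
            return Shuffle.NONE;
        }
        // Point 2: a non-deterministic partitioner could reorder related
        // changelog events, so it is only applied to insert-only input.
        if (!custom.isDeterministic() && !inputIsInsertOnly) {
            return Shuffle.NONE;
        }
        return Shuffle.CUSTOM_FROM_CONNECTOR;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, hashPartitioner(), false));
        System.out.println(decide(false, randomPartitioner(), false));
        System.out.println(decide(false, randomPartitioner(), true));
    }
}
```

In this sketch a rejected custom partitioner simply falls back to no
custom shuffle. Whether the planner should fall back silently or report
an error is left open here.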


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] 
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness


Best,
Wencong

At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>Hi Weijie,
>
>Thanks for your proposal, this will be a useful advanced optimization for
>connector developers!
>
>I have two questions:
>
>1. This FLIP mentions the FLIP-204[1] hash lookup join hint. Which of the
>two features takes precedence? For example, if a connector implements
>the `SupportsLookupCustomShuffle` interface and the user also specifies
>a `SHUFFLE_HASH` lookup join hint in SQL, what is the expected
>behavior?
>
>2. This FLIP considers the relationship with NDU processing, and I agree
>with the current choice to prioritize NDU first. However, we should also
>consider another issue: out-of-orderness of the changelog events in
>streaming[2]. If the connector developer supplies a non-deterministic
>partitioner, e.g., a random partitioner for anti-skew purposes, then it
>will break the assumption relied on by current SQL operators in
>streaming: the ADD/UPDATE_AFTER events (+I, +U) always occur before
>their related UPDATE_BEFORE/DELETE events (-U, -D), and they are always
>processed by the same task even if a data shuffle is involved. So a
>straightforward approach would be to add an `isDeterministic` method to
>the `InputDataPartitioner` interface to explicitly tell the planner whether
>the partitioner is deterministic or not (the planner can then reject a
>non-deterministic custom partitioner for correctness reasons).
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>[2]
>https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>
>
>Best,
>Lincoln Lee
>
>
>Xintong Song <tonysong...@gmail.com> wrote on Fri, Jun 7, 2024 at 13:53:
>
>> +1 for this proposal.
>>
>> This FLIP will make it possible for each lookup join parallel task to only
>> access and cache a subset of the data. This will significantly improve the
>> performance and reduce the overhead when using Paimon for the dimension
>> table. And it's general enough to also be leveraged by other connectors.
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com>
>> wrote:
>>
>> > Hi devs,
>> >
>> >
>> > I'd like to start a discussion about FLIP-462[1]: Support Custom Data
>> > Distribution for Input Stream of Lookup Join.
>> >
>> >
>> > Lookup Join is an important feature in Flink. It is typically used to
>> > enrich a table with data that is queried from an external system.
>> > If we interact with the external systems for each incoming record, we
>> > incur significant network IO and RPC overhead.
>> >
>> > Therefore, most connectors introduce caching to reduce the per-record
>> > level query overhead. However, because the data distribution of Lookup
>> > Join's input stream is arbitrary, the cache hit rate is sometimes
>> > unsatisfactory.
>> >
>> >
>> > We want to introduce a mechanism for the connector to tell the Flink
>> > planner its desired input stream data distribution or partitioning
>> > strategy. This can significantly reduce the amount of cached data and
>> > improve performance of Lookup Join.
>> >
>> >
>> > You can find more details in this FLIP[1]. Looking forward to hearing
>> > from you, thanks!
>> >
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
>> >
>>
