Hi all,

Thanks for all the feedback and suggestions so far.

If there are no further comments, we will open the voting thread next Monday.

Best regards,

Weijie


weijie guo <guoweijieres...@gmail.com> wrote on Fri, Jun 14, 2024 at 15:49:

> Thanks Lincoln for the quick response.
>
> > Since we've decided to extend the current `LOOKUP` join hint with a new
> > 'shuffle' option, should we support hash shuffle as well? (It seems like it
> > shouldn't require a lot of extra work, right?) This would deliver a
> > completely new feature to users, also because FLIP-204 is stale for now
> > and this new extension gives users a simpler way to achieve the goal, WDYT?
>
> Yes, I think this makes more sense.
>
> In a word: if the target dim table implements SupportsLookupCustomShuffle,
> the planner will try its best to apply the custom partitioning to the input
> stream. Otherwise, the planner will try its best to apply hash partitioning.
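>
> To make this concrete, here is a minimal sketch of how the extended hint
> might be used from the Table API. This is only an illustration: the
> 'shuffle' option name/value and the table/column names are assumptions based
> on this thread, and the final syntax is whatever the FLIP specifies.
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class LookupShuffleHintExample {
>     public static void main(String[] args) {
>         TableEnvironment tEnv =
>                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>         // Assumes 'Orders' and the dim table 'Customers' are already registered.
>         // The hypothetical 'shuffle' option asks the planner to partition the
>         // lookup join input using the connector-provided partitioner, falling
>         // back to hash partitioning if the connector does not provide one.
>         tEnv.executeSql(
>                 "SELECT /*+ LOOKUP('table'='Customers', 'shuffle'='true') */ "
>                         + "o.order_id, c.country "
>                         + "FROM Orders AS o "
>                         + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
>                         + "ON o.customer_id = c.id");
>     }
> }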
>
> As for FLIP-204, I think we can discuss whether it should be discarded or
> refactored in a separate thread. TBH, I think the current approach can
> completely cover it and be much easier to use.
> > "upsert mode" should be "updating stream" or "non-insert-only stream".
>
> Thanks, updated the FLIP.
>
>
>
> Best regards,
>
> Weijie
>
>
Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Jun 13, 2024 at 23:08:
>
>> Thanks Weijie & Wencong for your update including the conclusions of
>> the offline discussion.
>>
>> There's one thing that needs to be confirmed in the FLIP:
>> > The hint only provides a suggestion to the optimizer; it is not an
>> > enforcer. As a result, if the target dim table does not implement
>> > SupportsLookupCustomShuffle, the planner will ignore this newly
>> > introduced shuffle option.
>>
>> Since we've decided to extend the current `LOOKUP` join hint with a new
>> 'shuffle' option, should we support hash shuffle as well? (It seems like it
>> shouldn't require a lot of extra work, right?)
>> This would deliver a completely new feature to users, also because FLIP-204
>> is stale for now and this new extension gives users a simpler way to
>> achieve the goal, WDYT?
>>
>> Another small comment for the new interface:
>> > "... planner may not apply this partitioner in upsert mode ..."
>> > default boolean isDeterministic()
>> "upsert mode" should be "updating stream" or "non-insert-only stream".
>>
>>
>> Best,
>> Lincoln Lee
>>
>>
>> Wencong Liu <liuwencle...@163.com> wrote on Wed, Jun 12, 2024 at 21:43:
>>
>> > Hi Jingsong,
>> >
>> >
>> > Some of the points you mentioned are now clarified in
>> > the updated FLIP. Please check it out.
>> >
>> >
>> > 1. Enabling custom data distribution can be done through the
>> > LOOKUP SQL Hint. There are detailed examples provided in the FLIP.
>> >
>> >
>> > 2. We will add the isDeterministic method to the `InputDataPartitioner`
>> > interface, which will return true by default (see the sketch after this
>> > list). If the `InputDataPartitioner` is not deterministic, the connector
>> > developer needs to override the isDeterministic method to return false.
>> > If the connector developer does not honor this contract, they will have
>> > to bear any correctness issues that arise.
>> >
>> >
>> > 3. Yes, this feature will work in batch mode as well.
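>> >
>> > For reference, here is a minimal sketch of the two interfaces as they
>> > have been described in this thread. The method names and signatures are
>> > illustrative assumptions; the authoritative definitions are in FLIP-462
>> > and may differ.
>> >
>> > import java.io.Serializable;
>> > import org.apache.flink.table.data.RowData;
>> >
>> > // Dim table ability: lets the connector suggest how the input stream of
>> > // the lookup join should be partitioned. (Shown package-private and in
>> > // a single file only for brevity.)
>> > interface SupportsLookupCustomShuffle {
>> >     InputDataPartitioner getPartitioner();
>> > }
>> >
>> > // Maps the lookup join keys of each input record to a target subtask.
>> > interface InputDataPartitioner extends Serializable {
>> >     int partition(RowData joinKeys, int numPartitions);
>> >
>> >     // True by default. A non-deterministic partitioner (e.g. a random
>> >     // one used for anti-skew) must override this to return false so the
>> >     // planner can reject it for non-insert-only input streams.
>> >     default boolean isDeterministic() {
>> >         return true;
>> >     }
>> > }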
>> >
>> >
>> > Best regards,
>> > Wencong
>> >
>> > At 2024-06-11 23:47:40, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>> > >Hi all,
>> > >
>> > >+1 for this FLIP, many thanks to all for your proposal.
>> > >
>> > >isDeterministic looks good to me too.
>> > >
>> > >We can consider stating the following points:
>> > >
>> > >1. How is custom data distribution enabled? Is it a dynamic hint? Can
>> > >you provide an SQL example?
>> > >
>> > >2. What impact will it have when the main input stream is a changelog?
>> > >Could it cause out-of-orderness? This may need to be emphasized.
>> > >
>> > >3. Does this feature work in batch mode too?
>> > >
>> > >Best,
>> > >Jingsong
>> > >
>> > >On Tue, Jun 11, 2024 at 8:22 PM Wencong Liu <liuwencle...@163.com> wrote:
>> > >>
>> > >> Hi Lincoln,
>> > >>
>> > >>
>> > >> Thanks for your reply. Weijie and I discussed these two issues
>> > >> offline, and here are the results of our discussion:
>> > >> 1. When the user utilizes the hash lookup join hint introduced by
>> > >> FLIP-204[1], the `SupportsLookupCustomShuffle` interface should be
>> > >> ignored. This is because the hash lookup join hint is directly
>> > >> specified by the user through a SQL HINT, which is more in line with
>> > >> user intuition. WDYT?
>> > >> 2. We agree with the introduction of the `isDeterministic` method.
>> > >> The `SupportsLookupCustomShuffle` interface introduces a custom
>> > >> shuffle, which can cause ADD/UPDATE_AFTER events (+I, +U) to appear
>> > >> after UPDATE_BEFORE/DELETE events (-D, -U), thus breaking the current
>> > >> limitations of the Flink Sink Operator[2]. If `isDeterministic`
>> > >> returns false and the changelog event type is not insert-only, the
>> > >> Planner should not apply the shuffle provided by
>> > >> `SupportsLookupCustomShuffle`.
>> > >>
>> > >>
>> > >> [1]
>> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >> [2]
>> > >> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>> > >>
>> > >>
>> > >> Best,
>> > >> Wencong
>> > >>
>> > >> At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>> > >> >Hi Weijie,
>> > >> >
>> > >> >Thanks for your proposal, this will be a useful advanced
>> > >> >optimization for connector developers!
>> > >> >
>> > >> >I have two questions:
>> > >> >
>> > >> >1. The FLIP-204[1] hash lookup join hint is mentioned in this FLIP;
>> > >> >what's the application order of the two features? For example, if a
>> > >> >connector that implements the `SupportsLookupCustomShuffle` interface
>> > >> >also has a `SHUFFLE_HASH` lookup join hint specified by the user in
>> > >> >SQL, what's the expected behavior?
>> > >> >
>> > >> >2. This FLIP considers the relationship with NDU processing, and I
>> > >> >agree with the current choice to prioritize NDU first. However, we
>> > >> >should also consider another issue: out-of-orderness of the changelog
>> > >> >events in streaming[2]. If the connector developer supplies a
>> > >> >non-deterministic partitioner, e.g., a random partitioner for
>> > >> >anti-skew purposes, then it'll break the assumption relied on by the
>> > >> >current SQL operators in streaming: the ADD/UPDATE_AFTER events
>> > >> >(+I, +U) always occur before their related UPDATE_BEFORE/DELETE
>> > >> >events (-D, -U), and they are always processed by the same task even
>> > >> >if a data shuffle is involved. So a straightforward approach would be
>> > >> >to add a method `isDeterministic` to the `InputDataPartitioner`
>> > >> >interface to explicitly tell the planner whether the partitioner is
>> > >> >deterministic or not (then the planner can reject the
>> > >> >non-deterministic custom partitioner for correctness requirements).
>> > >> >
>> > >> >[1]
>> > >> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >> >[2]
>> > >> >https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>> > >> >
>> > >> >
>> > >> >Best,
>> > >> >Lincoln Lee
>> > >> >
>> > >> >
>> > >> >Xintong Song <tonysong...@gmail.com> wrote on Fri, Jun 7, 2024 at 13:53:
>> > >> >
>> > >> >> +1 for this proposal.
>> > >> >>
>> > >> >> This FLIP will make it possible for each lookup join parallel task
>> > >> >> to only access and cache a subset of the data. This will
>> > >> >> significantly improve the performance and reduce the overhead when
>> > >> >> using Paimon for the dimension table. And it's general enough to
>> > >> >> also be leveraged by other connectors.
>> > >> >>
>> > >> >> Best,
>> > >> >>
>> > >> >> Xintong
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com> wrote:
>> > >> >>
>> > >> >> > Hi devs,
>> > >> >> >
>> > >> >> >
>> > >> >> > I'd like to start a discussion about FLIP-462[1]: Support Custom
>> > Data
>> > >> >> > Distribution for Input Stream of Lookup Join.
>> > >> >> >
>> > >> >> >
>> > >> >> > Lookup Join is an important feature in Flink. It is typically
>> > >> >> > used to enrich a table with data that is queried from an
>> > >> >> > external system. If we interact with the external system for
>> > >> >> > each incoming record, we incur significant network IO and RPC
>> > >> >> > overhead.
>> > >> >> >
>> > >> >> > Therefore, most connectors introduce caching to reduce the
>> > >> >> > per-record query overhead. However, because the data
>> > >> >> > distribution of Lookup Join's input stream is arbitrary, the
>> > >> >> > cache hit rate is sometimes unsatisfactory.
>> > >> >> >
>> > >> >> >
>> > >> >> > We want to introduce a mechanism for the connector to tell the
>> > >> >> > Flink planner its desired input stream data distribution or
>> > >> >> > partitioning strategy. This can significantly reduce the amount
>> > >> >> > of cached data and improve the performance of Lookup Join.
>> > >> >> >
>> > >> >> >
>> > >> >> > You can find more details in this FLIP[1]. Looking forward to
>> > >> >> > hearing from you, thanks!
>> > >> >> >
>> > >> >> >
>> > >> >> > Best regards,
>> > >> >> >
>> > >> >> > Weijie
>> > >> >> >
>> > >> >> >
>> > >> >> > [1]
>> > >> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
>> > >> >> >
>> > >> >>
>> >
>>
>
