Hi Weijie,

Thanks for your proposal! This will be a useful advanced optimization for
connector developers.

I have two questions:

1. The FLIP-204[1] hash lookup join hint is mentioned in this FLIP; in what
order are the two features applied? For example, if a connector
implements the `SupportsLookupCustomShuffle` interface and the user also
specifies a `SHUFFLE_HASH` lookup join hint in the SQL query, what is
the expected behavior? (The first sketch after these questions shows
this scenario concretely.)

2. This FLIP considers the relationship with NDU processing, and I agree
with the current choice to prioritize NDU first. However, we should also
consider another issue: out-of-orderness of changelog events in
streaming[2]. If the connector developer supplies a non-deterministic
partitioner, e.g., a random partitioner for anti-skew purposes, it will
break an assumption that current SQL operators in streaming rely on: the
ADD/UPDATE_AFTER events (+I, +U) always occur before their related
UPDATE_BEFORE/DELETE events (-U, -D), and they are always
processed by the same task even when a data shuffle is involved. So a
straightforward approach would be to add a method `isDeterministic` to
the `InputDataPartitioner` interface to explicitly tell the planner whether
the partitioner is deterministic or not (the planner can then reject a
non-deterministic custom partitioner to meet the correctness requirement).
The second sketch after these questions illustrates this idea.
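
To make question 1 concrete, here is a minimal sketch of the conflicting
setup. It assumes the `SHUFFLE_HASH` hint name from FLIP-204 as referenced
above, uses hypothetical table and column names, and assumes the connector
behind the dimension table implements `SupportsLookupCustomShuffle`
(table DDLs are omitted):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CustomShuffleVsHintExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // 'orders' and 'dim' are hypothetical tables; assume the connector
            // behind 'dim' returns a DynamicTableSource that implements
            // SupportsLookupCustomShuffle, i.e. it already requests its own
            // input-stream distribution from the planner.
            tEnv.executeSql(
                    "SELECT /*+ SHUFFLE_HASH('dim') */ o.order_id, d.name "
                            + "FROM orders AS o "
                            + "JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d "
                            + "ON o.product_id = d.id");
            // Which distribution wins here: the connector's custom partitioner,
            // or the hash shuffle requested by the hint?
        }
    }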
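
And for question 2, a rough sketch of the proposed addition, assuming the
`InputDataPartitioner` shape described in the FLIP (a single partition
method over the join key); the default value and the javadoc wording are
only illustrative:

    import java.io.Serializable;
    import org.apache.flink.table.data.RowData;

    public interface InputDataPartitioner extends Serializable {

        /** Decides which partition the given join-key row of the input stream goes to. */
        int partition(RowData joinKey, int numPartitions);

        /**
         * Proposed addition: tells the planner whether this partitioner always
         * routes the same join key to the same partition. A connector returning
         * false (e.g. a random anti-skew partitioner) could then be rejected by
         * the planner, because streaming operators assume the +I/+U and related
         * -U/-D events of one key are processed by the same task.
         */
        default boolean isDeterministic() {
            return true;
        }
    }

With such a flag the planner could either fail fast or fall back to the
default distribution whenever a custom partitioner reports false.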

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2]
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness


Best,
Lincoln Lee


On Fri, Jun 7, 2024 at 1:53 PM Xintong Song <tonysong...@gmail.com> wrote:

> +1 for this proposal.
>
> This FLIP will make it possible for each lookup join parallel task to only
> access and cache a subset of the data. This will significantly improve the
> performance and reduce the overhead when using Paimon for the dimension
> table. And it's general enough to also be leveraged by other connectors.
>
> Best,
>
> Xintong
>
>
>
> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> >
> > I'd like to start a discussion about FLIP-462[1]: Support Custom Data
> > Distribution for Input Stream of Lookup Join.
> >
> >
> > Lookup Join is an important feature in Flink. It is typically used to
> > enrich a table with data that is queried from an external system.
> > If we interact with the external system for each incoming record, we
> > incur significant network IO and RPC overhead.
> >
> > Therefore, most connectors introduce caching to reduce the per-record
> > query overhead. However, because the data distribution of the Lookup
> > Join's input stream is arbitrary, the cache hit rate is sometimes
> > unsatisfactory.
> >
> >
> > We want to introduce a mechanism for the connector to tell the Flink
> > planner its desired input stream data distribution or partitioning
> > strategy. This can significantly reduce the amount of cached data and
> > improve performance of Lookup Join.
> >
> >
> > You can find more details in this FLIP[1]. Looking forward to hearing
> > from you, thanks!
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
> >
>
