Hi, Xuyang

Thanks for initiating this FLIP, which is of great value in solving the
Streaming Join state problem that troubles many users. Big +1 for it.

Regarding the overall design of the FLIP, I have the following questions:

1. Can you explain which Join types the FLIP currently supports, such as
Inner, Left, and Right Join?

2. You mentioned strong consistency semantics and eventual consistency
semantics, and gave the corresponding derivation formulas. Under eventual
consistency semantics, you mentioned that some intermediate data will be
introduced. What confuses me here is how eventual consistency can be
achieved if the Sink does not support idempotent updates by Join key. After
all, the Join operator outputs some duplicate data.

3. Due to storage limitations, the first step only considers supporting
eventual consistency semantics. However, Paimon and other lake storages
support Snapshots, which can achieve strong consistency semantics. If strong
consistency is considered in the future, can it be implemented well on top
of the current design?

4. Regarding the LRU cache part, if a record in the source table is
updated, how is the cache updated automatically? Otherwise, I feel that
stale data may be read from the cache.

5. In terms of implementation, have you considered batching input records
and then querying them in batches to improve operator performance?

6. Since Flink SQL syntax does not currently support indexes, how can users
create indexes on the source table and pass them to Flink? In other words,
how can users use Delta Join?

7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When both
Join optimizations are enabled, which one will take precedence, Delta Join
or Multi-Way Join? What is the relationship between the two?
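
To make question 2 concrete, here is a minimal Python sketch (not Flink
code). The function `delta_join`, the table contents, and the two sinks are
hypothetical names made up for illustration. It assumes the duplicates arise
because each side's new rows look up the latest contents of the other table,
so a pair of rows that arrive together on both sides is emitted twice.

```python
def delta_join(delta_a, delta_b, table_a, table_b):
    """Toy bidirectional lookup: each delta probes the other side's latest table."""
    out = []
    # New rows on side A look up the current contents of table B.
    for key, a_val in delta_a:
        for b_val in table_b.get(key, []):
            out.append((key, a_val, b_val))
    # New rows on side B look up the current contents of table A.
    for key, b_val in delta_b:
        for a_val in table_a.get(key, []):
            out.append((key, a_val, b_val))
    return out


# Assumption: both tables already contain the new rows when the lookups run.
table_a = {1: ["a1"]}
table_b = {1: ["b1"]}
delta_a = [(1, "a1")]
delta_b = [(1, "b1")]

joined = delta_join(delta_a, delta_b, table_a, table_b)

# Append-only sink: keeps every emitted row, including the duplicate.
append_sink = list(joined)

# Idempotent sink keyed by Join key: the second write overwrites the first.
upsert_sink = {}
for key, a_val, b_val in joined:
    upsert_sink[key] = (a_val, b_val)
```

In this toy case the append-only sink ends up with two identical rows, while
the sink that upserts by Join key converges to a single row. That gap is what
I would like the FLIP to address.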


Best,
Ron

On Wed, May 7, 2025 at 10:02, Xuyang <xyzhong...@163.com> wrote:

> Hi, Weijie.
>
> Thanks for your review! Let me try to answer these questions.
>
> 1. The nodes between the source and the join must be included in the
> whitelist, such as Calc, DropUpdateBefore, WatermarkAssigner, Exchange,
> etc. You can see more in the chapter "Limitations about Delta Join with
> Eventual Consistency".
>
> 2. Currently, we have only a rough plan. First and foremost, we need to
> ensure that storage engines like Paimon, Fluss, etc. support snapshot
> awareness during scans and the inclusion of snapshots in lookups. Overall,
> it is likely to be supported later, in 2.2/2.3...
>
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
>
>
> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
> >Hi Xuyang,
> >
> >Thanks for driving this! The state of a two-stream join is a real
> >headache in stream computing systems.
> >
> >After reading this FLIP, I have two questions:
> >
> >1. Can the two sides of the join operator be any stream, or must each side
> >come directly from the LookupSource? If they can be any stream, how should
> >the stateful operators in the path be handled?
> >2. Are there any plans to support strong consistency in the future? This
> >should be helpful for the scenarios of incremental computing.
> >
> >
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >On Fri, Apr 25, 2025 at 10:52, Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, devs.
> >>
> >> I'd like to start a discussion on FLIP-486: Introduce a new
> >> DeltaJoin[1].
> >>
> >>
> >>
> >>
> >> In Flink streaming jobs, the large state of Join nodes has been a
> >> persistent concern for users. Since streaming jobs are long-running,
> >> the state of a join generally increases in size over time. Although
> >> users can set the state TTL to mitigate this issue, it is not
> >> applicable to all scenarios and does not provide a fundamental solution.
> >>
> >> An oversized state can lead to a series of problems, including but not
> >> limited to:
> >>
> >> 1. Resource bottlenecks in individual tasks
> >> 2. Slow checkpointing, which affects job stability during the
> >> checkpointing process
> >> 3. Long recovery time from state
> >>
> >> In addition, when analyzing the join state, we find that in some
> >> scenarios, the state within the join actually contains redundant data
> >> from the source tables.
> >>
> >>
> >>
> >>
> >> To address this issue, we aim to introduce Delta Join, which is based on
> >> the core idea of leveraging a bidirectional lookup join approach to reuse
> >> source table data as a substitute for join state.
> >>
> >>
> >>
> >>
> >> You can find more details in this FLIP. I'm looking forward to your
> >> comments and feedback.
> >>
> >>
> >>
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> >>
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
>
