Hi Xuyang,

Thx for driving this FLIP. I believe delta join is truly helpful for large
state scenarios. The overall design LGTM. Also, I like the idea of updating
the cache using the data from its own side.

I have one minor question: for storages like Paimon, it has its own cache
in its LookupFunction. There can be inconsistencies between the cache in
the DeltaJoinOperator and the cache inside the LookupFunticon, because the
incoming data will only update the former.

How should we address this issue, is it possible the incoming data could
also update the cache inside LookupFunction? Or we can assume that all
LookupFunction with cache should extend `CachingLookupFunction`, then we
can use the incoming data to update its `LookupCache`. Best,
Xiangyu Feng

Ron Liu <ron9....@gmail.com> 于2025年5月15日周四 10:53写道:

> Hi, Xuyang
>
> Thanks for your reply, looks good to me.
>
> Best,
> Ron
>
> Xuyang <xyzhong...@163.com> 于2025年5月13日周二 15:00写道:
>
> > Hi, Feng. Let me address your questions:
> >
> > 1. As you mentioned, these filters should be recalculated. We can apply
> > the filters after lookup the source,
> >
> > and then perform the join. Moreover, we can consider pushing the filters
> > down to the lookup source to
> >
> > achieve a more efficient lookup.
> >
> > 2. You are correct that in the Calc, the join key must be referenced
> > directly without any transformations.
> >
> > Otherwise, we won’t be able to revert the transformed join key back to
> its
> > original ones. Let me clarify this
> >
> > part in the Flip.
> >
> >
> >
> >
> > --
> >
> >     Best!
> >     Xuyang
> >
> >
> >
> >
> >
> > 在 2025-05-12 10:33:41,"Feng Jin" <jinfeng1...@gmail.com> 写道:
> > >Hi, xuyang
> > >
> > >Thanks for proposing this FLIP — it’s very helpful for scenarios
> involving
> > >large states.
> > >
> > >I have a few questions regarding the node whitelist section:
> > >
> > >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as
> an
> > >example — when the right side supports filter pushdown, the source
> should
> > >be able to apply the filter. However, in the case of a LookupJoin, it
> > seems
> > >that filter conditions cannot be pushed down. In such cases, are the
> > >filters re-applied after the join?
> > >
> > >2. When there is a Calc node between the source and the join, and the
> join
> > >key involves some computation, does this prevent the plan from being
> > >transformed into a Delta Join?
> > >
> > >
> > >Best,
> > >Feng
> > >
> > >
> > >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
> > >
> > >> Hi, Ron. Thanks for your attention to this flip.
> > >>
> > >> 1. At first, inner/left/right/full join will be supported. I have
> > updated
> > >> the flip about this part.
> > >>
> > >> 2. Are you referring to the situation where the downstream primary key
> > >> (PK) differs from the upstream join key? Once the sink requires the
> > >> upstream to send complete -U and +U data to satisfy its idempotent
> > update
> > >> (for example, 1. when the sink PK does not match the upstream upsert
> > key,
> > >> it results in a sink materialization node, or 2. when the sink is a
> > retract
> > >> sink instead of an upsert sink), a deduplication node will be
> introduced
> > >> after the delta join to replay the +U data as both -U and +U. More
> > details
> > >> can be found in section "When and How to Convert." Please correct if I
> > >> mistake your mean.
> > >>
> > >> 3. Yes, the majority of the design can be shared, including the index
> > API,
> > >> optimization phases, and parts of the operator implementation. The
> main
> > >> difference is that strong consistency semantics require additional
> > >> interfaces to establish certain agreements with the connector, such as
> > >> whether snapshot scan reading and snapshot lookup are supported.
> > >>
> > >> 4. You've raised an important point: the introduction of a cache is
> not
> > so
> > >> straightforward. Each side has its own cache that stores remote data
> > from
> > >> this side's table. When one side receives data, it needs to perform
> the
> > >> following steps:
> > >>
> > >> 1). Attempt to update its own cache:
> > >>
> > >> 1.1) If there is a cache, update it.
> > >>
> > >> 1.2) If there is no cache, skip the update.
> > >>
> > >> 2). Query the other side's cache or external table to perform the
> join:
> > >>
> > >> 2.1) If the other side has a cache, query it directly.
> > >>
> > >> 2.2) If the other side does not have a cache, query the external table
> > and
> > >> establish a cache on that side.
> > >>
> > >> In brief, the cache is established using data from the other side and
> is
> > >> updated with data from its own side.
> > >>
> > >> 5. I've thought about this issue as well, that is to support *batch*
> > >> lookup function. This feature would not only be beneficial for delta
> > joins,
> > >> but it would also be very useful for regular lookup syntax. However,
> it
> > may
> > >> be necessary to pursue a separate FLIP to address this.
> > >>
> > >> 6. Schema with indexes in the table can be retrieved by using the
> > >> `getTable` on `Catalog`. I do not want the engine to create indexes
> > >> automatically. Similar to primary keys, even if we introduce index
> > syntax,
> > >> it should be marked as a `NOT ENFORCED` constraint.
> > >>
> > >> 7. Thank you for the reminder. I noticed that Multi-Way Join is
> disabled
> > >> by default. In simple terms, users can adjust various configs to
> enable
> > >> either Multi-Way Join or Delta Join. If both are enabled, the
> Cost-Based
> > >> Optimizer can help us choose the appropriate algorithm, although the
> > >> current table cost is not accurate. Looking ahead, Delta Join will
> also
> > >> support multi-level join scenarios, referred to as Multi-Way Delta
> Join.
> > >>
> > >>
> > >>
> > >>
> > >> --
> > >>
> > >>     Best!
> > >>     Xuyang
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
> > >> >Hi, Xuyang
> > >> >
> > >> >Thanks for initiating this FLIP, which is of great value in solving
> the
> > >> >Streaming Join that troubles many users. Big +1 for it.
> > >> >
> > >> >For the overall design of FLIP, I have the following questions:
> > >> >
> > >> >1. Can you explain the currently supported Join types in FLIP, such
> as
> > >> >Inner, Left, and Right Join?
> > >> >
> > >> >2. You mentioned strong consistency semantics and eventual
> consistency
> > >> >semantics, and gave the corresponding derivation formulas. Under
> > eventual
> > >> >consistency semantics,
> > >> >you mentioned that some intermediate data will be introduced. What
> > >> confuses
> > >> >me here is how to achieve eventual consistency semantics if Sink does
> > not
> > >> >support idempotent updates by Join key, after all, the Join operator
> > >> >outputs some duplicate data.
> > >> >
> > >> >3. Due to storage reasons, the first step is to only consider
> > supporting
> > >> >eventual consistency semantics, but Paimon and other lake storages
> > support
> > >> >Snapshot, which can achieve strong consistency semantics. If strong
> > >> >consistency is considered in the future, can it be well implemented
> > based
> > >> >on the current design?
> > >> >
> > >> >4. Regarding the LRU cache part, if the record in the source table is
> > >> >updated, how to automatically update the cache, otherwise I feel that
> > the
> > >> >wrong data may be found from the cache.
> > >> >
> > >> >5. In terms of implementation, have you considered batching input
> > records
> > >> >and then querying them in batches to improve operator performance?
> > >> >
> > >> >6. Since Flink SQL syntax does not support indexes now, how can you
> > create
> > >> >indexes on the source table and pass them to Flink? I mean, how can
> > users
> > >> >use Delta Join?
> > >> >
> > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join.
> When
> > >> both
> > >> >Join optimizations are enabled, which one will take precedence, Delta
> > Join
> > >> >or Multi-Way Join? What is the relationship between the two?
> > >> >
> > >> >
> > >> >Best,
> > >> >Ron
> > >> >
> > >> >Xuyang <xyzhong...@163.com> 于2025年5月7日周三 10:02写道:
> > >> >
> > >> >> Hi, Weijie.
> > >> >>
> > >> >> Thanks for your review! Let me try to answer these questions.
> > >> >>
> > >> >> 1. The nodes between source and join must be included in the
> > whitelist,
> > >> >> such as Calc,
> > >> >>
> > >> >> DropUpdateBefore, WatermarkAssigner, Exchange and etc. You can see
> > more
> > >> in
> > >> >> chapter
> > >> >>
> > >> >> "Limitations about Delta Join with Eventual Consistency".
> > >> >>
> > >> >> 2. Currently, we have only one vague plan. First and foremost, we
> > need
> > >> to
> > >> >> ensure that storage
> > >> >>
> > >> >> engines like Paimon, Fluss and etcs support snapshot awareness
> during
> > >> >> scans and including
> > >> >>
> > >> >> snapshots during lookups. Overall, it is likely to be supported in
> > >> 2.2/2.3
> > >> >> later...
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> --
> > >> >>
> > >> >>     Best!
> > >> >>     Xuyang
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com>
> > wrote:
> > >> >> >Hi Xuyang,
> > >> >> >
> > >> >> >Thanks for driving this! The state of two-stream join is a very
> > >> >> >headache-inducing problem in stream computing system.
> > >> >> >
> > >> >> >After reading this FLIP, I have two question:
> > >> >> >
> > >> >> >1. Can the two sides of the join operator be any stream or must it
> > be
> > >> >> >immediately followed by the LookupSource? If it is any stream, how
> > >> should
> > >> >> >the stateful operators in the path be handled?
> > >> >> >2. Are there any plans to support strong consistency in the
> future?
> > >> This
> > >> >> >should be helpful for the scenarios of incremental computing.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >Best regards,
> > >> >> >
> > >> >> >Weijie
> > >> >> >
> > >> >> >
> > >> >> >Xuyang <xyzhong...@163.com> 于2025年4月25日周五 10:52写道:
> > >> >> >
> > >> >> >> Hi, devs.
> > >> >> >>
> > >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new
> > >> >> DeltaJoin[1].
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> In Flink streaming jobs, the large state of Join nodes has been
> a
> > >> >> >> persistent concern for users.
> > >> >> >>
> > >> >> >> Since streaming jobs are long-running, the state of join
> generally
> > >> >> >> increases in size over time.
> > >> >> >>
> > >> >> >> Although users can set the state TTL to mitigate this issue, it
> is
> > >> not
> > >> >> >> applicable to all scenarios
> > >> >> >>
> > >> >> >> and does not provide a fundamental solution.
> > >> >> >>
> > >> >> >> An oversized state can lead to a series of problems, including
> but
> > >> not
> > >> >> >> limited to:
> > >> >> >>
> > >> >> >> 1. Resource bottlenecks in individual tasks
> > >> >> >>
> > >> >> >> 2. Slow checkpointing, which affects job stability during the
> > >> >> >> checkpointing process
> > >> >> >>
> > >> >> >> 3. Long recovery time from state
> > >> >> >>
> > >> >> >> In addition, when analyzing the join state, we find that in some
> > >> >> >> scenarios, the state within the join
> > >> >> >>
> > >> >> >> actually contains redundant data from the source tables.
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> To address this issue, we aim to introduce Delta Join, which is
> > >> based on
> > >> >> >> the core idea of leveraging
> > >> >> >>
> > >> >> >> a bidirectional lookup join approach to reuse source table data
> > as a
> > >> >> >> substitute for join state.
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> You can find more details in this Flip. I'm looking forward to
> > your
> > >> >> >> comments and feedback.
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> [1]
> > >> >> >>
> > >> >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> --
> > >> >> >>
> > >> >> >>     Best!
> > >> >> >>     Xuyang
> > >> >>
> > >>
> >
>

Reply via email to