Hi, Xuyang

Thanks for proposing this FLIP — it’s very helpful for scenarios involving
large states.

I have a few questions regarding the node whitelist section:

1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as an
example — when the right side supports filter pushdown, the source should
be able to apply the filter. However, in the case of a LookupJoin, it seems
that filter conditions cannot be pushed down. In such cases, are the
filters re-applied after the join?

2. When there is a Calc node between the source and the join, and the join
key involves some computation, does this prevent the plan from being
transformed into a Delta Join?


Best,
Feng


On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Ron. Thanks for your attention to this FLIP.
>
> 1. Initially, inner, left, right, and full joins will be supported. I have
> updated the FLIP accordingly.
>
> 2. Are you referring to the situation where the downstream primary key
> (PK) differs from the upstream join key? Once the sink requires the
> upstream to send complete -U and +U data to satisfy its idempotent update
> (for example, 1. when the sink PK does not match the upstream upsert key,
> it results in a sink materialization node, or 2. when the sink is a retract
> sink instead of an upsert sink), a deduplication node will be introduced
> after the delta join to replay the +U data as both -U and +U. More details
> can be found in section "When and How to Convert." Please correct me if I
> misunderstood your question.
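
To check my understanding of the replay described above, here is a minimal Python sketch of a keep-last-row step that turns upsert-style join output into -U/+U pairs. The function name `replay_as_retract` and the `(key, row)` tuple layout are hypothetical, and silently dropping an exact duplicate is my own assumption rather than something stated in the FLIP.

```python
def replay_as_retract(changes):
    """Replay upsert-style (key, row) changes as +I / -U / +U messages.

    Keeps the last row seen per key (the "state" of this toy node) so that
    an update can be emitted as a retraction of the old row plus the new row.
    """
    last_row = {}
    out = []
    for key, row in changes:
        if key not in last_row:
            out.append(("+I", key, row))
        elif last_row[key] == row:
            # Assumption: an exact duplicate carries no new information.
            continue
        else:
            out.append(("-U", key, last_row[key]))
            out.append(("+U", key, row))
        last_row[key] = row
    return out
```

The cost of this step is one stored row per key, which is presumably why it is only added when the sink cannot absorb duplicates on its own.
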
>
> 3. Yes, the majority of the design can be shared, including the index API,
> optimization phases, and parts of the operator implementation. The main
> difference is that strong consistency semantics require additional
> interfaces to establish certain agreements with the connector, such as
> whether snapshot scan reading and snapshot lookup are supported.
>
> 4. You've raised an important point: the introduction of a cache is not so
> straightforward. Each side has its own cache that stores remote data from
> this side's table. When one side receives data, it needs to perform the
> following steps:
>
> 1). Attempt to update its own cache:
>
> 1.1) If there is a cache, update it.
>
> 1.2) If there is no cache, skip the update.
>
> 2). Query the other side's cache or external table to perform the join:
>
> 2.1) If the other side has a cache, query it directly.
>
> 2.2) If the other side does not have a cache, query the external table and
> establish a cache on that side.
>
> In brief, the cache is established using data from the other side and is
> updated with data from its own side.
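
If I read the steps above correctly, they could be modelled roughly as follows. This is only a sketch in Python under my own assumptions: `JoinSide`, `LruCache`, and `on_record` are hypothetical names, the external table is a plain in-memory dict, and rows are keyed by primary key so that a cache update is idempotent.

```python
from collections import OrderedDict


class LruCache:
    """Small LRU map: join key -> {primary key: row}."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, join_key):
        if join_key not in self.entries:
            return None
        self.entries.move_to_end(join_key)  # mark as most recently used
        return self.entries[join_key]

    def put(self, join_key, rows):
        self.entries[join_key] = rows
        self.entries.move_to_end(join_key)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict least recently used


class JoinSide:
    """One input of the join plus a stand-in for its external table."""

    def __init__(self, table, capacity=1024):
        self.table = table  # hypothetical: join key -> {primary key: row}
        self.cache = LruCache(capacity)
        self.remote_lookups = 0

    def update_own_cache(self, join_key, pk, row):
        rows = self.cache.get(join_key)
        if rows is not None:
            rows[pk] = row  # 1.1) an entry exists: update it
        # 1.2) no entry for this key: skip the update

    def lookup(self, join_key):
        rows = self.cache.get(join_key)  # 2.1) try the cache first
        if rows is None:
            # 2.2) miss: query the external table and establish the cache
            self.remote_lookups += 1
            rows = dict(self.table.get(join_key, {}))
            self.cache.put(join_key, rows)
        return list(rows.values())


def on_record(own, other, join_key, pk, row):
    """Handle a record arriving on `own`: update own cache, join via `other`."""
    own.update_own_cache(join_key, pk, row)
    return [(row, match) for match in other.lookup(join_key)]
```

In this model an evicted entry only costs a later remote lookup; it cannot make the cache stale, because an entry is either absent or kept up to date by its own side's records.
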
>
> 5. I've thought about this as well, namely supporting a *batch* lookup
> function. This feature would benefit not only delta joins but also the
> regular lookup join syntax. However, it probably needs a separate FLIP.
>
> 6. The table schema, including its indexes, can be retrieved by calling
> `getTable` on `Catalog`. I do not want the engine to create indexes
> automatically. Similar to primary keys, even if we introduce index syntax,
> an index should be marked as a `NOT ENFORCED` constraint.
>
> 7. Thank you for the reminder. I noticed that Multi-Way Join is disabled
> by default. In simple terms, users can adjust various configs to enable
> either Multi-Way Join or Delta Join. If both are enabled, the Cost-Based
> Optimizer can help us choose the appropriate algorithm, although the
> current table cost is not accurate. Looking ahead, Delta Join will also
> support multi-level join scenarios, referred to as Multi-Way Delta Join.
>
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
>
>
> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
> >Hi, Xuyang
> >
> >Thanks for initiating this FLIP, which is of great value in solving the
> >Streaming Join that troubles many users. Big +1 for it.
> >
> >For the overall design of FLIP, I have the following questions:
> >
> >1. Can you explain the currently supported Join types in FLIP, such as
> >Inner, Left, and Right Join?
> >
> >2. You mentioned strong consistency semantics and eventual consistency
> >semantics, and gave the corresponding derivation formulas. Under eventual
> >consistency semantics, you mentioned that some intermediate data will be
> >introduced. What confuses me here is how to achieve eventual consistency
> >semantics if Sink does not support idempotent updates by Join key, after
> >all, the Join operator outputs some duplicate data.
> >
> >3. Due to storage reasons, the first step is to only consider supporting
> >eventual consistency semantics, but Paimon and other lake storages support
> >Snapshot, which can achieve strong consistency semantics. If strong
> >consistency is considered in the future, can it be well implemented based
> >on the current design?
> >
> >4. Regarding the LRU cache part, if a record in the source table is
> >updated, how is the cache updated automatically? Otherwise, I feel that
> >stale data may be read from the cache.
> >
> >5. In terms of implementation, have you considered batching input records
> >and then querying them in batches to improve operator performance?
> >
> >6. Since Flink SQL syntax does not support indexes now, how can you create
> >indexes on the source table and pass them to Flink? I mean, how can users
> >use Delta Join?
> >
> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When both
> >Join optimizations are enabled, which one will take precedence, Delta Join
> >or Multi-Way Join? What is the relationship between the two?
> >
> >
> >Best,
> >Ron
> >
> >On Wed, May 7, 2025 at 10:02, Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, Weijie.
> >>
> >> Thanks for your review! Let me try to answer these questions.
> >>
> >> 1. The nodes between source and join must be included in the whitelist,
> >> such as Calc, DropUpdateBefore, WatermarkAssigner, Exchange, etc. You can
> >> see more in chapter "Limitations about Delta Join with Eventual
> >> Consistency".
> >>
> >> 2. Currently, we have only a rough plan. First and foremost, we need to
> >> ensure that storage engines like Paimon, Fluss, etc. support
> >> snapshot-aware scans and snapshot lookups. Overall, it is likely to be
> >> supported later, in 2.2/2.3...
> >>
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >>
> >>
> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
> >> >Hi Xuyang,
> >> >
> >> >Thanks for driving this! The state of two-stream joins is a real
> >> >headache in stream computing systems.
> >> >
> >> >After reading this FLIP, I have two questions:
> >> >
> >> >1. Can the two sides of the join operator be any stream or must it be
> >> >immediately followed by the LookupSource? If it is any stream, how should
> >> >the stateful operators in the path be handled?
> >> >2. Are there any plans to support strong consistency in the future? This
> >> >should be helpful for incremental computing scenarios.
> >> >
> >> >
> >> >
> >> >Best regards,
> >> >
> >> >Weijie
> >> >
> >> >
> >> >On Fri, Apr 25, 2025 at 10:52, Xuyang <xyzhong...@163.com> wrote:
> >> >
> >> >> Hi, devs.
> >> >>
> >> >> I'd like to start a discussion on FLIP-486: Introduce a new DeltaJoin[1].
> >> >>
> >> >> In Flink streaming jobs, the large state of Join nodes has been a
> >> >> persistent concern for users. Since streaming jobs are long-running, the
> >> >> state of join generally increases in size over time. Although users can
> >> >> set the state TTL to mitigate this issue, it is not applicable to all
> >> >> scenarios and does not provide a fundamental solution.
> >> >>
> >> >> An oversized state can lead to a series of problems, including but not
> >> >> limited to:
> >> >>
> >> >> 1. Resource bottlenecks in individual tasks
> >> >> 2. Slow checkpointing, which affects job stability during the
> >> >> checkpointing process
> >> >> 3. Long recovery time from state
> >> >>
> >> >> In addition, when analyzing the join state, we find that in some
> >> >> scenarios, the state within the join actually contains redundant data
> >> >> from the source tables.
> >> >>
> >> >> To address this issue, we aim to introduce Delta Join, which is based on
> >> >> the core idea of leveraging a bidirectional lookup join approach to reuse
> >> >> source table data as a substitute for join state.
> >> >>
> >> >> You can find more details in this FLIP. I'm looking forward to your
> >> >> comments and feedback.
> >> >>
> >> >> [1]
> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >>
> >> >>     Best!
> >> >>     Xuyang
> >>
>
