Hi, Xuyang

Thanks for your reply, looks good to me.

Best,
Ron

Xuyang <xyzhong...@163.com> 于2025年5月13日周二 15:00写道:

> Hi, Feng. Let me address your questions:
>
> 1. As you mentioned, these filters should be recalculated. We can apply
> the filters after lookup the source,
>
> and then perform the join. Moreover, we can consider pushing the filters
> down to the lookup source to
>
> achieve a more efficient lookup.
>
> 2. You are correct that in the Calc, the join key must be referenced
> directly without any transformations.
>
> Otherwise, we won’t be able to revert the transformed join key back to its
> original ones. Let me clarify this
>
> part in the Flip.
>
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
>
>
> 在 2025-05-12 10:33:41,"Feng Jin" <jinfeng1...@gmail.com> 写道:
> >Hi, xuyang
> >
> >Thanks for proposing this FLIP — it’s very helpful for scenarios involving
> >large states.
> >
> >I have a few questions regarding the node whitelist section:
> >
> >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as an
> >example — when the right side supports filter pushdown, the source should
> >be able to apply the filter. However, in the case of a LookupJoin, it
> seems
> >that filter conditions cannot be pushed down. In such cases, are the
> >filters re-applied after the join?
> >
> >2. When there is a Calc node between the source and the join, and the join
> >key involves some computation, does this prevent the plan from being
> >transformed into a Delta Join?
> >
> >
> >Best,
> >Feng
> >
> >
> >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, Ron. Thanks for your attention to this flip.
> >>
> >> 1. At first, inner/left/right/full join will be supported. I have
> updated
> >> the flip about this part.
> >>
> >> 2. Are you referring to the situation where the downstream primary key
> >> (PK) differs from the upstream join key? Once the sink requires the
> >> upstream to send complete -U and +U data to satisfy its idempotent
> update
> >> (for example, 1. when the sink PK does not match the upstream upsert
> key,
> >> it results in a sink materialization node, or 2. when the sink is a
> retract
> >> sink instead of an upsert sink), a deduplication node will be introduced
> >> after the delta join to replay the +U data as both -U and +U. More
> details
> >> can be found in section "When and How to Convert." Please correct if I
> >> mistake your mean.
> >>
> >> 3. Yes, the majority of the design can be shared, including the index
> API,
> >> optimization phases, and parts of the operator implementation. The main
> >> difference is that strong consistency semantics require additional
> >> interfaces to establish certain agreements with the connector, such as
> >> whether snapshot scan reading and snapshot lookup are supported.
> >>
> >> 4. You've raised an important point: the introduction of a cache is not
> so
> >> straightforward. Each side has its own cache that stores remote data
> from
> >> this side's table. When one side receives data, it needs to perform the
> >> following steps:
> >>
> >> 1). Attempt to update its own cache:
> >>
> >> 1.1) If there is a cache, update it.
> >>
> >> 1.2) If there is no cache, skip the update.
> >>
> >> 2). Query the other side's cache or external table to perform the join:
> >>
> >> 2.1) If the other side has a cache, query it directly.
> >>
> >> 2.2) If the other side does not have a cache, query the external table
> and
> >> establish a cache on that side.
> >>
> >> In brief, the cache is established using data from the other side and is
> >> updated with data from its own side.
> >>
> >> 5. I've thought about this issue as well, that is to support *batch*
> >> lookup function. This feature would not only be beneficial for delta
> joins,
> >> but it would also be very useful for regular lookup syntax. However, it
> may
> >> be necessary to pursue a separate FLIP to address this.
> >>
> >> 6. Schema with indexes in the table can be retrieved by using the
> >> `getTable` on `Catalog`. I do not want the engine to create indexes
> >> automatically. Similar to primary keys, even if we introduce index
> syntax,
> >> it should be marked as a `NOT ENFORCED` constraint.
> >>
> >> 7. Thank you for the reminder. I noticed that Multi-Way Join is disabled
> >> by default. In simple terms, users can adjust various configs to enable
> >> either Multi-Way Join or Delta Join. If both are enabled, the Cost-Based
> >> Optimizer can help us choose the appropriate algorithm, although the
> >> current table cost is not accurate. Looking ahead, Delta Join will also
> >> support multi-level join scenarios, referred to as Multi-Way Delta Join.
> >>
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >>
> >>
> >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
> >> >Hi, Xuyang
> >> >
> >> >Thanks for initiating this FLIP, which is of great value in solving the
> >> >Streaming Join that troubles many users. Big +1 for it.
> >> >
> >> >For the overall design of FLIP, I have the following questions:
> >> >
> >> >1. Can you explain the currently supported Join types in FLIP, such as
> >> >Inner, Left, and Right Join?
> >> >
> >> >2. You mentioned strong consistency semantics and eventual consistency
> >> >semantics, and gave the corresponding derivation formulas. Under
> eventual
> >> >consistency semantics,
> >> >you mentioned that some intermediate data will be introduced. What
> >> confuses
> >> >me here is how to achieve eventual consistency semantics if Sink does
> not
> >> >support idempotent updates by Join key, after all, the Join operator
> >> >outputs some duplicate data.
> >> >
> >> >3. Due to storage reasons, the first step is to only consider
> supporting
> >> >eventual consistency semantics, but Paimon and other lake storages
> support
> >> >Snapshot, which can achieve strong consistency semantics. If strong
> >> >consistency is considered in the future, can it be well implemented
> based
> >> >on the current design?
> >> >
> >> >4. Regarding the LRU cache part, if the record in the source table is
> >> >updated, how to automatically update the cache, otherwise I feel that
> the
> >> >wrong data may be found from the cache.
> >> >
> >> >5. In terms of implementation, have you considered batching input
> records
> >> >and then querying them in batches to improve operator performance?
> >> >
> >> >6. Since Flink SQL syntax does not support indexes now, how can you
> create
> >> >indexes on the source table and pass them to Flink? I mean, how can
> users
> >> >use Delta Join?
> >> >
> >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When
> >> both
> >> >Join optimizations are enabled, which one will take precedence, Delta
> Join
> >> >or Multi-Way Join? What is the relationship between the two?
> >> >
> >> >
> >> >Best,
> >> >Ron
> >> >
> >> >Xuyang <xyzhong...@163.com> 于2025年5月7日周三 10:02写道:
> >> >
> >> >> Hi, Weijie.
> >> >>
> >> >> Thanks for your review! Let me try to answer these questions.
> >> >>
> >> >> 1. The nodes between source and join must be included in the
> whitelist,
> >> >> such as Calc,
> >> >>
> >> >> DropUpdateBefore, WatermarkAssigner, Exchange and etc. You can see
> more
> >> in
> >> >> chapter
> >> >>
> >> >> "Limitations about Delta Join with Eventual Consistency".
> >> >>
> >> >> 2. Currently, we have only one vague plan. First and foremost, we
> need
> >> to
> >> >> ensure that storage
> >> >>
> >> >> engines like Paimon, Fluss and etcs support snapshot awareness during
> >> >> scans and including
> >> >>
> >> >> snapshots during lookups. Overall, it is likely to be supported in
> >> 2.2/2.3
> >> >> later...
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >>
> >> >>     Best!
> >> >>     Xuyang
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com>
> wrote:
> >> >> >Hi Xuyang,
> >> >> >
> >> >> >Thanks for driving this! The state of two-stream join is a very
> >> >> >headache-inducing problem in stream computing system.
> >> >> >
> >> >> >After reading this FLIP, I have two question:
> >> >> >
> >> >> >1. Can the two sides of the join operator be any stream or must it
> be
> >> >> >immediately followed by the LookupSource? If it is any stream, how
> >> should
> >> >> >the stateful operators in the path be handled?
> >> >> >2. Are there any plans to support strong consistency in the future?
> >> This
> >> >> >should be helpful for the scenarios of incremental computing.
> >> >> >
> >> >> >
> >> >> >
> >> >> >Best regards,
> >> >> >
> >> >> >Weijie
> >> >> >
> >> >> >
> >> >> >Xuyang <xyzhong...@163.com> 于2025年4月25日周五 10:52写道:
> >> >> >
> >> >> >> Hi, devs.
> >> >> >>
> >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new
> >> >> DeltaJoin[1].
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
> >> >> >> persistent concern for users.
> >> >> >>
> >> >> >> Since streaming jobs are long-running, the state of join generally
> >> >> >> increases in size over time.
> >> >> >>
> >> >> >> Although users can set the state TTL to mitigate this issue, it is
> >> not
> >> >> >> applicable to all scenarios
> >> >> >>
> >> >> >> and does not provide a fundamental solution.
> >> >> >>
> >> >> >> An oversized state can lead to a series of problems, including but
> >> not
> >> >> >> limited to:
> >> >> >>
> >> >> >> 1. Resource bottlenecks in individual tasks
> >> >> >>
> >> >> >> 2. Slow checkpointing, which affects job stability during the
> >> >> >> checkpointing process
> >> >> >>
> >> >> >> 3. Long recovery time from state
> >> >> >>
> >> >> >> In addition, when analyzing the join state, we find that in some
> >> >> >> scenarios, the state within the join
> >> >> >>
> >> >> >> actually contains redundant data from the source tables.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> To address this issue, we aim to introduce Delta Join, which is
> >> based on
> >> >> >> the core idea of leveraging
> >> >> >>
> >> >> >> a bidirectional lookup join approach to reuse source table data
> as a
> >> >> >> substitute for join state.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> You can find more details in this Flip. I'm looking forward to
> your
> >> >> >> comments and feedback.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> [1]
> >> >> >>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >>     Best!
> >> >> >>     Xuyang
> >> >>
> >>
>

Reply via email to