Hey Xuyang,

Happy to see that the FLIP has been approved. Could you reply to my questions
below when you have some time?

Thanks,
Gustavo

On Tue, 20 May 2025 at 08:56, Gustavo de Morais <gustavopg...@gmail.com> wrote:

> Hey Xuyang,
>
> Thanks for proposing this and driving the discussion. In general, a very
> interesting idea.
>
> - Could you go into more detail on how the optimizer works in AUTO/FORCE
> modes? When does it return an error, and when does it fall back to the
> regular streaming join operator? For example, do we support a delta join
> for the first join level and regular join operators for the subsequent
> levels, or will the optimizer just return an error if FORCE is set and the
> query has multiple chained joins?
>
> - FLIP-516 adds support for a multi-way join operator, which eliminates the
> intermediate state between joins. I see potential in combining the two for
> some use cases: in a second iteration, the strategy described here could be
> used together with FLIP-516 to also reduce the state kept for the input
> streams. Or do you have a different strategy in mind for cascading joins?
>
> Best,
> Gustavo
>
> On Thu, 15 May 2025 at 04:53, Ron Liu <ron9....@gmail.com> wrote:
>
>> Hi, Xuyang
>>
>> Thanks for your reply, looks good to me.
>>
>> Best,
>> Ron
>>
>> On Tue, 13 May 2025 at 15:00, Xuyang <xyzhong...@163.com> wrote:
>>
>> > Hi, Feng. Let me address your questions:
>> >
>> > 1. As you mentioned, these filters should be re-applied. We can apply the
>> > filters to the rows returned by the lookup and then perform the join.
>> > Moreover, we can consider pushing the filters down to the lookup source to
>> > make the lookup more efficient.
>> >
>> > 2. You are correct: in the Calc, the join key must be referenced directly,
>> > without any transformation. Otherwise, we won't be able to map the
>> > transformed join key back to the original columns. Let me clarify this
>> > part in the FLIP.
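Item 1 above says that the right-side filters must be re-applied to the rows returned by the lookup, because they cannot be pushed into the lookup source. The following is a minimal Java sketch of that step for a LEFT join. It assumes that rows are plain `Object[]` arrays and that the lookup result for the join key is already in hand. `PostLookupFilter` and `leftJoinWithFilter` are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper (not a Flink API): re-applies a right-side filter
// that could not be pushed into the lookup source.
class PostLookupFilter {

    // Joins one left row with the right rows returned by a lookup on the
    // join key, keeping only right rows that pass the filter (LEFT JOIN).
    static List<Object[]> leftJoinWithFilter(
            Object[] leftRow,
            List<Object[]> lookedUpRightRows,
            Predicate<Object[]> rightFilter,
            int rightArity) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] rightRow : lookedUpRightRows) {
            // The scan side applied this filter in the source; the lookup did not.
            if (rightFilter.test(rightRow)) {
                out.add(concat(leftRow, rightRow));
            }
        }
        if (out.isEmpty()) {
            // No right row survives the filter: pad the left row with nulls.
            out.add(concat(leftRow, new Object[rightArity]));
        }
        return out;
    }

    private static Object[] concat(Object[] left, Object[] right) {
        Object[] joined = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, joined, left.length, right.length);
        return joined;
    }
}
```

Filtering must happen before the null-padding decision. Otherwise, a left row whose only matches are filtered out would be dropped instead of being emitted with nulls.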
>> >
>> > --
>> >     Best!
>> >     Xuyang
>> >
>> > On 2025-05-12 10:33:41, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>> > >Hi, Xuyang
>> > >
>> > >Thanks for proposing this FLIP; it's very helpful for scenarios involving
>> > >large state.
>> > >
>> > >I have a few questions regarding the node whitelist section:
>> > >
>> > >1. Take LEFT JOIN as an example, with a TableSource that implements
>> > >FilterPushdownSpec: when the right side supports filter pushdown, the
>> > >source can apply the filter. However, in the case of a LookupJoin, it
>> > >seems that filter conditions cannot be pushed down. In such cases, are the
>> > >filters re-applied after the join?
>> > >
>> > >2. When there is a Calc node between the source and the join, and the join
>> > >key involves some computation, does this prevent the plan from being
>> > >transformed into a Delta Join?
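Feng's question comes down to whether each join key can be traced through the Calc back to a source column. Xuyang's answer above requires exactly that. The following is a minimal sketch under a hypothetical model in which each Calc output slot records the source column index it references directly, or `-1` when the slot is computed (for example `id + 1`). `JoinKeyTracer` and `traceToSource` are illustrative names only.

```java
import java.util.Optional;

// Hypothetical model (not Flink planner code): projection[i] is the source
// column that Calc output column i references directly, or COMPUTED.
class JoinKeyTracer {

    static final int COMPUTED = -1;

    // Maps join-key positions (in Calc output) to source columns; returns
    // empty if any key is computed, i.e. it cannot drive a lookup.
    static Optional<int[]> traceToSource(int[] projection, int[] joinKeys) {
        int[] sourceKeys = new int[joinKeys.length];
        for (int i = 0; i < joinKeys.length; i++) {
            int source = projection[joinKeys[i]];
            if (source == COMPUTED) {
                return Optional.empty(); // Plan stays a regular join.
            }
            sourceKeys[i] = source;
        }
        return Optional.of(sourceKeys);
    }
}
```

With the projection `{2, COMPUTED, 0}`, join keys at output positions 0 and 2 trace back to source columns 2 and 0. A join key at position 1 cannot be traced.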
>> > >
>> > >
>> > >Best,
>> > >Feng
>> > >
>> > >
>> > >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
>> > >
>> > >> Hi, Ron. Thanks for your attention to this FLIP.
>> > >>
>> > >> 1. Initially, inner, left, right, and full joins will be supported. I
>> > >> have updated the FLIP accordingly.
>> > >>
>> > >> 2. Are you referring to the situation where the downstream primary key
>> > >> (PK) differs from the upstream join key? Whenever the sink requires the
>> > >> upstream to send complete -U and +U records to support idempotent
>> > >> updates (for example, 1. when the sink PK does not match the upstream
>> > >> upsert key, which results in a sink materialization node, or 2. when the
>> > >> sink is a retract sink instead of an upsert sink), a deduplication node
>> > >> will be introduced after the delta join to replay each +U record as both
>> > >> -U and +U. More details can be found in the section "When and How to
>> > >> Convert." Please correct me if I have misunderstood your question.
>> > >>
>> > >> 3. Yes, most of the design can be shared, including the index API, the
>> > >> optimization phases, and parts of the operator implementation. The main
>> > >> difference is that strong consistency semantics require additional
>> > >> interfaces to establish certain agreements with the connector, such as
>> > >> whether snapshot scan reading and snapshot lookup are supported.
>> > >>
>> > >> 4. You've raised an important point: introducing a cache is not so
>> > >> straightforward. Each side has its own cache that stores remote data
>> > >> from that side's table. When one side receives data, it performs the
>> > >> following steps:
>> > >>
>> > >> 1) Attempt to update its own cache:
>> > >>
>> > >> 1.1) If there is a cache, update it.
>> > >>
>> > >> 1.2) If there is no cache, skip the update.
>> > >>
>> > >> 2) Query the other side's cache or external table to perform the join:
>> > >>
>> > >> 2.1) If the other side has a cache, query it directly.
>> > >>
>> > >> 2.2) If the other side does not have a cache, query the external table
>> > >> and build a cache on that side.
>> > >>
>> > >> In short, each side's cache is populated by lookups triggered by the
>> > >> other side's data and is updated with data from its own side.
>> > >>
>> > >> 5. I've thought about this as well, i.e. supporting a *batch* lookup
>> > >> function. This feature would benefit not only delta joins but also the
>> > >> regular lookup join syntax. However, it may need a separate FLIP.
>> > >>
>> > >> 6. The schema, including the table's indexes, can be retrieved by calling
>> > >> `getTable` on the `Catalog`. I do not want the engine to create indexes
>> > >> automatically. As with primary keys, even if we introduce index syntax,
>> > >> indexes should be declared as `NOT ENFORCED` constraints.
>> > >>
>> > >> 7. Thank you for the reminder. I noticed that Multi-Way Join is disabled
>> > >> by default. In short, users can adjust the relevant configs to enable
>> > >> either Multi-Way Join or Delta Join. If both are enabled, the cost-based
>> > >> optimizer can help choose the appropriate algorithm, although the current
>> > >> table cost estimates are not accurate. Looking ahead, Delta Join will
>> > >> also support multi-level join scenarios, referred to as Multi-Way Delta
>> > >> Join.
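The following Java sketch shows the per-side cache protocol from item 4 of this reply. It is not Flink code, and it rests on several assumptions:

- The sources are append-only, so there are no updates or deletes to reconcile.
- Rows are strings keyed by a string join key.
- Eviction is LRU via `LinkedHashMap` access order, following the LRU cache Ron mentions; the capacity is arbitrary.
- Two `Function`s stand in for lookups against the external tables.

`DeltaJoinCaches` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the per-side cache protocol (not Flink code).
class DeltaJoinCaches {
    // Each cache holds rows of its *own* side's table, keyed by join key.
    private final Map<String, List<String>> leftCache;
    private final Map<String, List<String>> rightCache;
    // Stand-ins for lookups against the external left/right tables.
    private final Function<String, List<String>> lookupLeftTable;
    private final Function<String, List<String>> lookupRightTable;

    DeltaJoinCaches(int capacity,
                    Function<String, List<String>> lookupLeftTable,
                    Function<String, List<String>> lookupRightTable) {
        this.leftCache = lruMap(capacity);
        this.rightCache = lruMap(capacity);
        this.lookupLeftTable = lookupLeftTable;
        this.lookupRightTable = lookupRightTable;
    }

    // A left record arrives: returns the matching right rows.
    List<String> onLeftRecord(String key, String row) {
        return process(key, row, leftCache, rightCache, lookupRightTable);
    }

    // A right record arrives: returns the matching left rows.
    List<String> onRightRecord(String key, String row) {
        return process(key, row, rightCache, leftCache, lookupLeftTable);
    }

    private static List<String> process(
            String key, String row,
            Map<String, List<String>> ownCache,
            Map<String, List<String>> otherCache,
            Function<String, List<String>> lookupOtherTable) {
        // Step 1: update this side's cache only if the key is already cached.
        List<String> cached = ownCache.get(key);
        if (cached != null) {
            cached.add(row);
        }
        // Step 2: probe the other side's cache; on a miss, look up the other
        // external table and cache the result on that side.
        List<String> matches = otherCache.computeIfAbsent(
                key, k -> new ArrayList<>(lookupOtherTable.apply(k)));
        return new ArrayList<>(matches);
    }

    // A LinkedHashMap in access order evicts the least recently used entry.
    private static <K, V> Map<K, V> lruMap(int capacity) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        };
    }
}
```

Step 1 never creates a cache entry. A key missing from a cache therefore always means "not loaded yet", so a partially populated entry can never be mistaken for the full set of rows in the external table.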
>> > >>
>> > >> --
>> > >>     Best!
>> > >>     Xuyang
>> > >>
>> > >> On 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
>> > >> >Hi, Xuyang
>> > >> >
>> > >> >Thanks for initiating this FLIP, which is of great value in solving the
>> > >> >Streaming Join problems that trouble many users. Big +1 for it.
>> > >> >
>> > >> >Regarding the overall design of the FLIP, I have the following questions:
>> > >> >
>> > >> >1. Could you explain which join types the FLIP currently supports, such
>> > >> >as inner, left, and right joins?
>> > >> >
>> > >> >2. You mentioned strong consistency semantics and eventual consistency
>> > >> >semantics and gave the corresponding derivation formulas. Under eventual
>> > >> >consistency semantics, you mentioned that some intermediate data will be
>> > >> >emitted. What confuses me here is how eventual consistency can be
>> > >> >achieved if the sink does not support idempotent updates by the join
>> > >> >key, given that the join operator outputs some duplicate data.
>> > >> >
>> > >> >3. Due to storage limitations, the first step only considers supporting
>> > >> >eventual consistency semantics, but Paimon and other lake storages
>> > >> >support snapshots, which can achieve strong consistency semantics. If
>> > >> >strong consistency is considered in the future, can it be implemented
>> > >> >well on top of the current design?
>> > >> >
>> > >> >4. Regarding the LRU cache: if a record in the source table is updated,
>> > >> >how is the cache updated automatically? Otherwise, I'm concerned that
>> > >> >stale data may be read from the cache.
>> > >> >
>> > >> >5. In terms of implementation, have you considered batching input
>> > >> >records and then querying them in batches to improve operator
>> > >> >performance?
>> > >> >
>> > >> >6. Since Flink SQL syntax does not support indexes yet, how can users
>> > >> >create indexes on the source table and pass them to Flink? In other
>> > >> >words, how can users use Delta Join?
>> > >> >
>> > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When
>> > >> >both join optimizations are enabled, which one takes precedence, Delta
>> > >> >Join or Multi-Way Join? What is the relationship between the two?
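Question 2 is answered in item 2 of Xuyang's reply above. There, a deduplication node after the delta join replays each +U as -U and +U. The following is a minimal Java sketch of such a node. It assumes that state is keyed by the upsert key and that rows are simplified to a key/value string pair. `UpsertToRetract` and `Change` are hypothetical names, not Flink's `RowKind` machinery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink code): a dedup step keyed by the upsert key
// that turns an upsert stream into a complete retract stream.
class UpsertToRetract {

    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String symbol;

        Kind(String symbol) {
            this.symbol = symbol;
        }
    }

    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind.symbol + "[" + key + ", " + value + "]";
        }
    }

    // State: the last row emitted for each upsert key.
    private final Map<String, String> lastValue = new HashMap<>();

    List<Change> process(Change in) {
        List<Change> out = new ArrayList<>();
        String previous = lastValue.get(in.key);
        if (in.kind == Kind.DELETE) {
            if (previous != null) {
                lastValue.remove(in.key);
                out.add(new Change(Kind.DELETE, in.key, previous));
            }
            return out;
        }
        // +I or +U from the upsert stream: replay as +I, or as -U then +U.
        lastValue.put(in.key, in.value);
        if (previous == null) {
            out.add(new Change(Kind.INSERT, in.key, in.value));
        } else {
            out.add(new Change(Kind.UPDATE_BEFORE, in.key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, in.key, in.value));
        }
        return out;
    }
}
```

Suppose the delta join emits the same +U twice for a key. In this sketch, the second copy becomes a -U of that row followed by its +U, so a retract sink still ends with a single correct row. The price is that this node keeps one row per upsert key in state.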
>> > >> >
>> > >> >
>> > >> >Best,
>> > >> >Ron
>> > >> >
>> > >> >On Wed, 7 May 2025 at 10:02, Xuyang <xyzhong...@163.com> wrote:
>> > >> >
>> > >> >> Hi, Weijie.
>> > >> >>
>> > >> >> Thanks for your review! Let me try to answer these questions.
>> > >> >>
>> > >> >> 1. The nodes between the source and the join must be included in the
>> > >> >> whitelist, such as Calc, DropUpdateBefore, WatermarkAssigner, Exchange,
>> > >> >> etc. You can find more details in the chapter "Limitations about Delta
>> > >> >> Join with Eventual Consistency".
>> > >> >>
>> > >> >> 2. Currently, we only have a rough plan. First and foremost, we need to
>> > >> >> ensure that storage engines like Paimon, Fluss, etc. support snapshot
>> > >> >> awareness during scans and the ability to specify a snapshot during
>> > >> >> lookups. Overall, it will likely be supported later, in 2.2 or 2.3.
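Item 1 above describes a whitelist check on the nodes between each source and the join. The following is a minimal sketch that assumes the path is available as a list of node-type names. The four names come from the reply, which marks its list as non-exhaustive ("etc."), so the FLIP's actual whitelist may contain more entries. `DeltaJoinWhitelist` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch (not Flink planner code) of the whitelist check.
class DeltaJoinWhitelist {

    // Node types named in the reply; the real list may be longer.
    static final Set<String> ALLOWED = new HashSet<>(Arrays.asList(
            "Calc", "DropUpdateBefore", "WatermarkAssigner", "Exchange"));

    // True only if every node on the source-to-join path is whitelisted.
    static boolean canUseDeltaJoin(List<String> pathFromSourceToJoin) {
        for (String nodeType : pathFromSourceToJoin) {
            if (!ALLOWED.contains(nodeType)) {
                return false; // Fall back to the regular streaming join.
            }
        }
        return true;
    }
}
```

Reading the reply literally, a stateful operator such as an aggregation on the path is not on the list, so the plan would keep the regular join. This bears on Weijie's first question.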
>> > >> >>
>> > >> >> --
>> > >> >>     Best!
>> > >> >>     Xuyang
>> > >> >>
>> > >> >> On 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
>> > >> >> >Hi Xuyang,
>> > >> >> >
>> > >> >> >Thanks for driving this! The state of two-stream joins is a real
>> > >> >> >headache in stream computing systems.
>> > >> >> >
>> > >> >> >After reading this FLIP, I have two questions:
>> > >> >> >
>> > >> >> >1. Can the two inputs of the join operator be arbitrary streams, or must
>> > >> >> >each come directly from a LookupSource? If they can be arbitrary
>> > >> >> >streams, how should stateful operators on the path be handled?
>> > >> >> >2. Are there any plans to support strong consistency in the future? This
>> > >> >> >would be helpful for incremental computing scenarios.
>> > >> >> >
>> > >> >> >
>> > >> >> >
>> > >> >> >Best regards,
>> > >> >> >
>> > >> >> >Weijie
>> > >> >> >
>> > >> >> >
>> > >> >> >On Fri, 25 Apr 2025 at 10:52, Xuyang <xyzhong...@163.com> wrote:
>> > >> >> >
>> > >> >> >> Hi, devs.
>> > >> >> >>
>> > >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new DeltaJoin [1].
>> > >> >> >>
>> > >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
>> > >> >> >> persistent concern for users. Since streaming jobs are long-running,
>> > >> >> >> join state generally grows over time. Although users can set a state
>> > >> >> >> TTL to mitigate this issue, it is not applicable to all scenarios and
>> > >> >> >> does not provide a fundamental solution.
>> > >> >> >>
>> > >> >> >> An oversized state can lead to a series of problems, including but not
>> > >> >> >> limited to:
>> > >> >> >>
>> > >> >> >> 1. Resource bottlenecks in individual tasks
>> > >> >> >>
>> > >> >> >> 2. Slow checkpointing, which affects job stability during the
>> > >> >> >> checkpointing process
>> > >> >> >>
>> > >> >> >> 3. Long recovery time from state
>> > >> >> >>
>> > >> >> >> In addition, when analyzing join state, we found that in some
>> > >> >> >> scenarios the state within the join actually contains redundant data
>> > >> >> >> from the source tables.
>> > >> >> >>
>> > >> >> >> To address this issue, we aim to introduce Delta Join, whose core idea
>> > >> >> >> is to use a bidirectional lookup join that reuses source table data
>> > >> >> >> as a substitute for join state.
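The core idea above can be illustrated with a minimal Java sketch of a stateless inner equi-join. The sketch assumes each source table is exposed as an indexed, lookup-able store, modelled here as an in-memory `Map` from join key to rows. `DeltaJoinSketch`, `onLeft`, and `onRight` are hypothetical names, not part of Flink or the FLIP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a delta join (inner equi-join): the operator keeps
// no join state and instead looks up the *other* side's source table.
class DeltaJoinSketch {
    // Stand-ins for the externally stored, indexed source tables.
    private final Map<String, List<String>> leftTable;
    private final Map<String, List<String>> rightTable;

    DeltaJoinSketch(Map<String, List<String>> leftTable,
                    Map<String, List<String>> rightTable) {
        this.leftTable = leftTable;
        this.rightTable = rightTable;
    }

    // A change on the left input is joined against a lookup into the right table.
    List<String> onLeft(String key, String leftRow) {
        List<String> out = new ArrayList<>();
        for (String rightRow : rightTable.getOrDefault(key, Collections.emptyList())) {
            out.add(leftRow + "|" + rightRow);
        }
        return out;
    }

    // A change on the right input is joined against a lookup into the left table.
    List<String> onRight(String key, String rightRow) {
        List<String> out = new ArrayList<>();
        for (String leftRow : leftTable.getOrDefault(key, Collections.emptyList())) {
            out.add(leftRow + "|" + rightRow);
        }
        return out;
    }
}
```

Consider a left row `l1` that arrives before any matching right row. It produces nothing, yet it is still joined later when `r1` arrives, because the lookup into the left table returns it; the operator never stored `l1` itself. If both tables already contain their rows before either change is processed, however, `onLeft` and `onRight` both emit `l1|r1`. This is the kind of duplicate output that comes up later in the thread under eventual consistency.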
>> > >> >> >>
>> > >> >> >> You can find more details in the FLIP. I'm looking forward to your
>> > >> >> >> comments and feedback.
>> > >> >> >>
>> > >> >> >> [1]
>> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
>> > >> >> >>
>> > >> >> >> --
>> > >> >> >>     Best!
>> > >> >> >>     Xuyang
>> > >> >>
>> > >>
>> >
>>
>
