Hi, devs.

Since this discussion has been open for a long time, if there are no further
objections, I will start the voting thread tomorrow.




--

    Best!
    Xuyang





At 2025-05-19 10:05:46, "Xuyang" <xyzhong...@163.com> wrote:
>Hi, Xiangyu. 
>
>You are correct that the lookup function on the lookup side should not use 
>caching. Therefore, we need to unwrap the actual lookup function from the 
>CachingLookupFunction (instead of updating its cache in place). We assume that
>any cached function used by the lookup table is based on 
>CachingLookupFunction; otherwise, there may be some issues. Additionally, 
>since only sources that proactively report an index will be optimized into a
>delta join, there should not be any issues with historical jobs.
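
To illustrate the unwrapping idea, here is a minimal, self-contained sketch. It does not use the real Flink classes: `Lookup`, `CachingLookup`, its `delegate()` accessor, and `LookupUnwrapper` are hypothetical stand-ins, and whether the real CachingLookupFunction exposes its inner function this way is an assumption.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a lookup function: join key -> matching rows.
interface Lookup {
    Collection<String> lookup(String key);
}

// Hypothetical stand-in for a caching wrapper around another lookup function.
final class CachingLookup implements Lookup {
    private final Lookup delegate;
    private final Map<String, Collection<String>> cache = new HashMap<>();

    CachingLookup(Lookup delegate) {
        this.delegate = delegate;
    }

    // Assumed accessor that exposes the wrapped, non-caching function.
    Lookup delegate() {
        return delegate;
    }

    @Override
    public Collection<String> lookup(String key) {
        // Serve from the cache, loading from the delegate on a miss.
        return cache.computeIfAbsent(key, delegate::lookup);
    }
}

final class LookupUnwrapper {
    // Peel off caching wrappers so the operator always queries the real source.
    static Lookup unwrap(Lookup fn) {
        Lookup current = fn;
        while (current instanceof CachingLookup) {
            current = ((CachingLookup) current).delegate();
        }
        return current;
    }
}
```

With the wrapper removed, only the cache owned by the delta join operator remains, so there is no second cache that could drift out of sync with incoming data.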
>
>
>
>
>--
>
>    Best!
>    Xuyang
>
>
>
>
>
>At 2025-05-16 01:07:21, "xiangyu feng" <xiangyu...@gmail.com> wrote:
>>Hi Xuyang,
>>
>>Thx for driving this FLIP. I believe delta join is truly helpful for large
>>state scenarios. The overall design LGTM. Also, I like the idea of updating
>>the cache using the data from its own side.
>>
>>I have one minor question: storages like Paimon have their own cache
>>in their LookupFunction. There can be inconsistencies between the cache in
>>the DeltaJoinOperator and the cache inside the LookupFunction, because the
>>incoming data will only update the former.
>>
>>How should we address this issue? Is it possible for the incoming data to
>>also update the cache inside the LookupFunction? Or can we assume that every
>>LookupFunction with a cache extends `CachingLookupFunction`, so that we
>>can use the incoming data to update its `LookupCache`?
>>
>>Best,
>>Xiangyu Feng
>>
>>Ron Liu <ron9....@gmail.com> wrote on Thu, May 15, 2025 at 10:53:
>>
>>> Hi, Xuyang
>>>
>>> Thanks for your reply, looks good to me.
>>>
>>> Best,
>>> Ron
>>>
>>> Xuyang <xyzhong...@163.com> wrote on Tue, May 13, 2025 at 15:00:
>>>
>>> > Hi, Feng. Let me address your questions:
>>> >
>>> > 1. As you mentioned, these filters should be recalculated. We can apply
>>> > the filters after looking up the source, and then perform the join.
>>> > Moreover, we can consider pushing the filters down to the lookup source
>>> > to achieve a more efficient lookup.
>>> >
>>> > 2. You are correct that in the Calc, the join key must be referenced
>>> > directly without any transformations. Otherwise, we won’t be able to
>>> > revert the transformed join key back to its original value. Let me
>>> > clarify this part in the FLIP.
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> >
>>> >     Best!
>>> >     Xuyang
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > At 2025-05-12 10:33:41, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>>> > >Hi, xuyang
>>> > >
>>> > >Thanks for proposing this FLIP. It’s very helpful for scenarios
>>> > >involving large states.
>>> > >
>>> > >I have a few questions regarding the node whitelist section:
>>> > >
>>> > >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as
>>> > >an example: when the right side supports filter pushdown, the source
>>> > >should be able to apply the filter. However, in the case of a
>>> > >LookupJoin, it seems that filter conditions cannot be pushed down. In
>>> > >such cases, are the filters re-applied after the join?
>>> > >
>>> > >2. When there is a Calc node between the source and the join, and the
>>> > >join key involves some computation, does this prevent the plan from
>>> > >being transformed into a Delta Join?
>>> > >
>>> > >
>>> > >Best,
>>> > >Feng
>>> > >
>>> > >
>>> > >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
>>> > >
>>> > >> Hi, Ron. Thanks for your attention to this FLIP.
>>> > >>
>>> > >> 1. Initially, inner/left/right/full joins will be supported. I have
>>> > >> updated the FLIP about this part.
>>> > >>
>>> > >> 2. Are you referring to the situation where the downstream primary key
>>> > >> (PK) differs from the upstream join key? Once the sink requires the
>>> > >> upstream to send complete -U and +U data to satisfy its idempotent
>>> > >> update (for example, 1. when the sink PK does not match the upstream
>>> > >> upsert key, which results in a sink materialization node, or 2. when
>>> > >> the sink is a retract sink instead of an upsert sink), a deduplication
>>> > >> node will be introduced after the delta join to replay the +U data as
>>> > >> both -U and +U. More details can be found in the section "When and How
>>> > >> to Convert." Please correct me if I have misunderstood your question.
>>> > >>
>>> > >> 3. Yes, the majority of the design can be shared, including the index
>>> > >> API, optimization phases, and parts of the operator implementation.
>>> > >> The main difference is that strong consistency semantics require
>>> > >> additional interfaces to establish certain agreements with the
>>> > >> connector, such as whether snapshot scan reading and snapshot lookup
>>> > >> are supported.
>>> > >>
>>> > >> 4. You've raised an important point: the introduction of a cache is
>>> > >> not so straightforward. Each side has its own cache that stores remote
>>> > >> data from this side's table. When one side receives data, it needs to
>>> > >> perform the following steps:
>>> > >>
>>> > >> 1). Attempt to update its own cache:
>>> > >>
>>> > >> 1.1) If there is a cache, update it.
>>> > >>
>>> > >> 1.2) If there is no cache, skip the update.
>>> > >>
>>> > >> 2). Query the other side's cache or external table to perform the
>>> > >> join:
>>> > >>
>>> > >> 2.1) If the other side has a cache, query it directly.
>>> > >>
>>> > >> 2.2) If the other side does not have a cache, query the external table
>>> > >> and establish a cache on that side.
>>> > >>
>>> > >> In brief, the cache is established using data from the other side and
>>> > >> is updated with data from its own side.
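
The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the FLIP's operator: `SideCache` and `DeltaJoinSketch` are hypothetical names, rows are plain strings, "there is a cache" is read as "a cache entry exists for this join key", updates are treated as appends, and eviction (for example LRU) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-side cache of rows from this side's external table.
final class SideCache {
    private final Map<String, List<String>> rowsByKey = new HashMap<>();
    private final Function<String, List<String>> externalTable;
    int externalLookups = 0; // counts remote queries, for illustration only

    SideCache(Function<String, List<String>> externalTable) {
        this.externalTable = externalTable;
    }

    // Step 1: update the own-side cache only if an entry already exists.
    void updateIfCached(String key, String row) {
        List<String> rows = rowsByKey.get(key);
        if (rows != null) {
            rows.add(row); // step 1.1; step 1.2 is the implicit "skip"
        }
    }

    // Step 2: read from the cache, or query the external table and cache it.
    List<String> lookup(String key) {
        List<String> rows = rowsByKey.get(key);
        if (rows == null) { // step 2.2
            externalLookups++;
            rows = new ArrayList<>(externalTable.apply(key));
            rowsByKey.put(key, rows);
        }
        return rows; // step 2.1 on a hit
    }
}

final class DeltaJoinSketch {
    final SideCache left;
    final SideCache right;

    DeltaJoinSketch(SideCache left, SideCache right) {
        this.left = left;
        this.right = right;
    }

    // A left record updates the left cache and probes the right side.
    List<String> onLeft(String key, String row) {
        left.updateIfCached(key, row);
        List<String> out = new ArrayList<>();
        for (String r : right.lookup(key)) {
            out.add(row + "|" + r);
        }
        return out;
    }

    // A right record updates the right cache and probes the left side.
    List<String> onRight(String key, String row) {
        right.updateIfCached(key, row);
        List<String> out = new ArrayList<>();
        for (String l : left.lookup(key)) {
            out.add(l + "|" + row);
        }
        return out;
    }
}
```

In this sketch a side's cache entry is first created by a lookup triggered from the other side, and afterwards kept fresh by records arriving on its own side, which matches the summary above.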
>>> > >>
>>> > >> 5. I've thought about this issue as well, that is, supporting a
>>> > >> *batch* lookup function. This feature would not only be beneficial for
>>> > >> delta joins, but would also be very useful for the regular lookup
>>> > >> syntax. However, it may be necessary to pursue a separate FLIP to
>>> > >> address this.
>>> > >>
>>> > >> 6. The schema with indexes in the table can be retrieved by using
>>> > >> `getTable` on `Catalog`. I do not want the engine to create indexes
>>> > >> automatically. Similar to primary keys, even if we introduce index
>>> > >> syntax, it should be marked as a `NOT ENFORCED` constraint.
>>> > >>
>>> > >> 7. Thank you for the reminder. I noticed that Multi-Way Join is
>>> > >> disabled by default. In simple terms, users can adjust various configs
>>> > >> to enable either Multi-Way Join or Delta Join. If both are enabled,
>>> > >> the Cost-Based Optimizer can help us choose the appropriate algorithm,
>>> > >> although the current table cost is not accurate. Looking ahead, Delta
>>> > >> Join will also support multi-level join scenarios, referred to as
>>> > >> Multi-Way Delta Join.
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >>
>>> > >>     Best!
>>> > >>     Xuyang
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
>>> > >> >Hi, Xuyang
>>> > >> >
>>> > >> >Thanks for initiating this FLIP, which is of great value in solving
>>> > >> >the Streaming Join problem that troubles many users. Big +1 for it.
>>> > >> >
>>> > >> >For the overall design of the FLIP, I have the following questions:
>>> > >> >
>>> > >> >1. Can you explain which Join types the FLIP currently supports, such
>>> > >> >as Inner, Left, and Right Join?
>>> > >> >
>>> > >> >2. You mentioned strong consistency semantics and eventual
>>> > >> >consistency semantics, and gave the corresponding derivation formulas.
>>> > >> >Under eventual consistency semantics, you mentioned that some
>>> > >> >intermediate data will be introduced. What confuses me here is how to
>>> > >> >achieve eventual consistency semantics if the Sink does not support
>>> > >> >idempotent updates by Join key. After all, the Join operator outputs
>>> > >> >some duplicate data.
>>> > >> >
>>> > >> >3. Due to storage reasons, the first step is to only consider
>>> > >> >supporting eventual consistency semantics, but Paimon and other lake
>>> > >> >storages support Snapshot, which can achieve strong consistency
>>> > >> >semantics. If strong consistency is considered in the future, can it
>>> > >> >be implemented well based on the current design?
>>> > >> >
>>> > >> >4. Regarding the LRU cache part, if a record in the source table is
>>> > >> >updated, how is the cache updated automatically? Otherwise, I feel
>>> > >> >that wrong data may be read from the cache.
>>> > >> >
>>> > >> >5. In terms of implementation, have you considered batching input
>>> > >> >records and then querying them in batches to improve operator
>>> > >> >performance?
>>> > >> >
>>> > >> >6. Since Flink SQL syntax does not support indexes now, how can you
>>> > >> >create indexes on the source table and pass them to Flink? I mean, how
>>> > >> >can users use Delta Join?
>>> > >> >
>>> > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join.
>>> > >> >When both Join optimizations are enabled, which one will take
>>> > >> >precedence, Delta Join or Multi-Way Join? What is the relationship
>>> > >> >between the two?
>>> > >> >
>>> > >> >
>>> > >> >Best,
>>> > >> >Ron
>>> > >> >
>>> > >> >Xuyang <xyzhong...@163.com> wrote on Wed, May 7, 2025 at 10:02:
>>> > >> >
>>> > >> >> Hi, Weijie.
>>> > >> >>
>>> > >> >> Thanks for your review! Let me try to answer these questions.
>>> > >> >>
>>> > >> >> 1. The nodes between source and join must be included in the
>>> > >> >> whitelist, such as Calc, DropUpdateBefore, WatermarkAssigner,
>>> > >> >> Exchange, etc. You can see more in the chapter "Limitations about
>>> > >> >> Delta Join with Eventual Consistency".
>>> > >> >>
>>> > >> >> 2. Currently, we have only a rough plan. First and foremost, we
>>> > >> >> need to ensure that storage engines like Paimon, Fluss, etc. support
>>> > >> >> snapshot awareness during scans and include snapshots during
>>> > >> >> lookups. Overall, it is likely to be supported later, in 2.2/2.3.
>>> > >> >>
>>> > >> >>
>>> > >> >>
>>> > >> >>
>>> > >> >> --
>>> > >> >>
>>> > >> >>     Best!
>>> > >> >>     Xuyang
>>> > >> >>
>>> > >> >>
>>> > >> >>
>>> > >> >>
>>> > >> >>
>>> > >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
>>> > >> >> >Hi Xuyang,
>>> > >> >> >
>>> > >> >> >Thanks for driving this! The state of two-stream joins is a real
>>> > >> >> >headache in stream computing systems.
>>> > >> >> >
>>> > >> >> >After reading this FLIP, I have two questions:
>>> > >> >> >
>>> > >> >> >1. Can the two sides of the join operator be any stream, or must it
>>> > >> >> >immediately follow the LookupSource? If it can be any stream, how
>>> > >> >> >should the stateful operators in the path be handled?
>>> > >> >> >2. Are there any plans to support strong consistency in the
>>> > >> >> >future? This should be helpful for incremental computing scenarios.
>>> > >> >> >
>>> > >> >> >
>>> > >> >> >
>>> > >> >> >Best regards,
>>> > >> >> >
>>> > >> >> >Weijie
>>> > >> >> >
>>> > >> >> >
>>> > >> >> >Xuyang <xyzhong...@163.com> wrote on Fri, Apr 25, 2025 at 10:52:
>>> > >> >> >
>>> > >> >> >> Hi, devs.
>>> > >> >> >>
>>> > >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new
>>> > >> >> >> DeltaJoin[1].
>>> > >> >> >>
>>> > >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
>>> > >> >> >> persistent concern for users. Since streaming jobs are
>>> > >> >> >> long-running, the state of a join generally increases in size over
>>> > >> >> >> time. Although users can set the state TTL to mitigate this issue,
>>> > >> >> >> it is not applicable to all scenarios and does not provide a
>>> > >> >> >> fundamental solution.
>>> > >> >> >>
>>> > >> >> >> An oversized state can lead to a series of problems, including but
>>> > >> >> >> not limited to:
>>> > >> >> >>
>>> > >> >> >> 1. Resource bottlenecks in individual tasks
>>> > >> >> >>
>>> > >> >> >> 2. Slow checkpointing, which affects job stability during the
>>> > >> >> >> checkpointing process
>>> > >> >> >>
>>> > >> >> >> 3. Long recovery time from state
>>> > >> >> >>
>>> > >> >> >> In addition, when analyzing the join state, we find that in some
>>> > >> >> >> scenarios, the state within the join actually contains redundant
>>> > >> >> >> data from the source tables.
>>> > >> >> >>
>>> > >> >> >> To address this issue, we aim to introduce Delta Join, which is
>>> > >> >> >> based on the core idea of leveraging a bidirectional lookup join
>>> > >> >> >> approach to reuse source table data as a substitute for join state.
>>> > >> >> >>
>>> > >> >> >> You can find more details in this FLIP. I'm looking forward to
>>> > >> >> >> your comments and feedback.
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >> [1]
>>> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >>
>>> > >> >> >> --
>>> > >> >> >>
>>> > >> >> >>     Best!
>>> > >> >> >>     Xuyang
>>> > >> >>
>>> > >>
>>> >
>>>
>
>
>
