Hi, Xuyang

Thanks for your reply; it looks good to me.
Best,
Ron

Xuyang <xyzhong...@163.com> wrote on Tue, May 13, 2025 at 15:00:
> Hi, Feng. Let me address your questions:
>
> 1. As you mentioned, these filters should be recalculated. We can apply
> the filters after looking up the source and then perform the join.
> Moreover, we can consider pushing the filters down to the lookup source
> to achieve a more efficient lookup.
>
> 2. You are correct that in the Calc, the join key must be referenced
> directly without any transformations. Otherwise, we won't be able to map
> the transformed join key back to the original one. Let me clarify this
> part in the FLIP.
>
> --
>
> Best!
> Xuyang
>
> On 2025-05-12 10:33:41, "Feng Jin" <jinfeng1...@gmail.com> wrote:
> >Hi, xuyang
> >
> >Thanks for proposing this FLIP; it's very helpful for scenarios involving
> >large states.
> >
> >I have a few questions regarding the node whitelist section:
> >
> >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as an
> >example: when the right side supports filter pushdown, the source should
> >be able to apply the filter. However, in the case of a LookupJoin, it seems
> >that filter conditions cannot be pushed down. In such cases, are the
> >filters re-applied after the join?
> >
> >2. When there is a Calc node between the source and the join, and the join
> >key involves some computation, does this prevent the plan from being
> >transformed into a Delta Join?
> >
> >Best,
> >Feng
> >
> >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, Ron. Thanks for your attention to this FLIP.
> >>
> >> 1. Initially, inner/left/right/full joins will be supported. I have
> >> updated this part of the FLIP.
> >>
> >> 2. Are you referring to the situation where the downstream primary key
> >> (PK) differs from the upstream join key? Once the sink requires the
> >> upstream to send complete -U and +U data to satisfy its idempotent update
> >> (for example, 1.
> >> when the sink PK does not match the upstream upsert key, it results in
> >> a sink materialization node, or 2. when the sink is a retract sink
> >> instead of an upsert sink), a deduplication node will be introduced after
> >> the delta join to replay the +U data as both -U and +U. More details can
> >> be found in the section "When and How to Convert". Please correct me if I
> >> have misunderstood what you meant.
> >>
> >> 3. Yes, the majority of the design can be shared, including the index API,
> >> optimization phases, and parts of the operator implementation. The main
> >> difference is that strong consistency semantics require additional
> >> interfaces to establish certain agreements with the connector, such as
> >> whether snapshot scan reading and snapshot lookup are supported.
> >>
> >> 4. You've raised an important point: introducing a cache is not so
> >> straightforward. Each side has its own cache that stores remote data from
> >> this side's table. When one side receives data, it needs to perform the
> >> following steps:
> >>
> >> 1). Attempt to update its own cache:
> >>
> >> 1.1) If there is a cache, update it.
> >>
> >> 1.2) If there is no cache, skip the update.
> >>
> >> 2). Query the other side's cache or external table to perform the join:
> >>
> >> 2.1) If the other side has a cache, query it directly.
> >>
> >> 2.2) If the other side does not have a cache, query the external table
> >> and establish a cache on that side.
> >>
> >> In brief, the cache is established using data from the other side and is
> >> updated with data from its own side.
> >>
> >> 5. I've thought about this issue as well, that is, supporting a *batch*
> >> lookup function. This feature would not only be beneficial for delta
> >> joins, but would also be very useful for regular lookup syntax. However,
> >> it may be necessary to pursue a separate FLIP to address this.
> >>
> >> 6. Schema with indexes in the table can be retrieved by using the
> >> `getTable` method on `Catalog`. I do not want the engine to create indexes
> >> automatically. Similar to primary keys, even if we introduce index syntax,
> >> it should be marked as a `NOT ENFORCED` constraint.
> >>
> >> 7. Thank you for the reminder. I noticed that Multi-Way Join is disabled
> >> by default. In simple terms, users can adjust various configs to enable
> >> either Multi-Way Join or Delta Join. If both are enabled, the Cost-Based
> >> Optimizer can help us choose the appropriate algorithm, although the
> >> current table cost estimates are not accurate. Looking ahead, Delta Join
> >> will also support multi-level join scenarios, referred to as Multi-Way
> >> Delta Join.
> >>
> >> --
> >>
> >> Best!
> >> Xuyang
> >>
> >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
> >> >Hi, Xuyang
> >> >
> >> >Thanks for initiating this FLIP, which is of great value in solving the
> >> >Streaming Join problems that trouble many users. Big +1 for it.
> >> >
> >> >For the overall design of the FLIP, I have the following questions:
> >> >
> >> >1. Can you explain which join types the FLIP currently supports, such as
> >> >inner, left, and right join?
> >> >
> >> >2. You mentioned strong consistency semantics and eventual consistency
> >> >semantics, and gave the corresponding derivation formulas. Under eventual
> >> >consistency semantics, you mentioned that some intermediate data will be
> >> >introduced. What confuses me here is how to achieve eventual consistency
> >> >semantics if the sink does not support idempotent updates by join key;
> >> >after all, the join operator outputs some duplicate data.
> >> >
> >> >3. For storage reasons, the first step only considers supporting
> >> >eventual consistency semantics, but Paimon and other lake storages
> >> >support snapshots, which can achieve strong consistency semantics. If strong
If strong > >> >consistency is considered in the future, can it be well implemented > based > >> >on the current design? > >> > > >> >4. Regarding the LRU cache part, if the record in the source table is > >> >updated, how to automatically update the cache, otherwise I feel that > the > >> >wrong data may be found from the cache. > >> > > >> >5. In terms of implementation, have you considered batching input > records > >> >and then querying them in batches to improve operator performance? > >> > > >> >6. Since Flink SQL syntax does not support indexes now, how can you > create > >> >indexes on the source table and pass them to Flink? I mean, how can > users > >> >use Delta Join? > >> > > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When > >> both > >> >Join optimizations are enabled, which one will take precedence, Delta > Join > >> >or Multi-Way Join? What is the relationship between the two? > >> > > >> > > >> >Best, > >> >Ron > >> > > >> >Xuyang <xyzhong...@163.com> 于2025年5月7日周三 10:02写道: > >> > > >> >> Hi, Weijie. > >> >> > >> >> Thanks for your review! Let me try to answer these questions. > >> >> > >> >> 1. The nodes between source and join must be included in the > whitelist, > >> >> such as Calc, > >> >> > >> >> DropUpdateBefore, WatermarkAssigner, Exchange and etc. You can see > more > >> in > >> >> chapter > >> >> > >> >> "Limitations about Delta Join with Eventual Consistency". > >> >> > >> >> 2. Currently, we have only one vague plan. First and foremost, we > need > >> to > >> >> ensure that storage > >> >> > >> >> engines like Paimon, Fluss and etcs support snapshot awareness during > >> >> scans and including > >> >> > >> >> snapshots during lookups. Overall, it is likely to be supported in > >> 2.2/2.3 > >> >> later... > >> >> > >> >> > >> >> > >> >> > >> >> -- > >> >> > >> >> Best! 
> >> >> Xuyang
> >> >>
> >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
> >> >> >Hi Xuyang,
> >> >> >
> >> >> >Thanks for driving this! The state of two-stream joins is a very
> >> >> >headache-inducing problem in stream computing systems.
> >> >> >
> >> >> >After reading this FLIP, I have two questions:
> >> >> >
> >> >> >1. Can the two sides of the join operator be any stream, or must the
> >> >> >join immediately follow the LookupSource? If they can be any stream,
> >> >> >how should the stateful operators in the path be handled?
> >> >> >2. Are there any plans to support strong consistency in the future? This
> >> >> >should be helpful for incremental computing scenarios.
> >> >> >
> >> >> >Best regards,
> >> >> >
> >> >> >Weijie
> >> >> >
> >> >> >Xuyang <xyzhong...@163.com> wrote on Fri, Apr 25, 2025 at 10:52:
> >> >> >
> >> >> >> Hi, devs.
> >> >> >>
> >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new DeltaJoin[1].
> >> >> >>
> >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
> >> >> >> persistent concern for users. Since streaming jobs are long-running,
> >> >> >> the join state generally grows over time. Although users can set a
> >> >> >> state TTL to mitigate this issue, it is not applicable to all
> >> >> >> scenarios and does not provide a fundamental solution.
> >> >> >>
> >> >> >> An oversized state can lead to a series of problems, including but not
> >> >> >> limited to:
> >> >> >>
> >> >> >> 1. Resource bottlenecks in individual tasks
> >> >> >>
> >> >> >> 2. Slow checkpointing, which affects job stability during the
> >> >> >> checkpointing process
> >> >> >>
> >> >> >> 3. Long recovery time from state
> >> >> >>
> >> >> >> In addition, when analyzing the join state, we found that in some
> >> >> >> scenarios, the state within the join actually contains redundant data
> >> >> >> from the source tables.
> >> >> >>
> >> >> >> To address this issue, we aim to introduce Delta Join, which is based
> >> >> >> on the core idea of leveraging a bidirectional lookup join approach to
> >> >> >> reuse source table data as a substitute for join state.
> >> >> >>
> >> >> >> You can find more details in this FLIP. I'm looking forward to your
> >> >> >> comments and feedback.
> >> >> >>
> >> >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >> Best!
> >> >> >> Xuyang
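The core idea from the original announcement (a bidirectional lookup join that reuses source table data instead of join state), combined with the per-side cache rules in Xuyang's answer to Ron's fourth question and the post-lookup filtering in his answer to Feng, can be illustrated with a small in-memory simulation. This is a hypothetical sketch, not the FLIP's operator: `LruCache`, `Side`, `DeltaJoinSketch`, and `ingest` are invented names, each "external table" is a plain dict keyed by the join key (standing in for an indexed lookup source), and only insert-only inner joins are modeled.

```python
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]
Predicate = Callable[[Row], bool]


class LruCache:
    """Minimal LRU cache mapping a join key to rows of ONE side's table."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._entries: "OrderedDict[object, List[Row]]" = OrderedDict()

    def get(self, key: object) -> Optional[List[Row]]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key: object, rows: List[Row]) -> None:
        self._entries[key] = rows
        self._entries.move_to_end(key)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # evict least recently used


class Side:
    """Hypothetical join input: its indexed table, its cache, and its filter."""

    def __init__(self, key_field: str, capacity: int,
                 row_filter: Predicate = lambda row: True) -> None:
        self.key_field = key_field
        self.table: Dict[object, List[Row]] = {}  # stand-in for the source table
        self.cache = LruCache(capacity)
        self.row_filter = row_filter

    def lookup(self, key: object) -> List[Row]:
        # Filters are re-applied after the lookup, assuming the lookup
        # source cannot evaluate them itself (no filter pushdown).
        return [r for r in self.table.get(key, []) if self.row_filter(r)]


class DeltaJoinSketch:
    """Inner, insert-only delta join: no join state, only per-side caches."""

    def __init__(self, left: Side, right: Side) -> None:
        self.sides = {"left": left, "right": right}

    def process(self, side_name: str, row: Row) -> List[Tuple[Row, Row]]:
        own = self.sides[side_name]
        other = self.sides["right" if side_name == "left" else "left"]
        if not own.row_filter(row):
            return []  # filtered out on its own side: cannot join
        key = row[own.key_field]

        # Step 1: update this side's own cache only if it already holds the key.
        cached_own = own.cache.get(key)
        if cached_own is not None:
            cached_own.append(row)

        # Step 2: probe the other side's cache; on a miss, look up the other
        # side's table and build that side's cache entry from the result.
        matches = other.cache.get(key)
        if matches is None:
            matches = other.lookup(key)
            other.cache.put(key, list(matches))

        if side_name == "left":
            return [(row, m) for m in matches]
        return [(m, row) for m in matches]


def ingest(join: DeltaJoinSketch, side_name: str, row: Row) -> List[Tuple[Row, Row]]:
    """Hypothetical driver: the row lands in its source table before the join sees it."""
    side = join.sides[side_name]
    side.table.setdefault(row[side.key_field], []).append(row)
    return join.process(side_name, row)


if __name__ == "__main__":
    orders = Side("user_id", capacity=2)
    users = Side("id", capacity=2, row_filter=lambda r: r["active"])
    join = DeltaJoinSketch(orders, users)
    print(ingest(join, "left", {"order": 1, "user_id": 7}))  # no user 7 yet: empty
    print(ingest(join, "right", {"id": 7, "active": True}))  # joins with order 1
    print(ingest(join, "left", {"order": 2, "user_id": 7}))  # served from the users cache
```

In this sketch a cache entry is only appended to when it already exists (step 1.2 skips otherwise), so a key that was evicted or never loaded is simply re-read from the table, which already contains the row.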
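Xuyang's answer to Ron's second question mentions a deduplication node after the delta join that replays +U data as both -U and +U when the sink needs complete update pairs (a sink materialization node or a retract sink). A minimal sketch, assuming the node keeps the latest row per sink key in keyed state; `UpsertToRetract` and the `(kind, row)` tuple format are hypothetical, and suppressing an exact duplicate of the stored row is a choice made for this sketch to absorb the duplicate output Ron mentions.

```python
from typing import Dict, Hashable, List, Optional, Tuple

Row = Tuple  # hypothetical: rows are plain tuples
Change = Tuple[str, Row]  # (kind, row), kind in {"+I", "-U", "+U", "-D"}


class UpsertToRetract:
    """Turns an upsert stream into a retract stream by remembering the last row per key."""

    def __init__(self, key_index: int) -> None:
        self.key_index = key_index
        self.last: Dict[Hashable, Row] = {}  # keyed state: latest row per key

    def process(self, kind: str, row: Row) -> List[Change]:
        key = row[self.key_index]
        prev: Optional[Row] = self.last.get(key)
        if kind == "-D":
            if prev is None:
                return []  # nothing was emitted for this key, nothing to retract
            del self.last[key]
            return [("-D", prev)]
        # "+I" and "+U" are both treated as upserts of the full row.
        if prev == row:
            return []  # exact duplicate from the join: suppressed in this sketch
        self.last[key] = row
        if prev is None:
            return [("+I", row)]
        return [("-U", prev), ("+U", row)]  # replay the update as a retract pair
```

Keeping only the last row per key bounds this node's state by the number of distinct sink keys rather than by the size of both join inputs.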