Hi, Xuyang,

Thanks for proposing this FLIP. It's very helpful for scenarios involving large states.
I have a few questions regarding the node whitelist section:

1. Suppose the TableSource implements FilterPushdownSpec. Taking LEFT JOIN as an example, when the right side supports filter pushdown, the source should be able to apply the filter. However, in the case of a LookupJoin, it seems that filter conditions cannot be pushed down. In such cases, are the filters re-applied after the join?

2. When there is a Calc node between the source and the join, and the join key involves some computation, does this prevent the plan from being transformed into a Delta Join?

Best,
Feng

On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Ron. Thanks for your attention to this FLIP.
>
> 1. At first, inner/left/right/full join will be supported. I have updated the FLIP about this part.
>
> 2. Are you referring to the situation where the downstream primary key (PK) differs from the upstream join key? Once the sink requires the upstream to send complete -U and +U data to satisfy its idempotent update (for example, (a) when the sink PK does not match the upstream upsert key, which results in a sink materialization node, or (b) when the sink is a retract sink instead of an upsert sink), a deduplication node will be introduced after the delta join to replay the +U data as both -U and +U. More details can be found in section "When and How to Convert." Please correct me if I misunderstood your meaning.
>
> 3. Yes, the majority of the design can be shared, including the index API, optimization phases, and parts of the operator implementation. The main difference is that strong consistency semantics require additional interfaces to establish certain agreements with the connector, such as whether snapshot scan reading and snapshot lookup are supported.
>
> 4. You've raised an important point: the introduction of a cache is not so straightforward. Each side has its own cache that stores remote data from this side's table.
> When one side receives data, it needs to perform the following steps:
>
> 1). Attempt to update its own cache:
>
> 1.1) If there is a cache, update it.
>
> 1.2) If there is no cache, skip the update.
>
> 2). Query the other side's cache or external table to perform the join:
>
> 2.1) If the other side has a cache, query it directly.
>
> 2.2) If the other side does not have a cache, query the external table and establish a cache on that side.
>
> In brief, the cache is established using data from the other side and is updated with data from its own side.
>
> 5. I've thought about this issue as well, namely supporting a *batch* lookup function. This feature would not only be beneficial for delta joins, but it would also be very useful for regular lookup syntax. However, it may be necessary to pursue a separate FLIP to address this.
>
> 6. A table's schema, including its indexes, can be retrieved by using `getTable` on `Catalog`. I do not want the engine to create indexes automatically. Similar to primary keys, even if we introduce index syntax, it should be marked as a `NOT ENFORCED` constraint.
>
> 7. Thank you for the reminder. I noticed that Multi-Way Join is disabled by default. In simple terms, users can adjust various configs to enable either Multi-Way Join or Delta Join. If both are enabled, the Cost-Based Optimizer can help us choose the appropriate algorithm, although the current table cost is not accurate. Looking ahead, Delta Join will also support multi-level join scenarios, referred to as Multi-Way Delta Join.
>
> --
>
> Best!
> Xuyang
>
> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
> >Hi, Xuyang
> >
> >Thanks for initiating this FLIP, which is of great value in solving the Streaming Join that troubles many users. Big +1 for it.
> >
> >For the overall design of the FLIP, I have the following questions:
> >
> >1. Can you explain the currently supported join types in the FLIP, such as Inner, Left, and Right Join?
> >
> >2. You mentioned strong consistency semantics and eventual consistency semantics, and gave the corresponding derivation formulas. Under eventual consistency semantics, you mentioned that some intermediate data will be introduced. What confuses me here is how to achieve eventual consistency semantics if the sink does not support idempotent updates by join key; after all, the join operator outputs some duplicate data.
> >
> >3. Due to storage reasons, the first step is to only consider supporting eventual consistency semantics, but Paimon and other lake storages support snapshots, which can achieve strong consistency semantics. If strong consistency is considered in the future, can it be well implemented based on the current design?
> >
> >4. Regarding the LRU cache part, if a record in the source table is updated, how is the cache updated automatically? Otherwise, I feel that wrong data may be read from the cache.
> >
> >5. In terms of implementation, have you considered batching input records and then querying them in batches to improve operator performance?
> >
> >6. Since Flink SQL syntax does not support indexes now, how can you create indexes on the source table and pass them to Flink? I mean, how can users use Delta Join?
> >
> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When both join optimizations are enabled, which one will take precedence, Delta Join or Multi-Way Join? What is the relationship between the two?
> >
> >Best,
> >Ron
> >
> >On Wed, May 7, 2025 at 10:02, Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, Weijie.
> >>
> >> Thanks for your review! Let me try to answer these questions.
> >>
> >> 1. The nodes between source and join must be included in the whitelist, such as Calc, DropUpdateBefore, WatermarkAssigner, Exchange, etc.
> >> You can see more in chapter "Limitations about Delta Join with Eventual Consistency".
> >>
> >> 2. Currently, we only have a rough plan. First and foremost, we need to ensure that storage engines such as Paimon and Fluss support snapshot awareness during scans and can include snapshots during lookups. Overall, it is likely to be supported in 2.2/2.3 later...
> >>
> >> --
> >>
> >> Best!
> >> Xuyang
> >>
> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
> >> >Hi Xuyang,
> >> >
> >> >Thanks for driving this! The state of two-stream joins is a real headache in stream computing systems.
> >> >
> >> >After reading this FLIP, I have two questions:
> >> >
> >> >1. Can the two sides of the join operator be any stream, or must each be immediately followed by the LookupSource? If it can be any stream, how should the stateful operators in the path be handled?
> >> >2. Are there any plans to support strong consistency in the future? This should be helpful for incremental computing scenarios.
> >> >
> >> >Best regards,
> >> >
> >> >Weijie
> >> >
> >> >On Fri, Apr 25, 2025 at 10:52, Xuyang <xyzhong...@163.com> wrote:
> >> >
> >> >> Hi, devs.
> >> >>
> >> >> I'd like to start a discussion on FLIP-486: Introduce a new DeltaJoin[1].
> >> >>
> >> >> In Flink streaming jobs, the large state of Join nodes has been a persistent concern for users. Since streaming jobs are long-running, the state of a join generally increases in size over time. Although users can set the state TTL to mitigate this issue, it is not applicable to all scenarios and does not provide a fundamental solution.
> >> >>
> >> >> An oversized state can lead to a series of problems, including but not limited to:
> >> >>
> >> >> 1. Resource bottlenecks in individual tasks
> >> >>
> >> >> 2. Slow checkpointing, which affects job stability during the checkpointing process
> >> >>
> >> >> 3. Long recovery time from state
> >> >>
> >> >> In addition, when analyzing the join state, we find that in some scenarios, the state within the join actually contains redundant data from the source tables.
> >> >>
> >> >> To address this issue, we aim to introduce Delta Join, which is based on the core idea of leveraging a bidirectional lookup join approach to reuse source table data as a substitute for join state.
> >> >>
> >> >> You can find more details in this FLIP. I'm looking forward to your comments and feedback.
> >> >>
> >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
> >> >>
> >> >> --
> >> >>
> >> >> Best!
> >> >> Xuyang
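
P.S. To make the cache handling in point 4 of Xuyang's reply to Ron more concrete, here is a minimal Python sketch of steps 1) and 2). It is only an illustration under assumptions, not the FLIP's implementation: I read "there is a cache" as "there is a cache entry for this join key", each entry is keyed by the row's primary key, only an inner join is shown, and LRU eviction and deletes are left out. All names (on_record, Cache, lookup_other, the orders/payments tables) are hypothetical.

```python
from typing import Callable, Dict, Hashable, List, Tuple

Row = Dict[str, object]
# One cache per side: join key -> {primary key -> row from that side's table}.
Cache = Dict[Hashable, Dict[Hashable, Row]]


def on_record(
    row: Row,
    join_key: str,
    own_pk: str,
    other_pk: str,
    own_cache: Cache,
    other_cache: Cache,
    lookup_other: Callable[[Hashable], List[Row]],
) -> List[Tuple[Row, Row]]:
    """Handle one record arriving on one side (hypothetical helper)."""
    key = row[join_key]
    # Step 1: update this side's cache only if an entry for the key exists
    # (1.1); otherwise skip the update (1.2).
    if key in own_cache:
        own_cache[key][row[own_pk]] = row
    # Step 2: use the other side's cache if present (2.1); otherwise query
    # the external table and establish the entry on that side (2.2).
    if key not in other_cache:
        other_cache[key] = {r[other_pk]: r for r in lookup_other(key)}
    return [(row, other) for other in other_cache[key].values()]


# Simulated external tables; in a delta join the source tables are the storage.
orders: List[Row] = []
payments: List[Row] = [{"pid": 1, "oid": 10, "amt": 5}]
lookups = {"orders": 0, "payments": 0}


def lookup_orders(key: Hashable) -> List[Row]:
    lookups["orders"] += 1
    return [r for r in orders if r["oid"] == key]


def lookup_payments(key: Hashable) -> List[Row]:
    lookups["payments"] += 1
    return [r for r in payments if r["oid"] == key]


order_cache: Cache = {}
payment_cache: Cache = {}

# An order arrives: no own entry (skip), payment cache miss -> external lookup.
o1 = {"oid": 10, "item": "a"}
orders.append(o1)
out1 = on_record(o1, "oid", "oid", "pid", order_cache, payment_cache, lookup_payments)

# A payment arrives: its own entry exists and is updated; order cache miss.
p2 = {"pid": 2, "oid": 10, "amt": 7}
payments.append(p2)
out2 = on_record(p2, "oid", "pid", "oid", payment_cache, order_cache, lookup_orders)

# The order is updated: own entry refreshed, payments served from the cache.
o2 = {"oid": 10, "item": "b"}
orders[0] = o2
out3 = on_record(o2, "oid", "oid", "pid", order_cache, payment_cache, lookup_payments)
```

In this run each external table is queried only once. The third event is answered entirely from the payment cache, which already contains the second payment because that payment updated its own side's entry when it arrived. This matches the summary that a cache is established from the other side's lookups and kept up to date by its own side's data.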