Hi, devs.

Given how long this discussion has been running, I will start the voting thread tomorrow if there are no further objections.
--

Best!
Xuyang

At 2025-05-19 10:05:46, "Xuyang" <xyzhong...@163.com> wrote:
>Hi, Xiangyu.
>
>You are correct that the lookup function on the lookup side should not use
>caching. Therefore, we need to unwrap the actual lookup function from the
>CachingLookupFunction (instead of updating it in place). We assume that the
>cached function used by the lookup table is based on
>CachingLookupFunction; otherwise, there may be some issues. Additionally,
>since only sources that proactively report an index will be optimized into a
>delta join, there should not be any issues with historical jobs.
>
>--
>
> Best!
> Xuyang
>
>At 2025-05-16 01:07:21, "xiangyu feng" <xiangyu...@gmail.com> wrote:
>>Hi Xuyang,
>>
>>Thx for driving this FLIP. I believe delta join is truly helpful for large
>>state scenarios. The overall design LGTM. Also, I like the idea of updating
>>the cache using the data from its own side.
>>
>>I have one minor question: storages like Paimon have their own cache
>>in their LookupFunction. There can be inconsistencies between the cache in
>>the DeltaJoinOperator and the cache inside the LookupFunction, because the
>>incoming data will only update the former.
>>
>>How should we address this issue? Is it possible for the incoming data to
>>also update the cache inside the LookupFunction? Or can we assume that every
>>LookupFunction with a cache extends `CachingLookupFunction`, so that we
>>can use the incoming data to update its `LookupCache`?
>>
>>Best,
>>Xiangyu Feng
>>
>>Ron Liu <ron9....@gmail.com> wrote on Thu, May 15, 2025 at 10:53:
>>
>>> Hi, Xuyang
>>>
>>> Thanks for your reply, looks good to me.
>>>
>>> Best,
>>> Ron
>>>
>>> Xuyang <xyzhong...@163.com> wrote on Tue, May 13, 2025 at 15:00:
>>>
>>> > Hi, Feng. Let me address your questions:
>>> >
>>> > 1. As you mentioned, these filters should be recalculated. We can apply
>>> > the filters after looking up the source, and then perform the join.
>>> > Moreover, we can consider pushing the filters down to the lookup source
>>> > to achieve a more efficient lookup.
>>> >
>>> > 2. You are correct that in the Calc, the join key must be referenced
>>> > directly without any transformations. Otherwise, we won't be able to
>>> > map the transformed join key back to its original form. Let me clarify
>>> > this part in the FLIP.
>>> >
>>> > --
>>> >
>>> > Best!
>>> > Xuyang
>>> >
>>> > At 2025-05-12 10:33:41, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>>> > >Hi, Xuyang
>>> > >
>>> > >Thanks for proposing this FLIP; it's very helpful for scenarios
>>> > >involving large states.
>>> > >
>>> > >I have a few questions regarding the node whitelist section:
>>> > >
>>> > >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as
>>> > >an example: when the right side supports filter pushdown, the source
>>> > >should be able to apply the filter. However, in the case of a
>>> > >LookupJoin, it seems that filter conditions cannot be pushed down. In
>>> > >such cases, are the filters re-applied after the join?
>>> > >
>>> > >2. When there is a Calc node between the source and the join, and the
>>> > >join key involves some computation, does this prevent the plan from
>>> > >being transformed into a Delta Join?
>>> > >
>>> > >
>>> > >Best,
>>> > >Feng
>>> > >
>>> > >
>>> > >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
>>> > >
>>> > >> Hi, Ron. Thanks for your attention to this FLIP.
>>> > >>
>>> > >> 1. At first, inner/left/right/full join will be supported. I have
>>> > >> updated the FLIP about this part.
>>> > >>
>>> > >> 2. Are you referring to the situation where the downstream primary key
>>> > >> (PK) differs from the upstream join key? Once the sink requires the
>>> > >> upstream to send complete -U and +U data to satisfy its idempotent
>>> > >> update (for example, 1.
>>> > >> when the sink PK does not match the upstream upsert key, it results in
>>> > >> a sink materialization node, or 2. when the sink is a retract sink
>>> > >> instead of an upsert sink), a deduplication node will be introduced
>>> > >> after the delta join to replay the +U data as both -U and +U. More
>>> > >> details can be found in section "When and How to Convert." Please
>>> > >> correct me if I have misunderstood your question.
>>> > >>
>>> > >> 3. Yes, the majority of the design can be shared, including the index
>>> > >> API, optimization phases, and parts of the operator implementation. The
>>> > >> main difference is that strong consistency semantics require additional
>>> > >> interfaces to establish certain agreements with the connector, such as
>>> > >> whether snapshot scan reading and snapshot lookup are supported.
>>> > >>
>>> > >> 4. You've raised an important point: the introduction of a cache is not
>>> > >> so straightforward. Each side has its own cache that stores remote data
>>> > >> from this side's table. When one side receives data, it needs to
>>> > >> perform the following steps:
>>> > >>
>>> > >> 1). Attempt to update its own cache:
>>> > >>
>>> > >> 1.1) If there is a cache, update it.
>>> > >>
>>> > >> 1.2) If there is no cache, skip the update.
>>> > >>
>>> > >> 2). Query the other side's cache or external table to perform the join:
>>> > >>
>>> > >> 2.1) If the other side has a cache, query it directly.
>>> > >>
>>> > >> 2.2) If the other side does not have a cache, query the external table
>>> > >> and establish a cache on that side.
>>> > >>
>>> > >> In brief, the cache is established using data from the other side and
>>> > >> is updated with data from its own side.
>>> > >>
>>> > >> 5. I've thought about this issue as well, namely supporting a *batch*
>>> > >> lookup function.
>>> > >> This feature would not only be beneficial for delta joins, but it
>>> > >> would also be very useful for regular lookup syntax. However, it may
>>> > >> be necessary to pursue a separate FLIP to address this.
>>> > >>
>>> > >> 6. The schema with indexes in the table can be retrieved by calling
>>> > >> `getTable` on `Catalog`. I do not want the engine to create indexes
>>> > >> automatically. Similar to primary keys, even if we introduce index
>>> > >> syntax, it should be marked as a `NOT ENFORCED` constraint.
>>> > >>
>>> > >> 7. Thank you for the reminder. I noticed that Multi-Way Join is
>>> > >> disabled by default. In simple terms, users can adjust various configs
>>> > >> to enable either Multi-Way Join or Delta Join. If both are enabled, the
>>> > >> Cost-Based Optimizer can help us choose the appropriate algorithm,
>>> > >> although the current table cost is not accurate. Looking ahead, Delta
>>> > >> Join will also support multi-level join scenarios, referred to as
>>> > >> Multi-Way Delta Join.
>>> > >>
>>> > >> --
>>> > >>
>>> > >> Best!
>>> > >> Xuyang
>>> > >>
>>> > >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
>>> > >> >Hi, Xuyang
>>> > >> >
>>> > >> >Thanks for initiating this FLIP, which is of great value in solving
>>> > >> >the Streaming Join that troubles many users. Big +1 for it.
>>> > >> >
>>> > >> >For the overall design of FLIP, I have the following questions:
>>> > >> >
>>> > >> >1. Can you explain the currently supported Join types in FLIP, such as
>>> > >> >Inner, Left, and Right Join?
>>> > >> >
>>> > >> >2. You mentioned strong consistency semantics and eventual consistency
>>> > >> >semantics, and gave the corresponding derivation formulas. Under
>>> > >> >eventual consistency semantics, you mentioned that some intermediate
>>> > >> >data will be introduced.
>>> > >> >What confuses me here is how to achieve eventual consistency
>>> > >> >semantics if the Sink does not support idempotent updates by Join
>>> > >> >key; after all, the Join operator outputs some duplicate data.
>>> > >> >
>>> > >> >3. Due to storage reasons, the first step is to only consider
>>> > >> >supporting eventual consistency semantics, but Paimon and other lake
>>> > >> >storages support Snapshot, which can achieve strong consistency
>>> > >> >semantics. If strong consistency is considered in the future, can it
>>> > >> >be well implemented based on the current design?
>>> > >> >
>>> > >> >4. Regarding the LRU cache part, if the record in the source table is
>>> > >> >updated, how is the cache updated automatically? Otherwise, I feel
>>> > >> >that wrong data may be read from the cache.
>>> > >> >
>>> > >> >5. In terms of implementation, have you considered batching input
>>> > >> >records and then querying them in batches to improve operator
>>> > >> >performance?
>>> > >> >
>>> > >> >6. Since Flink SQL syntax does not support indexes now, how can you
>>> > >> >create indexes on the source table and pass them to Flink? I mean,
>>> > >> >how can users use Delta Join?
>>> > >> >
>>> > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join.
>>> > >> >When both Join optimizations are enabled, which one will take
>>> > >> >precedence, Delta Join or Multi-Way Join? What is the relationship
>>> > >> >between the two?
>>> > >> >
>>> > >> >
>>> > >> >Best,
>>> > >> >Ron
>>> > >> >
>>> > >> >Xuyang <xyzhong...@163.com> wrote on Wed, May 7, 2025 at 10:02:
>>> > >> >
>>> > >> >> Hi, Weijie.
>>> > >> >>
>>> > >> >> Thanks for your review! Let me try to answer these questions.
>>> > >> >>
>>> > >> >> 1. The nodes between source and join must be included in the
>>> > >> >> whitelist, such as Calc, DropUpdateBefore, WatermarkAssigner,
>>> > >> >> Exchange, etc.
>>> > >> >> You can see more in the chapter "Limitations about Delta Join with
>>> > >> >> Eventual Consistency".
>>> > >> >>
>>> > >> >> 2. Currently, we have only a vague plan. First and foremost, we
>>> > >> >> need to ensure that storage engines like Paimon, Fluss, etc.
>>> > >> >> support snapshot awareness during scans and lookups against a given
>>> > >> >> snapshot. Overall, it is likely to be supported later, in 2.2/2.3.
>>> > >> >>
>>> > >> >> --
>>> > >> >>
>>> > >> >> Best!
>>> > >> >> Xuyang
>>> > >> >>
>>> > >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
>>> > >> >> >Hi Xuyang,
>>> > >> >> >
>>> > >> >> >Thanks for driving this! The state of two-stream join is a real
>>> > >> >> >headache in stream computing systems.
>>> > >> >> >
>>> > >> >> >After reading this FLIP, I have two questions:
>>> > >> >> >
>>> > >> >> >1. Can the two sides of the join operator be any stream, or must
>>> > >> >> >each side come directly from the LookupSource? If it can be any
>>> > >> >> >stream, how should the stateful operators in the path be handled?
>>> > >> >> >2. Are there any plans to support strong consistency in the
>>> > >> >> >future? This should be helpful for incremental computing
>>> > >> >> >scenarios.
>>> > >> >> >
>>> > >> >> >
>>> > >> >> >Best regards,
>>> > >> >> >
>>> > >> >> >Weijie
>>> > >> >> >
>>> > >> >> >
>>> > >> >> >Xuyang <xyzhong...@163.com> wrote on Fri, Apr 25, 2025 at 10:52:
>>> > >> >> >
>>> > >> >> >> Hi, devs.
>>> > >> >> >>
>>> > >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new
>>> > >> >> >> DeltaJoin[1].
>>> > >> >> >>
>>> > >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
>>> > >> >> >> persistent concern for users. Since streaming jobs are
>>> > >> >> >> long-running, the state of join generally increases in size over
>>> > >> >> >> time. Although users can set the state TTL to mitigate this
>>> > >> >> >> issue, it is not applicable to all scenarios and does not provide
>>> > >> >> >> a fundamental solution.
>>> > >> >> >>
>>> > >> >> >> An oversized state can lead to a series of problems, including
>>> > >> >> >> but not limited to:
>>> > >> >> >>
>>> > >> >> >> 1. Resource bottlenecks in individual tasks
>>> > >> >> >>
>>> > >> >> >> 2. Slow checkpointing, which affects job stability during the
>>> > >> >> >> checkpointing process
>>> > >> >> >>
>>> > >> >> >> 3. Long recovery time from state
>>> > >> >> >>
>>> > >> >> >> In addition, when analyzing the join state, we find that in some
>>> > >> >> >> scenarios, the state within the join actually contains redundant
>>> > >> >> >> data from the source tables.
>>> > >> >> >>
>>> > >> >> >> To address this issue, we aim to introduce Delta Join, which is
>>> > >> >> >> based on the core idea of leveraging a bidirectional lookup join
>>> > >> >> >> approach to reuse source table data as a substitute for join
>>> > >> >> >> state.
>>> > >> >> >>
>>> > >> >> >> You can find more details in this FLIP. I'm looking forward to
>>> > >> >> >> your comments and feedback.
>>> > >> >> >>
>>> > >> >> >> [1]
>>> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
>>> > >> >> >>
>>> > >> >> >> --
>>> > >> >> >>
>>> > >> >> >> Best!
>>> > >> >> >> Xuyang
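
For readers following the cache discussion (point 4 in the reply to Ron above), the described steps can be sketched roughly as follows. This is a minimal illustration in Python, not Flink code: `SideCache`, `DeltaJoinSketch`, the dict-backed "external tables", and the `lookups` counter are all hypothetical names invented for this sketch, and it ignores eviction, deletes, and asynchronous lookups. It only shows that each side's cache is established by lookups issued from the other side and refreshed by that side's own incoming records.

```python
class SideCache:
    """Hypothetical per-side cache: join key -> {primary key: row}."""

    def __init__(self):
        self.entries = {}

    def update_if_present(self, key, pk, row):
        # Steps 1.1 / 1.2: refresh the entry only if this join key is
        # already cached; otherwise skip the update.
        if key in self.entries:
            self.entries[key][pk] = row

    def get_or_load(self, key, load):
        # Steps 2.1 / 2.2: answer from the cache if possible; otherwise
        # query the external table and establish the cache entry.
        if key not in self.entries:
            self.entries[key] = dict(load(key))
        return list(self.entries[key].values())


class DeltaJoinSketch:
    def __init__(self, left_table, right_table):
        # External tables are modelled as plain dicts: primary key -> row.
        # Each row is a dict with at least a "key" field (the join key).
        self.tables = {"left": left_table, "right": right_table}
        self.caches = {"left": SideCache(), "right": SideCache()}
        self.lookups = 0  # number of external-table queries, for illustration

    def _lookup(self, side, key):
        # Stand-in for an index lookup on the external table.
        self.lookups += 1
        return {pk: row for pk, row in self.tables[side].items()
                if row["key"] == key}

    def on_record(self, side, pk, row):
        other = "right" if side == "left" else "left"
        # Step 1: update this side's own cache with the incoming record.
        self.caches[side].update_if_present(row["key"], pk, row)
        # Step 2: join against the other side via its cache or table.
        matches = self.caches[other].get_or_load(
            row["key"], lambda k: self._lookup(other, k))
        # Emit (left_row, right_row) pairs regardless of the input side.
        return [(row, m) if side == "left" else (m, row) for m in matches]
```

In this sketch, a second record with the same join key on the same side is joined from the other side's cache without another external query, and a record arriving on the other side updates that cache entry in place, which is the "established by the other side, updated by its own side" behavior described in the thread.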