Hey Xuyang,

Happy to see the FLIP being approved. Could you reply to the questions above if you have some time?

Thanks,
Gustavo

On Tue, 20 May 2025 at 08:56, Gustavo de Morais <gustavopg...@gmail.com> wrote:

> Hey Xuyang,
>
> Thanks for proposing this and driving the discussion. In general, a very
> interesting idea.
>
> - Can you go into a bit more detail on how the optimizer will work in
> AUTO/FORCE modes? When are we returning an error or falling back to the
> regular streaming join operator? Do we support, for example, a delta join
> for the first level and then the regular join operator for the next join
> levels, or will the optimizer just return an error if you have FORCE set
> and use multiple chained joins?
>
> - FLIP-516 adds support for a multiple join operator, which eliminates
> intermediate state between joins. I see the potential of combining this
> with FLIP-516 for some use cases. The same strategy described here could be
> used in combination with it to also reduce the amount of state used by the
> input streams in a second interaction. Or did you have a different strategy
> to approach cascading joins?
>
> Best,
> Gustavo
>
> On Thu, 15 May 2025 at 04:53, Ron Liu <ron9....@gmail.com> wrote:
>
>> Hi, Xuyang
>>
>> Thanks for your reply, looks good to me.
>>
>> Best,
>> Ron
>>
>> Xuyang <xyzhong...@163.com> wrote on Tue, 13 May 2025 at 15:00:
>>
>> > Hi, Feng. Let me address your questions:
>> >
>> > 1. As you mentioned, these filters should be recalculated. We can apply
>> > the filters after looking up the source, and then perform the join.
>> > Moreover, we can consider pushing the filters down to the lookup source
>> > to achieve a more efficient lookup.
>> >
>> > 2. You are correct that in the Calc, the join key must be referenced
>> > directly without any transformations. Otherwise, we won’t be able to
>> > revert the transformed join key back to its original one. Let me clarify
>> > this part in the FLIP.
>> >
>> > --
>> >
>> > Best!
>> > Xuyang
>> >
>> > At 2025-05-12 10:33:41, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>> > >Hi, Xuyang
>> > >
>> > >Thanks for proposing this FLIP; it’s very helpful for scenarios involving
>> > >large states.
>> > >
>> > >I have a few questions regarding the node whitelist section:
>> > >
>> > >1. If the TableSource implements FilterPushdownSpec, take LEFT JOIN as an
>> > >example: when the right side supports filter pushdown, the source should
>> > >be able to apply the filter. However, in the case of a LookupJoin, it seems
>> > >that filter conditions cannot be pushed down. In such cases, are the
>> > >filters re-applied after the join?
>> > >
>> > >2. When there is a Calc node between the source and the join, and the join
>> > >key involves some computation, does this prevent the plan from being
>> > >transformed into a Delta Join?
>> > >
>> > >
>> > >Best,
>> > >Feng
>> > >
>> > >
>> > >On Fri, May 9, 2025 at 11:34 AM Xuyang <xyzhong...@163.com> wrote:
>> > >
>> > >> Hi, Ron. Thanks for your attention to this FLIP.
>> > >>
>> > >> 1. At first, inner/left/right/full join will be supported. I have updated
>> > >> the FLIP about this part.
>> > >>
>> > >> 2. Are you referring to the situation where the downstream primary key
>> > >> (PK) differs from the upstream join key? Once the sink requires the
>> > >> upstream to send complete -U and +U data to satisfy its idempotent update
>> > >> (for example, 1. when the sink PK does not match the upstream upsert key,
>> > >> it results in a sink materialization node, or 2. when the sink is a retract
>> > >> sink instead of an upsert sink), a deduplication node will be introduced
>> > >> after the delta join to replay the +U data as both -U and +U. More details
>> > >> can be found in section "When and How to Convert." Please correct me if I
>> > >> misunderstood your meaning.
>> > >>
>> > >> 3.
>> > >> Yes, the majority of the design can be shared, including the index API,
>> > >> optimization phases, and parts of the operator implementation. The main
>> > >> difference is that strong consistency semantics require additional
>> > >> interfaces to establish certain agreements with the connector, such as
>> > >> whether snapshot scan reading and snapshot lookup are supported.
>> > >>
>> > >> 4. You've raised an important point: the introduction of a cache is not so
>> > >> straightforward. Each side has its own cache that stores remote data from
>> > >> this side's table. When one side receives data, it needs to perform the
>> > >> following steps:
>> > >>
>> > >> 1). Attempt to update its own cache:
>> > >>
>> > >> 1.1) If there is a cache, update it.
>> > >>
>> > >> 1.2) If there is no cache, skip the update.
>> > >>
>> > >> 2). Query the other side's cache or external table to perform the join:
>> > >>
>> > >> 2.1) If the other side has a cache, query it directly.
>> > >>
>> > >> 2.2) If the other side does not have a cache, query the external table and
>> > >> establish a cache on that side.
>> > >>
>> > >> In brief, the cache is established using data from the other side and is
>> > >> updated with data from its own side.
>> > >>
>> > >> 5. I've thought about this issue as well, that is to support *batch*
>> > >> lookup function. This feature would not only be beneficial for delta joins,
>> > >> but it would also be very useful for regular lookup syntax. However, it may
>> > >> be necessary to pursue a separate FLIP to address this.
>> > >>
>> > >> 6. Schema with indexes in the table can be retrieved by using the
>> > >> `getTable` on `Catalog`. I do not want the engine to create indexes
>> > >> automatically. Similar to primary keys, even if we introduce index syntax,
>> > >> it should be marked as a `NOT ENFORCED` constraint.
>> > >>
>> > >> 7. Thank you for the reminder.
>> > >> I noticed that Multi-Way Join is disabled by default. In simple terms,
>> > >> users can adjust various configs to enable either Multi-Way Join or Delta
>> > >> Join. If both are enabled, the Cost-Based Optimizer can help us choose the
>> > >> appropriate algorithm, although the current table cost is not accurate.
>> > >> Looking ahead, Delta Join will also support multi-level join scenarios,
>> > >> referred to as Multi-Way Delta Join.
>> > >>
>> > >> --
>> > >>
>> > >> Best!
>> > >> Xuyang
>> > >>
>> > >> At 2025-05-08 14:55:42, "Ron Liu" <ron9....@gmail.com> wrote:
>> > >> >Hi, Xuyang
>> > >> >
>> > >> >Thanks for initiating this FLIP, which is of great value in solving the
>> > >> >Streaming Join that troubles many users. Big +1 for it.
>> > >> >
>> > >> >For the overall design of FLIP, I have the following questions:
>> > >> >
>> > >> >1. Can you explain the currently supported Join types in FLIP, such as
>> > >> >Inner, Left, and Right Join?
>> > >> >
>> > >> >2. You mentioned strong consistency semantics and eventual consistency
>> > >> >semantics, and gave the corresponding derivation formulas. Under eventual
>> > >> >consistency semantics, you mentioned that some intermediate data will be
>> > >> >introduced. What confuses me here is how to achieve eventual consistency
>> > >> >semantics if Sink does not support idempotent updates by Join key, after
>> > >> >all, the Join operator outputs some duplicate data.
>> > >> >
>> > >> >3. Due to storage reasons, the first step is to only consider supporting
>> > >> >eventual consistency semantics, but Paimon and other lake storages support
>> > >> >Snapshot, which can achieve strong consistency semantics. If strong
>> > >> >consistency is considered in the future, can it be well implemented based
>> > >> >on the current design?
>> > >> >
>> > >> >4.
>> > >> >Regarding the LRU cache part, if the record in the source table is
>> > >> >updated, how to automatically update the cache, otherwise I feel that the
>> > >> >wrong data may be found from the cache.
>> > >> >
>> > >> >5. In terms of implementation, have you considered batching input records
>> > >> >and then querying them in batches to improve operator performance?
>> > >> >
>> > >> >6. Since Flink SQL syntax does not support indexes now, how can you create
>> > >> >indexes on the source table and pass them to Flink? I mean, how can users
>> > >> >use Delta Join?
>> > >> >
>> > >> >7. FLIP-516 solves the Streaming Join problem via Multi-Way Join. When both
>> > >> >Join optimizations are enabled, which one will take precedence, Delta Join
>> > >> >or Multi-Way Join? What is the relationship between the two?
>> > >> >
>> > >> >
>> > >> >Best,
>> > >> >Ron
>> > >> >
>> > >> >Xuyang <xyzhong...@163.com> wrote on Wed, 7 May 2025 at 10:02:
>> > >> >
>> > >> >> Hi, Weijie.
>> > >> >>
>> > >> >> Thanks for your review! Let me try to answer these questions.
>> > >> >>
>> > >> >> 1. The nodes between source and join must be included in the whitelist,
>> > >> >> such as Calc, DropUpdateBefore, WatermarkAssigner, Exchange, etc. You can
>> > >> >> see more in chapter "Limitations about Delta Join with Eventual
>> > >> >> Consistency".
>> > >> >>
>> > >> >> 2. Currently, we have only one vague plan. First and foremost, we need to
>> > >> >> ensure that storage engines like Paimon, Fluss, etc. support snapshot
>> > >> >> awareness during scans and including snapshots during lookups. Overall,
>> > >> >> it is likely to be supported in 2.2/2.3 later...
>> > >> >>
>> > >> >> --
>> > >> >>
>> > >> >> Best!
>> > >> >> Xuyang
>> > >> >>
>> > >> >> At 2025-04-29 11:53:48, "weijie guo" <guoweijieres...@gmail.com> wrote:
>> > >> >> >Hi Xuyang,
>> > >> >> >
>> > >> >> >Thanks for driving this! The state of two-stream join is a very
>> > >> >> >headache-inducing problem in stream computing systems.
>> > >> >> >
>> > >> >> >After reading this FLIP, I have two questions:
>> > >> >> >
>> > >> >> >1. Can the two sides of the join operator be any stream or must it be
>> > >> >> >immediately followed by the LookupSource? If it is any stream, how should
>> > >> >> >the stateful operators in the path be handled?
>> > >> >> >2. Are there any plans to support strong consistency in the future? This
>> > >> >> >should be helpful for the scenarios of incremental computing.
>> > >> >> >
>> > >> >> >
>> > >> >> >Best regards,
>> > >> >> >
>> > >> >> >Weijie
>> > >> >> >
>> > >> >> >
>> > >> >> >Xuyang <xyzhong...@163.com> wrote on Fri, 25 Apr 2025 at 10:52:
>> > >> >> >
>> > >> >> >> Hi, devs.
>> > >> >> >>
>> > >> >> >> I'd like to start a discussion on FLIP-486: Introduce a new
>> > >> >> >> DeltaJoin[1].
>> > >> >> >>
>> > >> >> >> In Flink streaming jobs, the large state of Join nodes has been a
>> > >> >> >> persistent concern for users. Since streaming jobs are long-running,
>> > >> >> >> the state of join generally increases in size over time. Although
>> > >> >> >> users can set the state TTL to mitigate this issue, it is not
>> > >> >> >> applicable to all scenarios and does not provide a fundamental
>> > >> >> >> solution.
>> > >> >> >>
>> > >> >> >> An oversized state can lead to a series of problems, including but not
>> > >> >> >> limited to:
>> > >> >> >>
>> > >> >> >> 1. Resource bottlenecks in individual tasks
>> > >> >> >>
>> > >> >> >> 2.
>> > >> >> >> Slow checkpointing, which affects job stability during the
>> > >> >> >> checkpointing process
>> > >> >> >>
>> > >> >> >> 3. Long recovery time from state
>> > >> >> >>
>> > >> >> >> In addition, when analyzing the join state, we find that in some
>> > >> >> >> scenarios, the state within the join actually contains redundant data
>> > >> >> >> from the source tables.
>> > >> >> >>
>> > >> >> >> To address this issue, we aim to introduce Delta Join, which is based
>> > >> >> >> on the core idea of leveraging a bidirectional lookup join approach to
>> > >> >> >> reuse source table data as a substitute for join state.
>> > >> >> >>
>> > >> >> >> You can find more details in this FLIP. I'm looking forward to your
>> > >> >> >> comments and feedback.
>> > >> >> >>
>> > >> >> >> [1]
>> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
>> > >> >> >>
>> > >> >> >> --
>> > >> >> >>
>> > >> >> >> Best!
>> > >> >> >> Xuyang
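
For readers following the cache discussion in point 4 of Xuyang's reply to Ron, the update/lookup steps described there can be sketched roughly as follows. This is only an illustration in Python, not the FLIP's operator: the names `Side` and `on_record`, the choice of caching per join key, and the in-memory dict standing in for the external table are all assumptions made for this example.

```python
from collections import defaultdict


class Side:
    """One join input: a stand-in external table plus a per-join-key cache (hypothetical)."""

    def __init__(self, rows=()):
        # join key -> {primary key -> row}; simulates the remote source table.
        self.table = defaultdict(dict)
        for join_key, pk, row in rows:
            self.table[join_key][pk] = row
        # join key -> {primary key -> row}; a missing key means "no cache yet".
        self.cache = {}
        self.remote_lookups = 0

    def lookup_remote(self, join_key):
        self.remote_lookups += 1
        return dict(self.table.get(join_key, {}))


def on_record(own, other, join_key, pk, row):
    """Process one record arriving on `own` and join it against `other`."""
    # Assumption: the stream is the changelog of own's table, so the table has the row.
    own.table[join_key][pk] = row
    # Step 1: update own cache only if one already exists for this key (1.1 / 1.2).
    if join_key in own.cache:
        own.cache[join_key][pk] = row
    # Step 2: use the other side's cache (2.1), or query its table and build it (2.2).
    if join_key not in other.cache:
        other.cache[join_key] = other.lookup_remote(join_key)
    return [(row, other_row) for other_row in other.cache[join_key].values()]


left, right = Side(), Side([("k", 1, "r1")])
print(on_record(left, right, "k", 10, "l1"))  # [('l1', 'r1')]
print(on_record(right, left, "k", 2, "r2"))   # [('r2', 'l1')]
print(on_record(left, right, "k", 11, "l2"))  # [('l2', 'r1'), ('l2', 'r2')]
```

In the third call the right side's cache already exists and was updated by the right side's own record `r2`, so the join result is complete without a second remote lookup. That is the "established by the other side, updated by its own side" behaviour described in the reply.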