Hi Jark, I don't want to answer for Fabian, but I have looked into these two points in the past while exploring design ideas for a temporal processing join and can provide some context about the two mentioned features:

*(1) RecordAttributes#isBacklog*

I agree this would be ideal. The challenge is that RecordAttributes#isBacklog was designed as a throughput optimization hint, not as a correctness signal. Using it as a correctness boundary would need a FLIP amendment. Also, AFAIK most connectors don't emit it today - only HybridSource does.

*(2) InputSelectable*

Using InputSelectable to avoid probe-side buffering sounds like a good idea as well. But InputSelectable itself doesn't work in streaming. The reason is that checkpoint barriers must flow through all inputs - blocking the probe input prevents barriers from arriving, causing a checkpoint deadlock. AFAIK all production implementations of it are batch-only. This is unfortunately a blocker.

I really like the ideas and think they would be clean solutions. However, they currently have considerable blockers that keep us from using them for a streaming proposal. I think Fabian's proposal navigates around that and gives us a good alternative without relying on them.

Kind regards,
Gustavo

On Mon, 1 Jun 2026 at 15:11, Jark Wu <[email protected]> wrote:

> Hi Fabian,
>
> Thanks for driving this FLIP! Overall I think this is a great idea and a long-missing feature, so a big +1 from me. I only have two concerns I'd like to discuss.
>
> (1) The build-side flip point is not exact.
>
> As Xingcan already touched on, because the LOAD→JOIN transition is based on a time/idle heuristic rather than a consistent boundary, probe-side records may end up joining the wrong historical version of the build side, or miss matches entirely.
>
> That said, many storage systems and connectors are actually able to provide a consistent switch point. For example, mysql-cdc emits a "backlog" (binlog-start) event once the snapshot phase has finished.
> The join operator could switch from the LOAD phase to the processing-time JOIN phase exactly when it receives such a backlog event, giving a precise and correct initialization boundary instead of a heuristic one.
>
> Flink already provides the RecordAttributes#isBacklog framework for such a backlog mechanism, but we may still need non-trivial effort to adapt it to this use case. So I suggest we consider this mechanism, but I'm totally fine with leaving this as future work.
>
> (2) Buffering probe-side records in operator state during the LOAD phase.
>
> If I understand the design correctly, during the LOAD phase the probe-side records are buffered into operator state. For high-throughput probe streams and/or long initialization times, this state can grow very large, which may lead to problems such as checkpoint timeouts during the LOAD phase.
>
> I think a more elegant design would be to follow the batch hash join approach and use the InputSelectable interface: during the LOAD phase the operator simply refuses to consume probe-side input until the build side is initialized. This would let backpressure naturally hold back the probe side instead of materializing it into state, which I believe would significantly simplify the architecture and improve job stability.
>
> Curious to hear your thoughts on both points.
>
> Best,
> Jark Wu
>
> On Mon, 1 Jun 2026 at 16:03, Fabian Hueske <[email protected]> wrote:
> >
> > Hi Leonard,
> >
> > Sorry, missed your email and already started the vote.
> > Let me put it on hold for now and continue discussing the proposal.
> >
> > Looking forward to your comments,
> > Fabian
> >
> > On Mon, 1 Jun 2026 at 09:56, Leonard Xu <[email protected]> wrote:
> >
> > > @Fabian Thanks for driving this FLIP, and sorry for the late reply due to personal reasons; I shouldn’t miss such an important FLIP.
> > >
> > > I’m reviewing the FLIP and will try to finish it today, could you kindly wait one minute to start the vote?
> > >
> > > And sorry for interrupting your plan again.
> > >
> > > Best,
> > > Leonard
> > >
> > > > On 1 Jun 2026, at 15:51, Fabian Hueske <[email protected]> wrote:
> > > >
> > > > Thanks everyone for your comments on the FLIP.
> > > > I will start the vote.
> > > >
> > > > Best, Fabian
> > > >
> > > > On Thu, 28 May 2026 at 20:13, David Anderson <[email protected]> wrote:
> > > >
> > > >> Fabian,
> > > >>
> > > >>> So, I don't think that we should buffer unmatched probe-side records beyond the flip point.
> > > >>
> > > >> Thanks for explaining your reasoning. Makes sense to me.
> > > >>
> > > >> David
> > > >>
> > > >> On Thu, May 28, 2026 at 6:55 PM Fabian Hueske <[email protected]> wrote:
> > > >>
> > > >>> Hi Xingcan,
> > > >>>
> > > >>> Thanks for your comments on the FLIP!
> > > >>>
> > > >>> The join's behavior when starting from a savepoint is indeed an important aspect to consider and the problem of a rapidly advancing dimension (build-side) table is of course real.
> > > >>>
> > > >>> I would argue that watermark alignment should significantly reduce the impact of this.
> > > >>> If enabled, sources align their consumption based on their current watermark such that the (presumably much smaller) build-side source would be slowed down to the event-time progress of the probe-side.
> > > >>> While watermark alignment is not an "exact" mechanism, the semantics of the new processing-time join also do not guarantee "exact" results.
> > > >>> At the same time, alignment should ensure that build and probe-side are roughly aligned in event-time (without the strict guarantees that the event-time temporal table join provides).
> > > >>>
> > > >>> However, I really like your idea of starting in event-time mode and flipping to processing-time after the initialization duration has passed.
> > > >>> I'm not sure if it would fully address the problem you described. As you said, users would need to be able to reconfigure the flip-point and I'm not sure if there's a good mechanism for this yet.
> > > >>> But it might have some other properties that would be beneficial, so I'll think about that.
> > > >>>
> > > >>> Best,
> > > >>> Fabian
> > > >>>
> > > >>>
> > > >>> On Thu, 28 May 2026 at 18:21, Fabian Hueske <[email protected]> wrote:
> > > >>>
> > > >>>> Thanks for your feedback David!
> > > >>>>
> > > >>>>> One question: If I understand correctly, during the JOIN phase of an INNER join, if the desired build-side record is missing, nothing will be emitted for the unmatched probe-side record. For an INNER join, I can imagine wanting to buffer unmatched probe-side records, expecting the build side will arrive soon. What's your thinking there?
> > > >>>>
> > > >>>> Your understanding is correct. If a probe-side record arrives during LOAD phase but no matching build-side record is received,
> > > >>>> the probe-side record would be discarded without being joined during the transition from LOAD to JOIN.
> > > >>>>
> > > >>>> I would argue that users who want to prevent this would need to configure a longer initialization time.
> > > >>>> IMO, dropping unmatched probe records is not a "bad" property of INNER joins but an essential part of their semantics. It might even be desired by some users.
> > > >>>> If we buffered probe-side records for INNER joins beyond the transition point, we:
> > > >>>> * would have different behaviors for INNER and LEFT joins
> > > >>>> * could not start to emit probe-side watermarks as long as there are still probe-side records buffered (or at least not advance past them without emitting late data at a later point of time)
> > > >>>> * would either need another config knob to specify when to "really" clean up the probe-side state or keep such unmatched records forever in state (we could also use state TTL...)
> > > >>>>
> > > >>>> So, I don't think that we should buffer unmatched probe-side records beyond the flip point.
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> On Thu, 28 May 2026 at 17:05, Xingcan Cui <[email protected]> wrote:
> > > >>>>
> > > >>>>> Hi Fabian,
> > > >>>>>
> > > >>>>> Thanks for this FLIP! The two-phase design is excellent for avoiding early-joining bugs while maintaining low-latency processing-time semantics.
> > > >>>>>
> > > >>>>> After thinking more about the proposal, I'd like to point out an edge case related to the initialization phase or recovery after prolonged downtime (for example, when a job has been down for a day). While a processing-time join works well for live streaming, where results can reasonably depend on the immediate arrival order of live data, it does not work as well for catch-up scenarios.
> > > >>>>>
> > > >>>>> Currently, if a job initializes or restores from a checkpoint after a long downtime, the operator resumes directly in the processing-time join phase.
> > > >>>>> During catch-up, however, the natural chronological arrival order of the live data is completely lost. As a result, these replayed fact records are evaluated against the current machine time and may blindly join with the rapidly advancing "current" dimension snapshot, rather than the historical versions they were originally supposed to match.
> > > >>>>>
> > > >>>>> To handle this edge case, could we consider:
> > > >>>>>
> > > >>>>> 1. changing the first phase into an event-time join phase, and
> > > >>>>>
> > > >>>>> 2. allowing the operator to switch back to the first phase after a restart?
> > > >>>>>
> > > >>>>> For example, users could configure a timestamp threshold. Before the watermark reaches that point, the operator would run as an event-time versioned join to safely process the catch-up phase through watermark alignment. Once the watermark passes the threshold, the operator could purge the old multi-version state and seamlessly transition back to the pure processing-time join phase for live traffic.
> > > >>>>>
> > > >>>>> After a job restart, users could either update the target timestamp to reset the operator back into the event-time phase, or leave it unchanged to continue operating in the processing-time phase.
> > > >>>>>
> > > >>>>> I completely understand that this would introduce significant complexity to the operator's state management and lifecycle, so this is only a tentative proposal to explore whether it might be worth considering for the long-term robustness of the design.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>>
> > > >>>>> Xingcan
> > > >>>>>
> > > >>>>> On Thu, May 28, 2026 at 8:17 AM David Anderson <[email protected]> wrote:
> > > >>>>>
> > > >>>>>> I'm quite enthusiastic about this. I want to thank Fabian for putting together such a well-crafted FLIP. And I look forward to updating the awkward educational content this FLIP will make obsolete.
> > > >>>>>>
> > > >>>>>> To my mind, the syntax expresses the semantics of this join rather well.
> > > >>>>>>
> > > >>>>>> Until now, developers using event-time temporal joins sometimes resorted to doing weird things with watermarks to handle a build side that's mostly idle; this lateral snapshot join is clearly better -- not to mention the added bonus of pre-loading the build table.
> > > >>>>>>
> > > >>>>>> One question: If I understand correctly, during the JOIN phase of an INNER join, if the desired build-side record is missing, nothing will be emitted for the unmatched probe-side record. For an INNER join, I can imagine wanting to buffer unmatched probe-side records, expecting the build side will arrive soon. What's your thinking there?
> > > >>>>>>
> > > >>>>>> David
> > > >>>>>>
> > > >>>>>> On Wed, May 27, 2026 at 12:44 PM Fabian Hueske <[email protected]> wrote:
> > > >>>>>>
> > > >>>>>>> Thanks Gustavo and Timo for the positive feedback!
> > > >>>>>>>
> > > >>>>>>> I'd like to bump this thread up to collect more feedback.
> > > >>>>>>> If there are no more responses, I will start a vote on this FLIP next Monday, June 1st.
> > > >>>>>>>
> > > >>>>>>> Best, Fabian
> > > >>>>>>>
> > > >>>>>>> On Thu, 21 May 2026 at 12:15, Timo Walther <[email protected]> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> thanks for proposing this FLIP. I agree that this join is super common. After talking to many people at conferences, I could imagine it will be one of the most used kinds of joins going forward.
> > > >>>>>>>>
> > > >>>>>>>> Tightly coupling it with watermarks fits both from a semantic point of view and with other efforts such as FLIP-558 (Improvements to SinkUpsertMaterializer and changelog disorder) [1]. In the near future, we should work on more automated watermarking to power these watermark-based operators, but this is an orthogonal effort.
> > > >>>>>>>>
> > > >>>>>>>> Overall I'm strongly +1 on this. Also +1 on the syntax improvements for lateral table functions by dropping the TABLE() wrapper.
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-558%3A+Improvements+to+SinkUpsertMaterializer+and+changelog+disorder
> > > >>>>>>>>
> > > >>>>>>>> On 18.05.26 11:47, Gustavo de Morais wrote:
> > > >>>>>>>>> Hi Fabian,
> > > >>>>>>>>>
> > > >>>>>>>>> In general a strong +1 for the feature, without getting into the details of the FLIP yet. This has been a missing feature for years and I'm happy that we're putting in the time to address this - while also getting rid of some of the hard restrictions we had. Thanks!
> > > >>>>>>>>>
> > > >>>>>>>>> Kind regards,
> > > >>>>>>>>> Gustavo
> > > >>>>>>>>>
> > > >>>>>>>>> On Fri, 15 May 2026 at 16:39, Fabian Hueske <[email protected]> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I'd like to start a discussion on FLIP-579: LATERAL SNAPSHOT Join [1].
> > > >>>>>>>>>>
> > > >>>>>>>>>> Enriching a stream with data from a (slowly changing) dynamic table is a super common use case.
> > > >>>>>>>>>> Flink SQL features Temporal Joins [2] to address these use cases.
> > > >>>>>>>>>> However, SQL users can only use the event-time variant which has many limitations (heavy dependency on frequent WM updates on both inputs, build-side table requires a PK, the join predicate must include the build-side PK, etc).
> > > >>>>>>>>>> The processing-time temporal join is disabled (due to build-side initialization issues [3]) and temporal table function joins are only available in Table API.
> > > >>>>>>>>>>
> > > >>>>>>>>>> FLIP-579 proposes a new temporal join operator that operates in processing-time and addresses the limitations of the existing implementations:
> > > >>>>>>>>>> * initialization of the build-side before joining
> > > >>>>>>>>>> * no requirement of continuous, frequent build-side WMs (after the initialization completed)
> > > >>>>>>>>>> * no requirement for a PK on the build-side
> > > >>>>>>>>>> * table function-based syntax [4] via a built-in SNAPSHOT function (proposed in FLIP-517 [5])
> > > >>>>>>>>>>
> > > >>>>>>>>>> Looking forward to your feedback.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Fabian
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-579%3A+LATERAL+SNAPSHOT+Join
> > > >>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-joins
> > > >>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-19830
> > > >>>>>>>>>> [4] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-table-function-join
> > > >>>>>>>>>> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs#FLIP517:BetterHandlingofDynamicTablePrimitiveswithPTFs-SNAPSHOTfortemporaljoins
