Hi Fabian

Thanks for driving this FLIP and your kind patience. The motivation is spot-on, 
and the LOAD→JOIN two-phase design is the right structural fix for the 
FLINK-19830 initialization problem. Overall direction +1 from my side.

Besides Jark’s idea about backlog and InputSelectable which may need more 
prerequisites, I’ve two concerns about current proposal:

1. LOAD phase backpressure. The FLIP assumes "seconds to a few minutes" for 
build-side init, but nothing enforces it. Large build-side tables (e.g., 50M 
rows) + fast probe streams → unbuffered probe-side state explosion. Should we 
add a config  like max-buffer-size that applies backpressure when exceeded or 
some metrics about buffer, rather than silently piling up records?

2. Flip-point CPU spike. Joining all buffered probe records × build-side state 
in one shot differs fundamentally from event-time join's incremental 
watermark-batched emission. In the worst case this could cause a TaskManager 
CPU spike and downstream shock. Worth considering micro-batch draining during 
flip transition? 

Looking forward to your thoughts.

Best, 
Leonard

> 2026 6月 1 16:02,Fabian Hueske <[email protected]> 写道:
> 
> Hi Leonard,
> 
> Sorry, missed your email and already started the vote.
> Let me put it on hold for now and continue discussing the proposal.
> 
> Looking forward to your comments,
> Fabian
> 
> Am Mo., 1. Juni 2026 um 09:56 Uhr schrieb Leonard Xu <[email protected]>:
> 
>> @Fabian Thanks for driving this FLIP, sorry for late reply due to my
>> personal reason that I shouldn’t miss such an important FLIP.
>> 
>> I’m reviewing the FLIP and will try to finish it today, could you kindly
>> wait one minute to start the vote?
>> 
>> And sorry for interrupt your plan again.
>> 
>> Best,
>> Leonard
>> 
>>> 2026 6月 1 15:51,Fabian Hueske <[email protected]> 写道:
>>> 
>>> Thanks everyone for your comments on the FLIP.
>>> I will start the vote.
>>> 
>>> Best, Fabian
>>> 
>>> Am Do., 28. Mai 2026 um 20:13 Uhr schrieb David Anderson <
>>> [email protected]>:
>>> 
>>>> Fabian,
>>>> 
>>>>> So, I don't think that we should buffer unmatched probe-side records
>>>> beyond
>>>> the flip point.
>>>> 
>>>> Thanks for explaining your reasoning. Makes sense to me.
>>>> 
>>>> David
>>>> 
>>>> On Thu, May 28, 2026 at 6:55 PM Fabian Hueske <[email protected]>
>> wrote:
>>>> 
>>>>> Hi Xingcan,
>>>>> 
>>>>> Thanks for your comments on the FLIP!
>>>>> 
>>>>> The join's behavior when starting from a savepoint is indeed an
>> important
>>>>> aspect to consider and the problem of a rapidly advancing dimension
>>>>> (build-side) table is of course real.
>>>>> 
>>>>> I would argue that watermark alignment should significantly reduce the
>>>>> impact of this.
>>>>> If enabled, sources align their consumption based on their current
>>>>> watermark such that the (presumably much smaller) build-side source
>> would
>>>>> be slowed down to the event-time progress of the probe-side.
>>>>> While watermark alignment is not an "exact" mechanism, the semantics of
>>>> the
>>>>> new processing-time join also do not guarantee "exact" results.
>>>>> At the same time, alignment should ensure that build and probe-side are
>>>>> roughly aligned in event-time (without the strict guarantees that the
>>>>> event-time temporal table join provides).
>>>>> 
>>>>> However, I really like your idea of starting in event-time mode and
>>>>> flipping to processing-time after the initialization duration passed.
>>>>> I'm not sure if it would fully address the problem you described. As
>> you
>>>>> said, users would need to be able to reconfigure the flip-point and I'm
>>>> not
>>>>> sure if there's a good mechanism for this yet.
>>>>> But it might have some other properties that would be beneficial, so
>> I'll
>>>>> think about that.
>>>>> 
>>>>> Best,
>>>>> Fabian
>>>>> 
>>>>> 
>>>>> Am Do., 28. Mai 2026 um 18:21 Uhr schrieb Fabian Hueske <
>>>> [email protected]
>>>>>> :
>>>>> 
>>>>>> Thanks for your feedback David!
>>>>>> 
>>>>>>> One question: If I understand correctly, during the JOIN phase of an
>>>>>> INNER
>>>>>> join, if the desired build-side record is missing, nothing will be
>>>>> emitted
>>>>>> for the unmatched probe-side record. For an INNER join, I can imagine
>>>>>> wanting to buffer unmatched probe-side records, expecting the build
>>>> side
>>>>>> will arrive soon. What's your thinking there?
>>>>>> 
>>>>>> Your understanding is correct. If a probe-side record arrives during
>>>> LOAD
>>>>>> phase but no matching build-side record is received,
>>>>>> the probe-side record would be discarded without being joined during
>>>> the
>>>>>> transition from LOAD to JOIN.
>>>>>> 
>>>>>> I would argue that users that want to prevent this, would need to
>>>>>> configure a longer initialization time.
>>>>>> IMO, dropping unmatched probe records is not a "bad" property of INNER
>>>>>> joins but an essential part of their semantics. It might even be
>>>> desired
>>>>> by
>>>>>> some users.
>>>>>> If we would buffer probe-side records for INNER joins beyond the
>>>>>> transition point, we:
>>>>>> * would have different behaviors for INNER and LEFT joins
>>>>>> * could not start to emit probe-side watermarks as long as there are
>>>>> still
>>>>>> probe-side records buffered (or at least not advance past them without
>>>>>> emitting late data at a later point of time)
>>>>>> * would either need another config knob to specify when to "really"
>>>> clean
>>>>>> up the probe-side state or keep such unmatched records forever in
>> state
>>>>> (we
>>>>>> could also use state TTL...)
>>>>>> 
>>>>>> So, I don't think that we should buffer unmatched probe-side records
>>>>>> beyond the flip point.
>>>>>> 
>>>>>> Best, Fabian
>>>>>> 
>>>>>> Am Do., 28. Mai 2026 um 17:05 Uhr schrieb Xingcan Cui <
>>>>> [email protected]
>>>>>>> :
>>>>>> 
>>>>>>> Hi Fabian,
>>>>>>> 
>>>>>>> Thanks for this FLIP! The two-phase design is excellent for avoiding
>>>>>>> early-joining bugs while maintaining low-latency processing-time
>>>>>>> semantics.
>>>>>>> 
>>>>>>> After thinking more about the proposal, I'd like to point out an edge
>>>>> case
>>>>>>> related to the initialization phase or recovery after prolonged
>>>> downtime
>>>>>>> (for example, when a job has been down for a day). While a
>>>>> processing-time
>>>>>>> join works well for live streaming, where results can reasonably
>>>> depend
>>>>> on
>>>>>>> the immediate arrival order of live data, it does not work as well
>> for
>>>>>>> catch-up scenarios.
>>>>>>> 
>>>>>>> Currently, if a job initializes or restores from a checkpoint after a
>>>>> long
>>>>>>> downtime, the operator resumes directly in the processing-time join
>>>>> phase.
>>>>>>> During catch-up, however, the natural chronological arrival order of
>>>> the
>>>>>>> live data is completely lost. As a result, these replayed fact
>> records
>>>>> are
>>>>>>> evaluated against the current machine time and may blindly join with
>>>> the
>>>>>>> rapidly advancing "current" dimension snapshot, rather than the
>>>>> historical
>>>>>>> versions they were originally supposed to match.
>>>>>>> 
>>>>>>> To handle this edge case, could we consider:
>>>>>>> 
>>>>>>> 1. changing the first phase into an event-time join phase, and
>>>>>>> 
>>>>>>> 2. allowing the operator to switch back to the first phase after a
>>>>>>> restart?
>>>>>>> 
>>>>>>> For example, users could configure a timestamp threshold. Before the
>>>>>>> watermark reaches that point, the operator would run as an event-time
>>>>>>> versioned join to safely process the catch-up phase through watermark
>>>>>>> alignment. Once the watermark passes the threshold, the operator
>> could
>>>>>>> purge the old multi-version state and seamlessly transition back to
>>>> the
>>>>>>> pure processing-time join phase for live traffic.
>>>>>>> 
>>>>>>> After a job restart, users could either update the target timestamp
>> to
>>>>>>> reset the operator back into the event-time phase, or leave it
>>>> unchanged
>>>>>>> to
>>>>>>> continue operating in the processing-time phase.
>>>>>>> 
>>>>>>> I completely understand that this would introduce significant
>>>> complexity
>>>>>>> to
>>>>>>> the operator's state management and lifecycle, so this is only a
>>>>> tentative
>>>>>>> proposal to explore whether it might be worth considering for the
>>>>>>> long-term
>>>>>>> robustness of the design.
>>>>>>> 
>>>>>>> Best,
>>>>>>> 
>>>>>>> Xingcan
>>>>>>> 
>>>>>>> On Thu, May 28, 2026 at 8:17 AM David Anderson <[email protected]
>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> I'm quite enthusiastic about this. I want to thank Fabian for
>>>> putting
>>>>>>>> together such a well-crafted FLIP. And I look forward to updating
>>>> the
>>>>>>>> awkward educational content this FLIP will make obsolete.
>>>>>>>> 
>>>>>>>> To my mind, the syntax expresses the semantics of this join rather
>>>>> well.
>>>>>>>> 
>>>>>>>> Until now, developers using event-time temporal joins sometimes
>>>>>>> resorted to
>>>>>>>> doing weird things with watermarks to handle a build side that's
>>>>> mostly
>>>>>>>> idle; this lateral snapshot join is clearly better -- not to mention
>>>>> the
>>>>>>>> added bonus of pre-loading the build table.
>>>>>>>> 
>>>>>>>> One question: If I understand correctly, during the JOIN phase of an
>>>>>>> INNER
>>>>>>>> join, if the desired build-side record is missing, nothing will be
>>>>>>> emitted
>>>>>>>> for the unmatched probe-side record. For an INNER join, I can
>>>> imagine
>>>>>>>> wanting to buffer unmatched probe-side records, expecting the build
>>>>> side
>>>>>>>> will arrive soon. What's your thinking there?
>>>>>>>> 
>>>>>>>> David
>>>>>>>> 
>>>>>>>> On Wed, May 27, 2026 at 12:44 PM Fabian Hueske <[email protected]>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Thanks Gustavo and Timo for the positive feedback!
>>>>>>>>> 
>>>>>>>>> I'd like to bump this thread up to collect more feedback.
>>>>>>>>> If there are no more responses, I will start a vote on this FLIP
>>>>> next
>>>>>>>>> Monday, June 1st.
>>>>>>>>> 
>>>>>>>>> Best, Fabian
>>>>>>>>> 
>>>>>>>>> Am Do., 21. Mai 2026 um 12:15 Uhr schrieb Timo Walther <
>>>>>>>> [email protected]
>>>>>>>>>> :
>>>>>>>>> 
>>>>>>>>>> Hi Fabian,
>>>>>>>>>> 
>>>>>>>>>> thanks for proposing this FLIP. I agree that this join is super
>>>>>>> common,
>>>>>>>>>> after talking to many people at conferences, I could imagine it
>>>>>>> will be
>>>>>>>>>> one of the most used kinds of joins going forward.
>>>>>>>>>> 
>>>>>>>>>> Tightly coupling it with watermarks fits both from a semantical
>>>>>>> point
>>>>>>>> of
>>>>>>>>>> view but also with other efforts such as FLIP-558 (Improvements
>>>> to
>>>>>>>>>> SinkUpsertMaterializer and changelog disorder) [1]. In the near
>>>>>>> future,
>>>>>>>>>> we should work on more automated watermarking to power these
>>>>>>>>>> watermark-based operators, but this is an orthogonal effort.
>>>>>>>>>> 
>>>>>>>>>> Overall I'm strongly +1 on this. Also +1 on the syntax
>>>>> improvements
>>>>>>> for
>>>>>>>>>> lateral table functions by dropping the TABLE() wrapper.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Timo
>>>>>>>>>> 
>>>>>>>>>> [1]
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-558%3A+Improvements+to+SinkUpsertMaterializer+and+changelog+disorder
>>>>>>>>>> 
>>>>>>>>>> On 18.05.26 11:47, Gustavo de Morais wrote:
>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>> 
>>>>>>>>>>> In general a strong +1 for the feature, without getting into
>>>> the
>>>>>>>>> details
>>>>>>>>>> of
>>>>>>>>>>> the FLIP yet. This is a missing feature for years and I'm
>>>> happy
>>>>>>> that
>>>>>>>>>> we're
>>>>>>>>>>> putting the time to address this - while also getting rid of
>>>>> some
>>>>>>> of
>>>>>>>>> the
>>>>>>>>>>> hard restrictions we had. Thanks!
>>>>>>>>>>> 
>>>>>>>>>>> Kind regards,
>>>>>>>>>>> Gustavo
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, 15 May 2026 at 16:39, Fabian Hueske <
>>>> [email protected]
>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>> 
>>>>>>>>>>>> I'd like to start a discussion on FLIP-579: LATERAL SNAPSHOT
>>>>> Join
>>>>>>>> [1].
>>>>>>>>>>>> 
>>>>>>>>>>>> Enriching a stream with data from a (slowly changing) dynamic
>>>>>>> table
>>>>>>>>> is a
>>>>>>>>>>>> super common use case.
>>>>>>>>>>>> Flink SQL features Temporal Joins [2] to address these use
>>>>> cases.
>>>>>>>>>>>> However, SQL users can only use the event-time variant which
>>>>> has
>>>>>>>> many
>>>>>>>>>>>> limitations (heavy dependency on frequent WM updates on both
>>>>>>> inputs,
>>>>>>>>>>>> build-side table requires a PK, the join predicate must
>>>> include
>>>>>>> the
>>>>>>>>>>>> build-side PK, etc).
>>>>>>>>>>>> The processing-time temporal join is disabled (due to
>>>>> build-side
>>>>>>>>>>>> initialization issues [3]) and temporal table function joins
>>>>> are
>>>>>>>>>>>> only available in Table API.
>>>>>>>>>>>> 
>>>>>>>>>>>> FLIP-579 proposes a new temporal join operator that operates
>>>> in
>>>>>>>>>>>> processing-time and addresses the limitations of the existing
>>>>>>>>>>>> implementations:
>>>>>>>>>>>> * initialization of the build-side before joining
>>>>>>>>>>>> * no requirement of continuous, frequent build-side WMs
>>>> (after
>>>>>>> the
>>>>>>>>>>>> initialization completed)
>>>>>>>>>>>> * no requirement for a PK on the build-side
>>>>>>>>>>>> * table function-based syntax [4] via a built-in SNAPSHOT
>>>>>>> function
>>>>>>>>>>>> (proposed in FLIP-517 [4])
>>>>>>>>>>>> 
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Fabian
>>>>>>>>>>>> 
>>>>>>>>>>>> [1]
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-579%3A+LATERAL+SNAPSHOT+Join
>>>>>>>>>>>> [2]
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-joins
>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-19830
>>>>>>>>>>>> [4]
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-table-function-join
>>>>>>>>>>>> [5]
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs#FLIP517:BetterHandlingofDynamicTablePrimitiveswithPTFs-SNAPSHOTfortemporaljoins
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 

Reply via email to