Hi Jark,

I don't want to answer for Fabian, but I looked into both points in the
past while exploring design ideas for a processing-time temporal join and
can provide some context on the two features you mention:

*(1) RecordAttributes#isBacklog*

I agree this would be ideal. The challenge is that
RecordAttributes#isBacklog was designed as a throughput optimization hint,
not as a correctness signal. Using it as a correctness boundary would need
a FLIP amendment. Also, AFAIK most connectors don't emit it today - only
HybridSource does.
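
To make concrete what "using the signal as a correctness boundary" would mean, here is a minimal sketch in plain Python. It is a toy model, not Flink API: the class `TwoPhaseJoin` and its methods, including `on_backlog_end`, are hypothetical names chosen for illustration, and the end-of-backlog call stands in for whatever event a connector might emit once its snapshot is complete.

```python
from dataclasses import dataclass, field


@dataclass
class TwoPhaseJoin:
    """Toy LOAD -> JOIN operator (hypothetical, not a Flink class)."""
    build: dict = field(default_factory=dict)     # key -> latest build-side value
    buffered: list = field(default_factory=list)  # probe records held during LOAD
    loading: bool = True                          # True while in the LOAD phase
    out: list = field(default_factory=list)       # emitted join results

    def on_build(self, key, value):
        # Build-side updates are always applied, in both phases.
        self.build[key] = value

    def on_probe(self, key, payload):
        # During LOAD the probe record is held back; afterwards it joins at once.
        if self.loading:
            self.buffered.append((key, payload))
        else:
            self._join(key, payload)

    def on_backlog_end(self):
        # Assumed exact boundary: the build-side snapshot is complete.
        if not self.loading:
            return
        self.loading = False
        for key, payload in self.buffered:
            self._join(key, payload)
        self.buffered.clear()

    def _join(self, key, payload):
        # INNER join semantics: unmatched probe records are dropped.
        if key in self.build:
            self.out.append((key, payload, self.build[key]))
```

In this model the flip happens exactly when the boundary event arrives, so the result no longer depends on a timer. That only holds if every source reliably emits the event, which is the gap described above.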

*(2) InputSelectable*

Using InputSelectable to avoid probe-side buffering sounds like a good idea
as well. However, InputSelectable itself doesn't work in streaming. The
reason is that checkpoint barriers must flow through all inputs: blocking
the probe input prevents its barriers from arriving, which causes a
checkpoint deadlock. AFAIK all production implementations of it are
batch-only. Unfortunately, this is a blocker.
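
The stall can be illustrated with a deliberately simplified model of aligned checkpoint barriers. This is a plain Python sketch under stated assumptions, not Flink code: the function `checkpoint_completes`, the `BARRIER` and `END_OF_SNAPSHOT` markers, and the two-queue operator are all hypothetical. The model assumes that an input which has delivered its barrier is paused until the other input's barrier arrives.

```python
from collections import deque

BARRIER = "BARRIER"
END_OF_SNAPSHOT = "END_OF_SNAPSHOT"  # hypothetical marker: build side is loaded


def checkpoint_completes(build_input, probe_input, block_probe_during_load):
    """Return True if the barrier is received on both inputs, False on a stall."""
    inputs = {"build": deque(build_input), "probe": deque(probe_input)}
    barrier_seen = {"build": False, "probe": False}
    build_done = False
    while True:
        # Input selection: optionally refuse the probe side until the load is done.
        if block_probe_during_load and not build_done:
            selected = ["build"]
        else:
            selected = ["build", "probe"]
        # Barrier alignment: an input that already delivered its barrier is paused.
        readable = [n for n in selected if inputs[n] and not barrier_seen[n]]
        if not readable:
            return False  # nothing can be read, the checkpoint cannot finish
        name = readable[0]
        record = inputs[name].popleft()
        if record == BARRIER:
            barrier_seen[name] = True
        elif record == END_OF_SNAPSHOT:
            build_done = True
        if all(barrier_seen.values()):
            return True


build = ["b1", BARRIER, "b2", END_OF_SNAPSHOT]
probe = ["p1", BARRIER]
print(checkpoint_completes(build, probe, block_probe_during_load=True))
print(checkpoint_completes(build, probe, block_probe_during_load=False))
```

With the probe side blocked, the build input pauses after its barrier while the probe barrier is never read, so neither side can make progress. Without blocking, both barriers arrive and the checkpoint completes.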

I really like the ideas and think they would be clean solutions. However,
they currently have considerable blockers that keep us from using them for
a streaming proposal. I think Fabian's proposal navigates around that and
gives us a good alternative without relying on them.

Kind regards,
Gustavo



On Mon, 1 Jun 2026 at 15:11, Jark Wu <[email protected]> wrote:

> Hi Fabian,
>
> Thanks for driving this FLIP! Overall I think this is a great idea and
> a long-missing feature, so a big +1 from me. I only have two concerns
> I'd like to discuss.
>
> (1) The build-side flip point is not exact.
>
> As Xingcan already touched on, because the LOAD→JOIN transition is
> based on a time/idle heuristic rather than a consistent boundary,
> probe-side records may end up joining the wrong historical version of
> the build side, or miss matches entirely.
>
> That said, many storage systems and connectors are actually able to
> provide a consistent switch point. For example, mysql-cdc emits a
> "backlog" (binlog-start) event once the snapshot phase has finished.
> The join operator could switch from the LOAD phase to the
> processing-time JOIN phase exactly when it receives such a backlog
> event, giving a precise and correct initialization boundary instead of
> a heuristic one.
>
> Flink already provides the RecordAttributes#isBacklog framework for such a
> backlog mechanism, but adapting it to this use case may still require
> non-trivial effort. So I suggest we consider this mechanism, but I'm
> totally fine with leaving this as future work.
>
> (2) Buffering probe-side records in operator state during the LOAD phase.
>
> If I understand the design correctly, during the LOAD phase the
> probe-side records are buffered into operator state. For
> high-throughput probe streams and/or long initialization times, this
> state can grow very large, which may lead to problems such as
> checkpoint timeouts during the LOAD phase.
>
> I think a more elegant design would be to follow the batch hash join
> approach and use the InputSelectable interface: during the LOAD phase
> the operator simply refuses to consume probe-side input until the
> build side is initialized. This would let backpressure naturally hold
> back the probe side instead of materializing it into state, which I
> believe would significantly simplify the architecture and improve job
> stability.
>
> Curious to hear your thoughts on both points.
>
> Best,
> Jark Wu
>
> On Mon, 1 Jun 2026 at 16:03, Fabian Hueske <[email protected]> wrote:
> >
> > Hi Leonard,
> >
> > Sorry, missed your email and already started the vote.
> > Let me put it on hold for now and continue discussing the proposal.
> >
> > Looking forward to your comments,
> > Fabian
> >
> > On Mon, 1 Jun 2026 at 09:56, Leonard Xu <[email protected]> wrote:
> >
> > > @Fabian Thanks for driving this FLIP, and sorry for the late reply, which
> > > was due to personal reasons. I shouldn't miss such an important FLIP.
> > >
> > > I'm reviewing the FLIP and will try to finish it today. Could you kindly
> > > wait a moment before starting the vote?
> > >
> > > And sorry for interrupting your plan again.
> > >
> > > Best,
> > > Leonard
> > >
> > > > On 1 Jun 2026 at 15:51, Fabian Hueske <[email protected]> wrote:
> > > >
> > > > Thanks everyone for your comments on the FLIP.
> > > > I will start the vote.
> > > >
> > > > Best, Fabian
> > > >
> > > > On Thu, 28 May 2026 at 20:13, David Anderson <[email protected]> wrote:
> > > >
> > > >> Fabian,
> > > >>
> > > >>> So, I don't think that we should buffer unmatched probe-side records
> > > >>> beyond the flip point.
> > > >>
> > > >> Thanks for explaining your reasoning. Makes sense to me.
> > > >>
> > > >> David
> > > >>
> > > >> On Thu, May 28, 2026 at 6:55 PM Fabian Hueske <[email protected]> wrote:
> > > >>
> > > >>> Hi Xingcan,
> > > >>>
> > > >>> Thanks for your comments on the FLIP!
> > > >>>
> > > >>> The join's behavior when starting from a savepoint is indeed an important
> > > >>> aspect to consider and the problem of a rapidly advancing dimension
> > > >>> (build-side) table is of course real.
> > > >>>
> > > >>> I would argue that watermark alignment should significantly reduce the
> > > >>> impact of this.
> > > >>> If enabled, sources align their consumption based on their current
> > > >>> watermark such that the (presumably much smaller) build-side source would
> > > >>> be slowed down to the event-time progress of the probe-side.
> > > >>> While watermark alignment is not an "exact" mechanism, the semantics of the
> > > >>> new processing-time join also do not guarantee "exact" results.
> > > >>> At the same time, alignment should ensure that build and probe-side are
> > > >>> roughly aligned in event-time (without the strict guarantees that the
> > > >>> event-time temporal table join provides).
> > > >>>
> > > >>> However, I really like your idea of starting in event-time mode and
> > > >>> flipping to processing-time after the initialization duration passed.
> > > >>> I'm not sure if it would fully address the problem you described. As you
> > > >>> said, users would need to be able to reconfigure the flip-point and I'm not
> > > >>> sure if there's a good mechanism for this yet.
> > > >>> But it might have some other properties that would be beneficial, so I'll
> > > >>> think about that.
> > > >>>
> > > >>> Best,
> > > >>> Fabian
> > > >>>
> > > >>>
> > > >>> On Thu, 28 May 2026 at 18:21, Fabian Hueske <[email protected]> wrote:
> > > >>>
> > > >>>> Thanks for your feedback David!
> > > >>>>
> > > >>>>> One question: If I understand correctly, during the JOIN phase of an
> > > >>>>> INNER join, if the desired build-side record is missing, nothing will be
> > > >>>>> emitted for the unmatched probe-side record. For an INNER join, I can
> > > >>>>> imagine wanting to buffer unmatched probe-side records, expecting the
> > > >>>>> build side will arrive soon. What's your thinking there?
> > > >>>>
> > > >>>> Your understanding is correct. If a probe-side record arrives during the
> > > >>>> LOAD phase but no matching build-side record is received,
> > > >>>> the probe-side record would be discarded without being joined during the
> > > >>>> transition from LOAD to JOIN.
> > > >>>>
> > > >>>> I would argue that users who want to prevent this would need to
> > > >>>> configure a longer initialization time.
> > > >>>> IMO, dropping unmatched probe records is not a "bad" property of INNER
> > > >>>> joins but an essential part of their semantics. It might even be desired by
> > > >>>> some users.
> > > >>>> If we buffered probe-side records for INNER joins beyond the
> > > >>>> transition point, we:
> > > >>>> * would have different behaviors for INNER and LEFT joins
> > > >>>> * could not start to emit probe-side watermarks as long as there are still
> > > >>>>   probe-side records buffered (or at least not advance past them without
> > > >>>>   emitting late data at a later point in time)
> > > >>>> * would either need another config knob to specify when to "really" clean
> > > >>>>   up the probe-side state or keep such unmatched records forever in state
> > > >>>>   (we could also use state TTL...)
> > > >>>>
> > > >>>> So, I don't think that we should buffer unmatched probe-side records
> > > >>>> beyond the flip point.
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> On Thu, 28 May 2026 at 17:05, Xingcan Cui <[email protected]> wrote:
> > > >>>>
> > > >>>>> Hi Fabian,
> > > >>>>>
> > > >>>>> Thanks for this FLIP! The two-phase design is excellent for avoiding
> > > >>>>> early-joining bugs while maintaining low-latency processing-time
> > > >>>>> semantics.
> > > >>>>>
> > > >>>>> After thinking more about the proposal, I'd like to point out an edge case
> > > >>>>> related to the initialization phase or recovery after prolonged downtime
> > > >>>>> (for example, when a job has been down for a day). While a processing-time
> > > >>>>> join works well for live streaming, where results can reasonably depend on
> > > >>>>> the immediate arrival order of live data, it does not work as well for
> > > >>>>> catch-up scenarios.
> > > >>>>>
> > > >>>>> Currently, if a job initializes or restores from a checkpoint after a long
> > > >>>>> downtime, the operator resumes directly in the processing-time join phase.
> > > >>>>> During catch-up, however, the natural chronological arrival order of the
> > > >>>>> live data is completely lost. As a result, these replayed fact records are
> > > >>>>> evaluated against the current machine time and may blindly join with the
> > > >>>>> rapidly advancing "current" dimension snapshot, rather than the historical
> > > >>>>> versions they were originally supposed to match.
> > > >>>>>
> > > >>>>> To handle this edge case, could we consider:
> > > >>>>>
> > > >>>>> 1. changing the first phase into an event-time join phase, and
> > > >>>>>
> > > >>>>> 2. allowing the operator to switch back to the first phase after a
> > > >>>>> restart?
> > > >>>>>
> > > >>>>> For example, users could configure a timestamp threshold. Before the
> > > >>>>> watermark reaches that point, the operator would run as an event-time
> > > >>>>> versioned join to safely process the catch-up phase through watermark
> > > >>>>> alignment. Once the watermark passes the threshold, the operator could
> > > >>>>> purge the old multi-version state and seamlessly transition back to the
> > > >>>>> pure processing-time join phase for live traffic.
> > > >>>>>
> > > >>>>> After a job restart, users could either update the target timestamp to
> > > >>>>> reset the operator back into the event-time phase, or leave it unchanged
> > > >>>>> to continue operating in the processing-time phase.
> > > >>>>>
> > > >>>>> I completely understand that this would introduce significant complexity
> > > >>>>> to the operator's state management and lifecycle, so this is only a
> > > >>>>> tentative proposal to explore whether it might be worth considering for the
> > > >>>>> long-term robustness of the design.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>>
> > > >>>>> Xingcan
> > > >>>>>
> > > >>>>> On Thu, May 28, 2026 at 8:17 AM David Anderson <[email protected]> wrote:
> > > >>>>>
> > > >>>>>> I'm quite enthusiastic about this. I want to thank Fabian for putting
> > > >>>>>> together such a well-crafted FLIP. And I look forward to updating the
> > > >>>>>> awkward educational content this FLIP will make obsolete.
> > > >>>>>>
> > > >>>>>> To my mind, the syntax expresses the semantics of this join rather well.
> > > >>>>>>
> > > >>>>>> Until now, developers using event-time temporal joins sometimes resorted to
> > > >>>>>> doing weird things with watermarks to handle a build side that's mostly
> > > >>>>>> idle; this lateral snapshot join is clearly better -- not to mention the
> > > >>>>>> added bonus of pre-loading the build table.
> > > >>>>>>
> > > >>>>>> One question: If I understand correctly, during the JOIN phase of an
> > > >>>>>> INNER join, if the desired build-side record is missing, nothing will be
> > > >>>>>> emitted for the unmatched probe-side record. For an INNER join, I can
> > > >>>>>> imagine wanting to buffer unmatched probe-side records, expecting the
> > > >>>>>> build side will arrive soon. What's your thinking there?
> > > >>>>>>
> > > >>>>>> David
> > > >>>>>>
> > > >>>>>> On Wed, May 27, 2026 at 12:44 PM Fabian Hueske <[email protected]> wrote:
> > > >>>>>>
> > > >>>>>>> Thanks Gustavo and Timo for the positive feedback!
> > > >>>>>>>
> > > >>>>>>> I'd like to bump this thread up to collect more feedback.
> > > >>>>>>> If there are no more responses, I will start a vote on this FLIP next
> > > >>>>>>> Monday, June 1st.
> > > >>>>>>>
> > > >>>>>>> Best, Fabian
> > > >>>>>>>
> > > >>>>>>> On Thu, 21 May 2026 at 12:15, Timo Walther <[email protected]> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> thanks for proposing this FLIP. I agree that this join is super common.
> > > >>>>>>>> After talking to many people at conferences, I could imagine it will be
> > > >>>>>>>> one of the most used kinds of joins going forward.
> > > >>>>>>>>
> > > >>>>>>>> Tightly coupling it with watermarks fits both from a semantic point of
> > > >>>>>>>> view and with other efforts such as FLIP-558 (Improvements to
> > > >>>>>>>> SinkUpsertMaterializer and changelog disorder) [1]. In the near future,
> > > >>>>>>>> we should work on more automated watermarking to power these
> > > >>>>>>>> watermark-based operators, but this is an orthogonal effort.
> > > >>>>>>>>
> > > >>>>>>>> Overall I'm strongly +1 on this. Also +1 on the syntax improvements for
> > > >>>>>>>> lateral table functions by dropping the TABLE() wrapper.
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-558%3A+Improvements+to+SinkUpsertMaterializer+and+changelog+disorder
> > > >>>>>>>>
> > > >>>>>>>> On 18.05.26 11:47, Gustavo de Morais wrote:
> > > >>>>>>>>> Hi Fabian,
> > > >>>>>>>>>
> > > >>>>>>>>> In general a strong +1 for the feature, without getting into the details
> > > >>>>>>>>> of the FLIP yet. This has been a missing feature for years and I'm happy
> > > >>>>>>>>> that we're putting in the time to address it, while also getting rid of
> > > >>>>>>>>> some of the hard restrictions we had. Thanks!
> > > >>>>>>>>>
> > > >>>>>>>>> Kind regards,
> > > >>>>>>>>> Gustavo
> > > >>>>>>>>>
> > > >>>>>>>>> On Fri, 15 May 2026 at 16:39, Fabian Hueske <[email protected]> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I'd like to start a discussion on FLIP-579: LATERAL SNAPSHOT Join [1].
> > > >>>>>>>>>>
> > > >>>>>>>>>> Enriching a stream with data from a (slowly changing) dynamic table is a
> > > >>>>>>>>>> super common use case.
> > > >>>>>>>>>> Flink SQL features Temporal Joins [2] to address these use cases.
> > > >>>>>>>>>> However, SQL users can only use the event-time variant which has many
> > > >>>>>>>>>> limitations (heavy dependency on frequent WM updates on both inputs,
> > > >>>>>>>>>> build-side table requires a PK, the join predicate must include the
> > > >>>>>>>>>> build-side PK, etc).
> > > >>>>>>>>>> The processing-time temporal join is disabled (due to build-side
> > > >>>>>>>>>> initialization issues [3]) and temporal table function joins are
> > > >>>>>>>>>> only available in Table API.
> > > >>>>>>>>>>
> > > >>>>>>>>>> FLIP-579 proposes a new temporal join operator that operates in
> > > >>>>>>>>>> processing-time and addresses the limitations of the existing
> > > >>>>>>>>>> implementations:
> > > >>>>>>>>>> * initialization of the build-side before joining
> > > >>>>>>>>>> * no requirement of continuous, frequent build-side WMs (after the
> > > >>>>>>>>>>   initialization completed)
> > > >>>>>>>>>> * no requirement for a PK on the build-side
> > > >>>>>>>>>> * table function-based syntax [4] via a built-in SNAPSHOT function
> > > >>>>>>>>>>   (proposed in FLIP-517 [5])
> > > >>>>>>>>>>
> > > >>>>>>>>>> Looking forward to your feedback.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Fabian
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-579%3A+LATERAL+SNAPSHOT+Join
> > > >>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-joins
> > > >>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-19830
> > > >>>>>>>>>> [4] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-table-function-join
> > > >>>>>>>>>> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs#FLIP517:BetterHandlingofDynamicTablePrimitiveswithPTFs-SNAPSHOTfortemporaljoins
