Hi Fabian and Gustavo,

Thanks for the detailed reply. I agree that both the backlog-based
exact flip point and InputSelectable for probe-side buffering hit real
framework-level limitations today and deserve dedicated FLIPs rather
than being squeezed into this one. I do think both will become real
pain points in large-scale production usage, and I hope they can be a
key focus of future work. But the current proposal is already a solid
v1 that covers the majority of use cases, and we can iterate on it based
on user feedback. +1 from me.

Best,
Jark

On Thu, 4 Jun 2026 at 23:56, Fabian Hueske <[email protected]> wrote:
>
> Hi everyone,
> thanks for the valuable feedback and discussions!
>
> If there are no further questions, I'd like to (re)start the vote for the
> FLIP tomorrow.
>
> Best,
> Fabian
>
> On Thu, 4 Jun 2026 at 13:06, Leonard Xu <[email protected]> wrote:
>
> > The proposed metrics look good to me, thanks for the supplement.
> >
> > Best,
> > Leonard
> >
> > > On 4 Jun 2026, at 01:23, Fabian Hueske <[email protected]> wrote:
> > >
> > > Thanks Leonard,
> > >
> > > I've added a section about operator metrics to the proposal [1].
> > > If you have ideas for other useful metrics, please let me know and I'll
> > > add them.
> > >
> > > Best, Fabian
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=421958523#FLIP579:LATERALSNAPSHOTJoin-Metrics
> > >
> > > On Wed, 3 Jun 2026 at 10:18, Fabian Hueske <[email protected]> wrote:
> > >
> > >> Thank you Hongshun for your feedback!
> > >>
> > >> You are right that restricting the probe-side input to append-only
> > >> changes deviates from the existing event-time (and disabled proc-time)
> > >> temporal table joins.
> > >> I chose this restriction because processing-time semantics do not work
> > >> well with retractions.
> > >> A probe record of +(k1, p1) could join against a build-side of (k1, v1),
> > >> while a later arriving probe-side retraction -(k1, p1) would join
> > >> against (k1, v2). The resulting retraction record of -(k1, p1, v2) would
> > >> not match the earlier insertion +(k1, p1, v1).
> > >> The only changelog format that would work well would be upsert changes
> > >> (+I, +UA, -D) with key-only deletes (under the assumption that the
> > >> upsert key is still unique after the join).
> > >>
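
The retraction mismatch described above can be reproduced with a small simulation. This is only an illustrative sketch and not the operator from the FLIP: the names `build`, `update_build`, and `probe` are hypothetical, and the build side is simplified to one value per key that is always looked up at its current version.

```python
# Hypothetical illustration: a processing-time join that always joins a
# probe-side change against the *current* build-side version.
build = {}      # build-side state: key -> latest value
emitted = []    # changelog records emitted downstream


def update_build(key, value):
    """Apply a build-side update (overwrites the previous version)."""
    build[key] = value


def probe(op, key, payload):
    """Join a probe-side change ('+' or '-') against the current build side."""
    if key in build:
        emitted.append((op, key, payload, build[key]))


update_build("k1", "v1")
probe("+", "k1", "p1")      # insertion joins against v1
update_build("k1", "v2")    # build side advances in processing time
probe("-", "k1", "p1")      # retraction joins against v2

insertion, retraction = emitted
# The retraction carries a different row than the insertion it should cancel.
mismatch = insertion[1:] != retraction[1:]
print(emitted)
print("retraction cancels insertion:", not mismatch)
```

With key-only deletes in an upsert changelog, the downstream operator would match on the key alone, so the differing build-side value in the retraction would not matter.
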
> > >> I would like to keep the default behavior of only accepting append-only
> > >> inputs to prevent retraction mismatches.
> > >> At the same time, the operator implementation only needs to be slightly
> > >> adjusted to support arbitrary probe-side changes (if at all). The real
> > >> change would be in the planner / rules.
> > >> So allowing probe-side retraction input via a config option for
> > >> power-users who know what they are doing is certainly possible.
> > >>
> > >> Do you think this should be part of the proposal or would you be fine to
> > >> leave this as future work?
> > >>
> > >> Best, Fabian
> > >>
> > >> On Wed, 3 Jun 2026 at 08:41, Leonard Xu <[email protected]> wrote:
> > >>
> > >>> Hi Fabian,
> > >>>
> > >>> Thanks for the detailed and thoughtful reply, and especially for
> > agreeing
> > >>> to add operator metrics — that alone will significantly improve the
> > >>> debuggability of the join in production.
> > >>>
> > >>> (1) On CPU spike / probe-side buffering
> > >>>
> > >>> +1 for your reasoning on both points — deferring InputSelectable until
> > >>> the unaligned-checkpoint limitation is resolved, and leaving
> > micro-batch
> > >>> transition out of v1. Watermark alignment + probe-side scan-offset is
> > the
> > >>> principled clean approach, agreed.
> > >>>
> > >>> (2) On Jark's backlog idea
> > >>>
> > >>> Between the two paths in your reply, I'd lean toward option (a) —
> > having
> > >>> the source connector emit a special WM (or a dedicated record
> > attribute) at
> > >>> the end of backlog — as the long-term direction for exact flip-point
> > >>> semantics. It naturally fits what is almost certainly going to be the
> > >>> dominant production scenario: CDC sources (mysql-cdc, mongodb-cdc,
> > >>> postgres-cdc, ...) as the dimension build-side, all of which already
> > have
> > >>> an explicit "snapshot finished → binlog start" boundary internally.
> > >>>
> > >>> My suggestion is to ship FLIP-579 as proposed and collect user feedback
> > >>> on LATERAL SNAPSHOT Join from real production usage — that will give us
> > >>> solid evidence to prioritize this follow-up direction.
> > >>>
> > >>> (3) On the vote
> > >>>
> > >>> Once the metrics section is added and there are no further objections
> > >>> from others, I'm fine to start the vote.
> > >>>
> > >>>
> > >>> Best,
> > >>> Leonard
> > >>>
> > >>>
> > >>>
> > >>>> On 3 Jun 2026, at 01:36, Fabian Hueske <[email protected]> wrote:
> > >>>>
> > >>>> Thanks for your valuable feedback Jark and Leonard!
> > >>>>
> > >>>> You are bringing up three of the tricky challenges that the new join
> > >>>> needs to deal with.
> > >>>>
> > >>>> (1) Jark: The build-side flip point is not exact
> > >>>>
> > >>>> This is correct. However, I would argue that a processing-time join
> > >>>> does not have exact guarantees anyway and can only produce roughly
> > >>>> time-aligned results. WM alignment of build and probe-side input should
> > >>>> help to keep the alignment somewhat close. Of course, this does not
> > >>>> mean that we shouldn't try to give as good guarantees as possible.
> > >>>>
> > >>>> The primary mechanism for flipping from LOAD to JOIN phase is the
> > >>>> build-side watermark crossing a configured point in time. Watermarks
> > >>>> are used to track progress and completeness in Flink. Using them as a
> > >>>> condition to switch from LOAD to JOIN phase means that the build-side
> > >>>> received at least all changes up to that point in time. There might
> > >>>> have been changes with later timestamps as well. These could be
> > >>>> buffered on the side to have a stricter flip point, but IMO this
> > >>>> additional data should be tolerable under proc-time semantics.
> > >>>>
> > >>>> If the build-side input becomes stale, the processing idle timeout
> > >>>> flip condition gets applied. The assumption is that the build-side
> > >>>> source is currently exhausted and all data was consumed but the WM
> > >>>> didn't progress far enough to exceed the flip point. In this case, we
> > >>>> want to flip and start the regular JOIN phase.
> > >>>>
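
The two flip conditions described above (build-side watermark reaching the configured point, or a processing-time idle timeout) could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the FLIP's implementation: `FlipState`, `should_flip`, and all field names are hypothetical, and treating "crossing" as `>=` is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FlipState:
    flip_point_ms: int                # configured point in time the build-side WM must reach
    idle_timeout_ms: int              # processing-time idle timeout for the build side
    build_watermark_ms: int = -1      # latest build-side watermark seen
    last_build_activity_ms: int = 0   # processing time of the last build-side record


def should_flip(state, now_ms):
    """Return the reason to switch LOAD -> JOIN, or None to stay in LOAD."""
    # Primary condition: the build side has seen all changes up to the flip point.
    if state.build_watermark_ms >= state.flip_point_ms:
        return "watermark"
    # Fallback: the build side looks exhausted and its watermark no longer advances.
    if now_ms - state.last_build_activity_ms >= state.idle_timeout_ms:
        return "idle-timeout"
    return None


state = FlipState(flip_point_ms=1000, idle_timeout_ms=500,
                  build_watermark_ms=900, last_build_activity_ms=100)
still_loading = should_flip(state, now_ms=300)   # watermark too low, not idle long enough
idle_flip = should_flip(state, now_ms=700)       # 600 ms without build-side input
state.build_watermark_ms = 1000
wm_flip = should_flip(state, now_ms=300)         # watermark reached the flip point
print(still_loading, idle_flip, wm_flip)
```

Note that the watermark condition only guarantees completeness up to the flip point; records with later timestamps may already be in the build-side state, which is the inexactness discussed above.
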
> > >>>> For the use case of sources with an exact flip point, users would need
> > >>>> to know the timestamp of the last backlog record (or compute it if they
> > >>>> know roughly how long it takes to scan the backlog if it is computed
> > >>>> on-demand).
> > >>>> I agree, this is not very practical.
> > >>>> I can think of two options:
> > >>>> a) The source connector emits a special WM when it reaches the end of
> > >>>> the backlog. This would not require changes to the join operator but to
> > >>>> the source connectors.
> > >>>> b) The design of the SNAPSHOT function has the
> > >>>> `load_completed_condition`, which is an extension point to add logic to
> > >>>> determine the flip point.
> > >>>>
> > >>>> (2) Jark: Buffering probe-side during LOAD phase
> > >>>>
> > >>>> I think this is very similar to Leonard's point about "LOAD phase
> > >>>> backpressure" and also closely related to Leonard's point about
> > >>>> "Flip-point CPU spike".
> > >>>>
> > >>>> This is indeed a potential problem. If used without care, the
> > >>>> probe-side state might grow very large. Before talking about possible
> > >>>> ways to address this problem, let me explain how I think that the join
> > >>>> would be used.
> > >>>>
> > >>>> A very common (maybe the most common?) use case should be to initialize
> > >>>> the build-side input up to time t_b and then start processing the
> > >>>> probe-side input from time t_p (with t_p = t_b, or slightly less than
> > >>>> t_b) onwards. WM alignment would help to roughly align build-side and
> > >>>> probe-side inputs (although not being perfectly aligned like the
> > >>>> event-time join).
> > >>>> Initializing the build-side up to t_b and starting to consume the
> > >>>> probe-side from t_p with (t_p << t_b) would mean that the first
> > >>>> probe-side records are joined with much later versions of the
> > >>>> build-side.
> > >>>>
> > >>>> The first scenario (t_p = t_b) can be controlled with WM alignment and
> > >>>> scan-offset table hints on the probe-side input. Since the WMs of the
> > >>>> build-side input would be less than the WMs of the probe-side input,
> > >>>> the probe-side input should be throttled until the build-side has
> > >>>> caught up (which should be close to the flip point).
> > >>>> Other scenarios (including the t_p << t_b scenario) would benefit from
> > >>>> an idea [1] that is described in the future work section of the FLIP.
> > >>>> That mechanism would also be based on WM alignment and would need some
> > >>>> collaboration from the build-side source operator to indicate
> > >>>> completeness of the backlog.
> > >>>>
> > >>>> Using the InputSelectable interface is an idea that we also looked into
> > >>>> (as Gustavo already pointed out). Unfortunately, it is incompatible with
> > >>>> unaligned checkpoints and there are no other streaming operators that
> > >>>> implement the interface. I haven't looked in depth at the current
> > >>>> limitations, but if some of these were resolved, it might be possible
> > >>>> to later extend the join operator. It might even work with relaxed
> > >>>> guarantees because we don't need to fully block the input but just
> > >>>> throttle it such that less probe-side data needs to be buffered.
> > >>>>
> > >>>> The idea of limiting the size of the probe-side buffer with a config
> > >>>> like `max-buffer-size` sounds interesting. However, I'm not sure if
> > >>>> applying backpressure would really work because we still need to
> > >>>> consume the build-side to be able to reach the flip point and selective
> > >>>> backpressure is not possible without InputSelectable.
> > >>>>
> > >>>> An earlier draft of the proposal described eager joining (now moved to
> > >>>> the future work section [2]). The idea is that a probe-side record would
> > >>>> be directly joined when a match was present or received during the LOAD
> > >>>> phase. After joining it wouldn't be put into state. This would of course
> > >>>> significantly reduce the state size and solve the issue of CPU spikes
> > >>>> during the transition but come at the cost of hard-to-explain semantics
> > >>>> (the current semantics are rather simple: we collect until the flip
> > >>>> point and join against that).
> > >>>> Also, the idea of eager joining was developed when the join operator
> > >>>> was still restricted to FK-PK joins (single build-side record per join
> > >>>> key). The design has since been generalized to arbitrary joins, which
> > >>>> means there might be more build-side matches for a probe-side record
> > >>>> such that the presence of a single join match (of a possibly earlier
> > >>>> version) does not guarantee completeness anymore. That's why the idea
> > >>>> of eager joining was discarded for now.
> > >>>>
> > >>>>
> > >>>> (3) Leonard: Flip-point CPU spikes
> > >>>>
> > >>>> This is also a very valid concern. I would argue that the primary
> > >>>> mechanism to address this point should be to reduce the number of
> > >>>> buffered probe-side records (see point (2) above).
> > >>>>
> > >>>> I also thought about your idea to micro-batch the draining. In the
> > >>>> design, the transition join is triggered per-key by event-time timers
> > >>>> that also emit the "current" probe-side WM downstream. If we want to
> > >>>> continue using this mechanism, we would need to schedule multiple
> > >>>> timers and probably use some clever mechanism to gradually advance
> > >>>> probe-side WMs while still consuming records. There could be a separate
> > >>>> "TRANSITION" phase during which we still append to the probe-buffer but
> > >>>> use the mechanism for atomic build-side updates. However, this would
> > >>>> significantly complicate the design, affecting the control flow and
> > >>>> recovery. Hence, I would first try to address this issue by reducing
> > >>>> the probe-side state.
> > >>>>
> > >>>> If you have better ideas for how to use micro-batching during flip
> > >>>> transition, I'm very open to exploring those.
> > >>>>
> > >>>> Leonard also brought up a very important point about metrics!
> > >>>> I will add a section on operator metrics that will help to understand
> > >>>> the state of the operator.
> > >>>>
> > >>>> Please let me know your thoughts!
> > >>>>
> > >>>> Best, Fabian
> > >>>>
> > >>>> [1]
> > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=421958523#FLIP579:LATERALSNAPSHOTJoin-ReduceBufferingofProbe-SideviaBuild-SideWatermarkSuppression
> > >>>> [2]
> > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=421958523#FLIP579:LATERALSNAPSHOTJoin-EagerjoinmodeduringLOADphase
> > >>>>
> > >>>>
> > >>>> On Mon, 1 Jun 2026 at 16:44, Leonard Xu <[email protected]> wrote:
> > >>>>
> > >>>>> Hi Fabian
> > >>>>>
> > >>>>> Thanks for driving this FLIP and your kind patience. The motivation
> > is
> > >>>>> spot-on, and the LOAD→JOIN two-phase design is the right structural
> > >>> fix for
> > >>>>> the FLINK-19830 initialization problem. Overall direction +1 from my
> > >>> side.
> > >>>>>
> > >>>>> Besides Jark’s idea about backlog and InputSelectable which may need
> > >>>>> more prerequisites, I have two concerns about the current proposal:
> > >>>>>
> > >>>>> 1. LOAD phase backpressure. The FLIP assumes "seconds to a few minutes"
> > >>>>> for build-side init, but nothing enforces it. Large build-side tables
> > >>>>> (e.g., 50M rows) + fast probe streams → unbounded probe-side state
> > >>>>> explosion. Should we add a config like max-buffer-size that applies
> > >>>>> backpressure when exceeded, or some metrics about the buffer, rather
> > >>>>> than silently piling up records?
> > >>>>>
> > >>>>> 2. Flip-point CPU spike. Joining all buffered probe records ×
> > >>>>> build-side state in one shot differs fundamentally from event-time
> > >>>>> join's incremental watermark-batched emission. In the worst case this
> > >>>>> could cause a TaskManager CPU spike and downstream shock. Worth
> > >>>>> considering micro-batch draining during flip transition?
> > >>>>>
> > >>>>> Looking forward to your thoughts.
> > >>>>>
> > >>>>> Best,
> > >>>>> Leonard
> > >>>>>
> > >>>>>> On 1 Jun 2026, at 16:02, Fabian Hueske <[email protected]> wrote:
> > >>>>>>
> > >>>>>> Hi Leonard,
> > >>>>>>
> > >>>>>> Sorry, missed your email and already started the vote.
> > >>>>>> Let me put it on hold for now and continue discussing the proposal.
> > >>>>>>
> > >>>>>> Looking forward to your comments,
> > >>>>>> Fabian
> > >>>>>>
> > >>>>>> On Mon, 1 Jun 2026 at 09:56, Leonard Xu <[email protected]> wrote:
> > >>>>>>
> > >>>>>>> @Fabian Thanks for driving this FLIP, and sorry for the late reply
> > >>>>>>> due to personal reasons; I shouldn’t miss such an important FLIP.
> > >>>>>>>
> > >>>>>>> I’m reviewing the FLIP and will try to finish it today. Could you
> > >>>>>>> kindly wait a moment before starting the vote?
> > >>>>>>>
> > >>>>>>> And sorry for interrupting your plan again.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Leonard
> > >>>>>>>
> > >>>>>>>> On 1 Jun 2026, at 15:51, Fabian Hueske <[email protected]> wrote:
> > >>>>>>>>
> > >>>>>>>> Thanks everyone for your comments on the FLIP.
> > >>>>>>>> I will start the vote.
> > >>>>>>>>
> > >>>>>>>> Best, Fabian
> > >>>>>>>>
> > >>>>>>>> On Thu, 28 May 2026 at 20:13, David Anderson <[email protected]> wrote:
> > >>>>>>>>
> > >>>>>>>>> Fabian,
> > >>>>>>>>>
> > >>>>>>>>>> So, I don't think that we should buffer unmatched probe-side
> > >>> records
> > >>>>>>>>> beyond
> > >>>>>>>>> the flip point.
> > >>>>>>>>>
> > >>>>>>>>> Thanks for explaining your reasoning. Makes sense to me.
> > >>>>>>>>>
> > >>>>>>>>> David
> > >>>>>>>>>
> > >>>>>>>>> On Thu, May 28, 2026 at 6:55 PM Fabian Hueske <[email protected]
> > >
> > >>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Xingcan,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for your comments on the FLIP!
> > >>>>>>>>>>
> > >>>>>>>>>> The join's behavior when starting from a savepoint is indeed an
> > >>>>>>> important
> > >>>>>>>>>> aspect to consider and the problem of a rapidly advancing
> > >>> dimension
> > >>>>>>>>>> (build-side) table is of course real.
> > >>>>>>>>>>
> > >>>>>>>>>> I would argue that watermark alignment should significantly
> > reduce
> > >>>>> the
> > >>>>>>>>>> impact of this.
> > >>>>>>>>>> If enabled, sources align their consumption based on their
> > current
> > >>>>>>>>>> watermark such that the (presumably much smaller) build-side
> > >>> source
> > >>>>>>> would
> > >>>>>>>>>> be slowed down to the event-time progress of the probe-side.
> > >>>>>>>>>> While watermark alignment is not an "exact" mechanism, the
> > >>> semantics
> > >>>>> of
> > >>>>>>>>> the
> > >>>>>>>>>> new processing-time join also do not guarantee "exact" results.
> > >>>>>>>>>> At the same time, alignment should ensure that build and
> > >>> probe-side
> > >>>>> are
> > >>>>>>>>>> roughly aligned in event-time (without the strict guarantees
> > that
> > >>> the
> > >>>>>>>>>> event-time temporal table join provides).
> > >>>>>>>>>>
> > >>>>>>>>>> However, I really like your idea of starting in event-time mode
> > >>> and
> > >>>>>>>>>> flipping to processing-time after the initialization duration
> > >>> passed.
> > >>>>>>>>>> I'm not sure if it would fully address the problem you
> > described.
> > >>> As
> > >>>>>>> you
> > >>>>>>>>>> said, users would need to be able to reconfigure the flip-point
> > >>> and
> > >>>>> I'm
> > >>>>>>>>> not
> > >>>>>>>>>> sure if there's a good mechanism for this yet.
> > >>>>>>>>>> But it might have some other properties that would be
> > beneficial,
> > >>> so
> > >>>>>>> I'll
> > >>>>>>>>>> think about that.
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Fabian
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Thu, 28 May 2026 at 18:21, Fabian Hueske <[email protected]> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Thanks for your feedback David!
> > >>>>>>>>>>>
> > >>>>>>>>>>>> One question: If I understand correctly, during the JOIN phase
> > >>> of
> > >>>>> an
> > >>>>>>>>>>> INNER
> > >>>>>>>>>>> join, if the desired build-side record is missing, nothing will
> > >>> be
> > >>>>>>>>>> emitted
> > >>>>>>>>>>> for the unmatched probe-side record. For an INNER join, I can
> > >>>>> imagine
> > >>>>>>>>>>> wanting to buffer unmatched probe-side records, expecting the
> > >>> build
> > >>>>>>>>> side
> > >>>>>>>>>>> will arrive soon. What's your thinking there?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Your understanding is correct. If a probe-side record arrives
> > >>>>>>>>>>> during LOAD phase but no matching build-side record is received,
> > >>>>>>>>>>> the probe-side record would be discarded without being joined
> > >>>>>>>>>>> during the transition from LOAD to JOIN.
> > >>>>>>>>>>>
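
The behavior described above (buffer probe-side records during LOAD, join them at the flip, and drop unmatched records for an INNER join) could be sketched like this. It is a simplified illustration, not the proposed operator: `SnapshotJoinSketch` and its methods are hypothetical names, the flip is triggered manually, and the build side holds a single row per key even though the FLIP allows arbitrary joins.

```python
class SnapshotJoinSketch:
    """Hypothetical two-phase INNER join: buffer during LOAD, join after the flip."""

    def __init__(self):
        self.phase = "LOAD"
        self.build = {}    # key -> latest build-side value (one row per key, a simplification)
        self.buffer = []   # probe-side records received during LOAD
        self.out = []      # joined results emitted downstream

    def on_build(self, key, value):
        self.build[key] = value

    def on_probe(self, key, payload):
        if self.phase == "LOAD":
            self.buffer.append((key, payload))   # defer joining until the flip
        else:
            self._join(key, payload)

    def flip(self):
        """Transition LOAD -> JOIN: drain the buffer against the loaded build side."""
        self.phase = "JOIN"
        for key, payload in self.buffer:
            self._join(key, payload)
        self.buffer.clear()                      # nothing is kept beyond the flip point

    def _join(self, key, payload):
        if key in self.build:                    # INNER join: unmatched records are dropped
            self.out.append((key, payload, self.build[key]))


op = SnapshotJoinSketch()
op.on_probe("k1", "p1")    # buffered during LOAD
op.on_probe("k2", "p2")    # buffered, but no build-side match arrives before the flip
op.on_build("k1", "v1")
op.on_build("k1", "v2")    # later version replaces v1
op.flip()                  # p1 joins against v2, p2 is dropped
op.on_build("k2", "w1")
op.on_probe("k2", "p3")    # JOIN phase: joined immediately against current state
print(op.out)
```

In this sketch, a longer initialization time simply means calling `flip()` later, which gives the build side more time to deliver a match for `k2`.
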
> > >>>>>>>>>>> I would argue that users who want to prevent this would need to
> > >>>>>>>>>>> configure a longer initialization time.
> > >>>>>>>>>>> IMO, dropping unmatched probe records is not a "bad" property of
> > >>>>>>>>>>> INNER joins but an essential part of their semantics. It might
> > >>>>>>>>>>> even be desired by some users.
> > >>>>>>>>>>> If we buffered probe-side records for INNER joins beyond the
> > >>>>>>>>>>> transition point, we:
> > >>>>>>>>>>> * would have different behaviors for INNER and LEFT joins
> > >>>>>>>>>>> * could not start to emit probe-side watermarks as long as there
> > >>>>>>>>>>> are still probe-side records buffered (or at least not advance
> > >>>>>>>>>>> past them without emitting late data at a later point in time)
> > >>>>>>>>>>> * would either need another config knob to specify when to
> > >>>>>>>>>>> "really" clean up the probe-side state or keep such unmatched
> > >>>>>>>>>>> records forever in state (we could also use state TTL...)
> > >>>>>>>>>>>
> > >>>>>>>>>>> So, I don't think that we should buffer unmatched probe-side
> > >>> records
> > >>>>>>>>>>> beyond the flip point.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, 28 May 2026 at 17:05, Xingcan Cui <[email protected]> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for this FLIP! The two-phase design is excellent for
> > >>>>> avoiding
> > >>>>>>>>>>>> early-joining bugs while maintaining low-latency
> > processing-time
> > >>>>>>>>>>>> semantics.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> After thinking more about the proposal, I'd like to point out
> > an
> > >>>>> edge
> > >>>>>>>>>> case
> > >>>>>>>>>>>> related to the initialization phase or recovery after
> > prolonged
> > >>>>>>>>> downtime
> > >>>>>>>>>>>> (for example, when a job has been down for a day). While a
> > >>>>>>>>>> processing-time
> > >>>>>>>>>>>> join works well for live streaming, where results can
> > reasonably
> > >>>>>>>>> depend
> > >>>>>>>>>> on
> > >>>>>>>>>>>> the immediate arrival order of live data, it does not work as
> > >>> well
> > >>>>>>> for
> > >>>>>>>>>>>> catch-up scenarios.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Currently, if a job initializes or restores from a checkpoint
> > >>>>> after a
> > >>>>>>>>>> long
> > >>>>>>>>>>>> downtime, the operator resumes directly in the processing-time
> > >>> join
> > >>>>>>>>>> phase.
> > >>>>>>>>>>>> During catch-up, however, the natural chronological arrival
> > >>> order
> > >>>>> of
> > >>>>>>>>> the
> > >>>>>>>>>>>> live data is completely lost. As a result, these replayed fact
> > >>>>>>> records
> > >>>>>>>>>> are
> > >>>>>>>>>>>> evaluated against the current machine time and may blindly
> > join
> > >>>>> with
> > >>>>>>>>> the
> > >>>>>>>>>>>> rapidly advancing "current" dimension snapshot, rather than
> > the
> > >>>>>>>>>> historical
> > >>>>>>>>>>>> versions they were originally supposed to match.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> To handle this edge case, could we consider:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. changing the first phase into an event-time join phase, and
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2. allowing the operator to switch back to the first phase
> > >>> after a
> > >>>>>>>>>>>> restart?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For example, users could configure a timestamp threshold.
> > Before
> > >>>>> the
> > >>>>>>>>>>>> watermark reaches that point, the operator would run as an
> > >>>>> event-time
> > >>>>>>>>>>>> versioned join to safely process the catch-up phase through
> > >>>>> watermark
> > >>>>>>>>>>>> alignment. Once the watermark passes the threshold, the
> > operator
> > >>>>>>> could
> > >>>>>>>>>>>> purge the old multi-version state and seamlessly transition
> > >>> back to
> > >>>>>>>>> the
> > >>>>>>>>>>>> pure processing-time join phase for live traffic.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> After a job restart, users could either update the target
> > >>> timestamp
> > >>>>>>> to
> > >>>>>>>>>>>> reset the operator back into the event-time phase, or leave it
> > >>>>>>>>> unchanged
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>> continue operating in the processing-time phase.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I completely understand that this would introduce significant
> > >>>>>>>>> complexity
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>> the operator's state management and lifecycle, so this is
> > only a
> > >>>>>>>>>> tentative
> > >>>>>>>>>>>> proposal to explore whether it might be worth considering for
> > >>> the
> > >>>>>>>>>>>> long-term
> > >>>>>>>>>>>> robustness of the design.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Xingcan
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, May 28, 2026 at 8:17 AM David Anderson <
> > >>>>> [email protected]
> > >>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> I'm quite enthusiastic about this. I want to thank Fabian for
> > >>>>>>>>> putting
> > >>>>>>>>>>>>> together such a well-crafted FLIP. And I look forward to
> > >>> updating
> > >>>>>>>>> the
> > >>>>>>>>>>>>> awkward educational content this FLIP will make obsolete.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> To my mind, the syntax expresses the semantics of this join
> > >>> rather
> > >>>>>>>>>> well.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Until now, developers using event-time temporal joins
> > sometimes
> > >>>>>>>>>>>> resorted to
> > >>>>>>>>>>>>> doing weird things with watermarks to handle a build side
> > >>> that's
> > >>>>>>>>>> mostly
> > >>>>>>>>>>>>> idle; this lateral snapshot join is clearly better -- not to
> > >>>>> mention
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> added bonus of pre-loading the build table.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> One question: If I understand correctly, during the JOIN
> > phase
> > >>> of
> > >>>>> an
> > >>>>>>>>>>>> INNER
> > >>>>>>>>>>>>> join, if the desired build-side record is missing, nothing
> > >>> will be
> > >>>>>>>>>>>> emitted
> > >>>>>>>>>>>>> for the unmatched probe-side record. For an INNER join, I can
> > >>>>>>>>> imagine
> > >>>>>>>>>>>>> wanting to buffer unmatched probe-side records, expecting the
> > >>>>> build
> > >>>>>>>>>> side
> > >>>>>>>>>>>>> will arrive soon. What's your thinking there?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> David
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, May 27, 2026 at 12:44 PM Fabian Hueske <
> > >>>>> [email protected]>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks Gustavo and Timo for the positive feedback!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I'd like to bump this thread up to collect more feedback.
> > >>>>>>>>>>>>>> If there are no more responses, I will start a vote on this
> > >>> FLIP
> > >>>>>>>>>> next
> > >>>>>>>>>>>>>> Monday, June 1st.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Thu, 21 May 2026 at 12:15, Timo Walther <[email protected]> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> thanks for proposing this FLIP. I agree that this join is super
> > >>>>>>>>>>>>>>> common. After talking to many people at conferences, I could
> > >>>>>>>>>>>>>>> imagine it will be one of the most used kinds of joins going
> > >>>>>>>>>>>>>>> forward.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Tightly coupling it with watermarks fits both from a semantic
> > >>>>>>>>>>>>>>> point of view and with other efforts such as FLIP-558
> > >>>>>>>>>>>>>>> (Improvements to SinkUpsertMaterializer and changelog disorder)
> > >>>>>>>>>>>>>>> [1]. In the near future, we should work on more automated
> > >>>>>>>>>>>>>>> watermarking to power these watermark-based operators, but this
> > >>>>>>>>>>>>>>> is an orthogonal effort.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Overall I'm strongly +1 on this. Also +1 on the syntax
> > >>>>>>>>>>>>>>> improvements for lateral table functions by dropping the TABLE()
> > >>>>>>>>>>>>>>> wrapper.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>> Timo
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-558%3A+Improvements+to+SinkUpsertMaterializer+and+changelog+disorder
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 18.05.26 11:47, Gustavo de Morais wrote:
> > >>>>>>>>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> In general a strong +1 for the feature, without getting into
> > >>>>>>>>>>>>>>>> the details of the FLIP yet. This has been a missing feature
> > >>>>>>>>>>>>>>>> for years and I'm happy that we're putting in the time to
> > >>>>>>>>>>>>>>>> address this - while also getting rid of some of the hard
> > >>>>>>>>>>>>>>>> restrictions we had. Thanks!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Kind regards,
> > >>>>>>>>>>>>>>>> Gustavo
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Fri, 15 May 2026 at 16:39, Fabian Hueske <
> > >>>>>>>>> [email protected]
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I'd like to start a discussion on FLIP-579: LATERAL SNAPSHOT
> > >>>>>>>>>>>>>>>>> Join [1].
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Enriching a stream with data from a (slowly changing) dynamic
> > >>>>>>>>>>>>>>>>> table is a super common use case.
> > >>>>>>>>>>>>>>>>> Flink SQL features Temporal Joins [2] to address these use
> > >>>>>>>>>>>>>>>>> cases.
> > >>>>>>>>>>>>>>>>> However, SQL users can only use the event-time variant, which
> > >>>>>>>>>>>>>>>>> has many limitations (heavy dependency on frequent WM updates
> > >>>>>>>>>>>>>>>>> on both inputs, build-side table requires a PK, the join
> > >>>>>>>>>>>>>>>>> predicate must include the build-side PK, etc.).
> > >>>>>>>>>>>>>>>>> The processing-time temporal join is disabled (due to
> > >>>>>>>>>>>>>>>>> build-side initialization issues [3]) and temporal table
> > >>>>>>>>>>>>>>>>> function joins are only available in the Table API.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> FLIP-579 proposes a new temporal join operator that operates
> > >>>>>>>>>>>>>>>>> in processing-time and addresses the limitations of the
> > >>>>>>>>>>>>>>>>> existing implementations:
> > >>>>>>>>>>>>>>>>> * initialization of the build-side before joining
> > >>>>>>>>>>>>>>>>> * no requirement of continuous, frequent build-side WMs (after
> > >>>>>>>>>>>>>>>>> the initialization completed)
> > >>>>>>>>>>>>>>>>> * no requirement for a PK on the build-side
> > >>>>>>>>>>>>>>>>> * table function-based syntax [4] via a built-in SNAPSHOT
> > >>>>>>>>>>>>>>>>> function (proposed in FLIP-517 [5])
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Looking forward to your feedback.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>> Fabian
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-579%3A+LATERAL+SNAPSHOT+Join
> > >>>>>>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-joins
> > >>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-19830
> > >>>>>>>>>>>>>>>>> [4]
> > >>>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-table-function-join
> > >>>>>>>>>>>>>>>>> [5]
> > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs#FLIP517:BetterHandlingofDynamicTablePrimitiveswithPTFs-SNAPSHOTfortemporaljoins
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>>
> > >>>
> > >>>
> >
> >
