Hi Xingcan,

Thanks for your comments on the FLIP!

The join's behavior when starting from a savepoint is indeed an important aspect to consider, and the problem of a rapidly advancing dimension (build-side) table is of course real. I would argue that watermark alignment should significantly reduce its impact. If enabled, sources align their consumption based on their current watermarks, so the (presumably much smaller) build-side source would be slowed down to the event-time progress of the probe side. Watermark alignment is not an "exact" mechanism, but the semantics of the new processing-time join do not guarantee "exact" results either. Still, alignment should keep the build and probe sides roughly aligned in event time (without the strict guarantees that the event-time temporal table join provides).

However, I really like your idea of starting in event-time mode and flipping to processing time once the initialization duration has passed. I'm not sure it would fully address the problem you described: as you said, users would need to be able to reconfigure the flip point, and I'm not sure there's a good mechanism for this yet. But it might have other beneficial properties, so I'll think about it.

Best,
Fabian

On Thu, May 28, 2026 at 18:21, Fabian Hueske <[email protected]> wrote:

> Thanks for your feedback, David!
>
> > One question: If I understand correctly, during the JOIN phase of an INNER join, if the desired build-side record is missing, nothing will be emitted for the unmatched probe-side record. For an INNER join, I can imagine wanting to buffer unmatched probe-side records, expecting the build side will arrive soon. What's your thinking there?
>
> Your understanding is correct. If a probe-side record arrives during the LOAD phase and no matching build-side record is received, the probe-side record would be discarded without being joined during the transition from LOAD to JOIN.
>
> I would argue that users who want to prevent this would need to configure a longer initialization time.
> IMO, dropping unmatched probe records is not a "bad" property of INNER joins but an essential part of their semantics. Some users might even want it.
> If we buffered probe-side records for INNER joins beyond the transition point, we:
> * would have different behaviors for INNER and LEFT joins
> * could not start to emit probe-side watermarks as long as there are still probe-side records buffered (or at least could not advance past them without emitting late data at a later point in time)
> * would either need another config knob to specify when to "really" clean up the probe-side state, or would have to keep such unmatched records in state forever (we could also use state TTL...)
>
> So, I don't think that we should buffer unmatched probe-side records beyond the flip point.
>
> Best,
> Fabian
>
> On Thu, May 28, 2026 at 17:05, Xingcan Cui <[email protected]> wrote:
>
>> Hi Fabian,
>>
>> Thanks for this FLIP! The two-phase design is excellent for avoiding early-joining bugs while maintaining low-latency processing-time semantics.
>>
>> After thinking more about the proposal, I'd like to point out an edge case related to the initialization phase or recovery after prolonged downtime (for example, when a job has been down for a day). While a processing-time join works well for live streaming, where results can reasonably depend on the immediate arrival order of live data, it does not work as well for catch-up scenarios.
>>
>> Currently, if a job initializes or restores from a checkpoint after a long downtime, the operator resumes directly in the processing-time join phase. During catch-up, however, the natural chronological arrival order of the live data is completely lost.
>> As a result, these replayed fact records are evaluated against the current machine time and may blindly join with the rapidly advancing "current" dimension snapshot, rather than the historical versions they were originally supposed to match.
>>
>> To handle this edge case, could we consider:
>>
>> 1. changing the first phase into an event-time join phase, and
>>
>> 2. allowing the operator to switch back to the first phase after a restart?
>>
>> For example, users could configure a timestamp threshold. Before the watermark reaches that point, the operator would run as an event-time versioned join to safely process the catch-up phase through watermark alignment. Once the watermark passes the threshold, the operator could purge the old multi-version state and seamlessly transition back to the pure processing-time join phase for live traffic.
>>
>> After a job restart, users could either update the target timestamp to reset the operator back into the event-time phase, or leave it unchanged to continue operating in the processing-time phase.
>>
>> I completely understand that this would introduce significant complexity to the operator's state management and lifecycle, so this is only a tentative proposal to explore whether it might be worth considering for the long-term robustness of the design.
>>
>> Best,
>>
>> Xingcan
>>
>> On Thu, May 28, 2026 at 8:17 AM David Anderson <[email protected]> wrote:
>>
>> > I'm quite enthusiastic about this. I want to thank Fabian for putting together such a well-crafted FLIP. And I look forward to updating the awkward educational content this FLIP will make obsolete.
>> >
>> > To my mind, the syntax expresses the semantics of this join rather well.
>> >
>> > Until now, developers using event-time temporal joins sometimes resorted to doing weird things with watermarks to handle a build side that's mostly idle; this lateral snapshot join is clearly better, not to mention the added bonus of pre-loading the build table.
>> >
>> > One question: If I understand correctly, during the JOIN phase of an INNER join, if the desired build-side record is missing, nothing will be emitted for the unmatched probe-side record. For an INNER join, I can imagine wanting to buffer unmatched probe-side records, expecting the build side will arrive soon. What's your thinking there?
>> >
>> > David
>> >
>> > On Wed, May 27, 2026 at 12:44 PM Fabian Hueske <[email protected]> wrote:
>> >
>> > > Thanks Gustavo and Timo for the positive feedback!
>> > >
>> > > I'd like to bump this thread up to collect more feedback.
>> > > If there are no more responses, I will start a vote on this FLIP next Monday, June 1st.
>> > >
>> > > Best,
>> > > Fabian
>> > >
>> > > On Thu, May 21, 2026 at 12:15, Timo Walther <[email protected]> wrote:
>> > >
>> > > > Hi Fabian,
>> > > >
>> > > > thanks for proposing this FLIP. I agree that this join is super common. After talking to many people at conferences, I can imagine it becoming one of the most used kinds of joins going forward.
>> > > >
>> > > > Tightly coupling it with watermarks fits both from a semantic point of view and with other efforts such as FLIP-558 (Improvements to SinkUpsertMaterializer and changelog disorder) [1]. In the near future, we should work on more automated watermarking to power these watermark-based operators, but this is an orthogonal effort.
>> > > >
>> > > > Overall I'm strongly +1 on this. Also +1 on the syntax improvements for lateral table functions by dropping the TABLE() wrapper.
>> > > >
>> > > > Cheers,
>> > > > Timo
>> > > >
>> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-558%3A+Improvements+to+SinkUpsertMaterializer+and+changelog+disorder
>> > > >
>> > > > On 18.05.26 11:47, Gustavo de Morais wrote:
>> > > > > Hi Fabian,
>> > > > >
>> > > > > In general a strong +1 for the feature, without getting into the details of the FLIP yet. This feature has been missing for years, and I'm happy that we're putting in the time to address it while also getting rid of some of the hard restrictions we had. Thanks!
>> > > > >
>> > > > > Kind regards,
>> > > > > Gustavo
>> > > > >
>> > > > > On Fri, 15 May 2026 at 16:39, Fabian Hueske <[email protected]> wrote:
>> > > > >
>> > > > >> Hi everyone,
>> > > > >>
>> > > > >> I'd like to start a discussion on FLIP-579: LATERAL SNAPSHOT Join [1].
>> > > > >>
>> > > > >> Enriching a stream with data from a (slowly changing) dynamic table is a super common use case.
>> > > > >> Flink SQL features Temporal Joins [2] to address these use cases.
>> > > > >> However, SQL users can only use the event-time variant, which has many limitations (heavy dependency on frequent WM updates on both inputs, build-side table requires a PK, the join predicate must include the build-side PK, etc.).
>> > > > >> The processing-time temporal join is disabled (due to build-side initialization issues [3]) and temporal table function joins are only available in Table API.
>> > > > >>
>> > > > >> FLIP-579 proposes a new temporal join operator that operates in processing time and addresses the limitations of the existing implementations:
>> > > > >> * initialization of the build side before joining
>> > > > >> * no requirement for continuous, frequent build-side WMs (after the initialization has completed)
>> > > > >> * no requirement for a PK on the build side
>> > > > >> * table function-based syntax [4] via a built-in SNAPSHOT function (proposed in FLIP-517 [5])
>> > > > >>
>> > > > >> Looking forward to your feedback.
>> > > > >>
>> > > > >> Best,
>> > > > >> Fabian
>> > > > >>
>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-579%3A+LATERAL+SNAPSHOT+Join
>> > > > >> [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-joins
>> > > > >> [3] https://issues.apache.org/jira/browse/FLINK-19830
>> > > > >> [4] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#temporal-table-function-join
>> > > > >> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs#FLIP517:BetterHandlingofDynamicTablePrimitiveswithPTFs-SNAPSHOTfortemporaljoins
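
To make the LOAD-to-JOIN transition that David and Fabian discuss more concrete, here is a minimal Python model of the INNER vs. LEFT behavior at the flip point. It is a sketch under stated assumptions, not Flink's operator or API. The class `SnapshotJoinModel` and its methods `on_build`, `on_probe`, and `finish_load` are hypothetical. The flip point is triggered by an explicit `finish_load()` call that stands in for the initialization duration. The join is a simple equi-join on one key. LEFT joins are assumed to emit unmatched probe rows padded with `None` (NULL) at the flip point.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical, simplified model of the two-phase (LOAD -> JOIN) snapshot join
# discussed in this thread. It is NOT Flink's operator or API; every name here
# is an illustrative assumption.

Row = Dict[str, Any]


@dataclass
class SnapshotJoinModel:
    key: str                 # equi-join attribute on both sides (assumption)
    left_join: bool = False  # False models INNER, True models LEFT
    loading: bool = True     # the operator starts in the LOAD phase
    snapshot: Dict[Any, List[Row]] = field(default_factory=lambda: defaultdict(list))
    buffered_probe: List[Row] = field(default_factory=list)
    output: List[Tuple[Row, Optional[Row]]] = field(default_factory=list)

    def on_build(self, row: Row, retract: bool = False) -> None:
        """Apply one build-side change; rows form a multiset per key, so no PK is needed."""
        rows = self.snapshot[row[self.key]]
        if retract:
            if row in rows:
                rows.remove(row)  # remove a single matching copy
        else:
            rows.append(row)

    def on_probe(self, row: Row) -> None:
        if self.loading:
            # LOAD phase: hold probe rows until the build side is initialized.
            self.buffered_probe.append(row)
        else:
            # JOIN phase: join immediately against the snapshot as of "now".
            self._join(row)

    def finish_load(self) -> None:
        """Flip point (hypothetical trigger): join buffered probe rows, then switch to JOIN."""
        self.loading = False
        for row in self.buffered_probe:
            self._join(row)
        self.buffered_probe.clear()

    def _join(self, probe: Row) -> None:
        matches = self.snapshot.get(probe[self.key], [])
        for build in matches:
            self.output.append((probe, build))
        if not matches and self.left_join:
            self.output.append((probe, None))  # LEFT: emit with a NULL build side
        # INNER without a match: the probe row is dropped, not buffered further.


if __name__ == "__main__":
    join = SnapshotJoinModel(key="currency")
    join.on_probe({"order": 1, "currency": "EUR"})  # arrives during LOAD
    join.on_probe({"order": 2, "currency": "JPY"})  # no JPY rate is ever loaded
    join.on_build({"currency": "EUR", "rate": 1.1})
    join.finish_load()                               # order 2 is discarded here
    join.on_probe({"order": 3, "currency": "EUR"})
    join.on_build({"currency": "EUR", "rate": 1.1}, retract=True)
    join.on_build({"currency": "EUR", "rate": 1.2})
    join.on_probe({"order": 4, "currency": "EUR"})
    print([(p["order"], b["rate"]) for p, b in join.output])
    # prints [(1, 1.1), (3, 1.1), (4, 1.2)]
```

In the run, order 2 is dropped at the flip point instead of being buffered further, as Fabian proposes for INNER joins. Order 3 keeps the rate that was current when it arrived, because the model assumes processing-time results are not revised after later build-side changes. The model deliberately omits watermarks, state cleanup, and Xingcan's proposed event-time catch-up phase.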
