Dawid, I just want to add that I learned a lot from reading this FLIP. I would love to see some of the content extracted as a learning resource -- maybe a blog post?
David

On Thu, Jan 8, 2026 at 4:47 AM Timo Walther <[email protected]> wrote:

> Hi Dawid,
>
> thank you very much for working on and proposing this FLIP. This is an excellent design document that shows the deep research you have conducted. Both the linked resources and the examples are very helpful for getting the big picture.
>
> The sink upsert materializer is a long-standing problem for Flink SQL pipelines. Only a few people really understand why it exists and why it is so expensive. I also know that some users have simply disabled it because they are fine with the output results (potentially ignoring corner cases, intentionally or by accident).
>
> In general the FLIP is in very good shape, and you definitely get my +1 on this. Just some last questions:
>
> 1) Will we support the ON CONFLICT syntax also for append-only inputs?
>
> 2) Regarding naming, I find the config option too long. Could we simplify "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval" to something less internal? Maybe "table.exec.sink.upserts.compaction-interval"?
>
> Cheers,
> Timo
>
>
> On 08.01.26 12:42, Dawid Wysakowicz wrote:
> >
> >> Today there are still many retract sources, such as some sources in the Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, some formats, etc. These can be further divided into two categories.
> >> One is like Debezium: there is only a single UPDATE record in the physical storage, and the corresponding Flink source connector further splits it into UA/UB. The other is where UA and UB are already two separate changelog records in the physical storage.
> >> For the former, we could generate a watermark boundary before the source, just like a checkpoint barrier, so that UB and UA are guaranteed to fall within the same boundary. This should actually be supportable. It's okay if we don't support it in the first version, but it may affect the overall design -- for example, how to generate the system watermark boundary.
> >> For the latter, it's probably more troublesome. I think it's also fine not to support it. What do you think?
> >
> > When designing the FLIP I considered the Debezium case, and it's actually not so much of a problem, as you correctly pointed out. The only requirement is that the watermark is generated before the message split. I'd start without support for those sources and we can improve on that later on.
> >
> >> 3. Oh, what I meant is: how about renaming it to something like "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? Because I think it may not be a "watermark"; it's a compaction barrier, and this compaction can be 1) replaced by a watermark, or 2) replaced by a checkpoint, or 3) generated by the Flink system internally. What do you think?
> >
> > Fine by me. We can call it "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval".
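> >
> > For illustration, a user would then tune how often the compaction happens with something like the following in the SQL client (a sketch only; the value follows the usual Flink duration syntax, and the final option name may still change):
> >
> >   SET 'table.exec.sink.upsert-materialize-barrier-mode.compaction-interval' = '1 s';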
> >
> >> 4. I'm also wondering whether we don't even need the state about "a single result per key for a cross-watermark-boundary handover"?
> >
> > I am pretty sure we do, in order to adhere to the ON CONFLICT ERROR behaviour. Bear in mind two "active" keys may come as part of two separate barriers.
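> >
> > To sketch what I mean (the concrete rows here are my own, not from the FLIP): with a sink PRIMARY KEY (name), we might receive
> >
> >   barrier N:   +I(id=1, name='Apple') // first active key for 'Apple'
> >   barrier N+1: +I(id=2, name='Apple') // second active key for 'Apple'
> >
> > Each barrier on its own is conflict-free, so without a per-key result that survives the boundary we could not detect at barrier N+1 that 'Apple' is already claimed by id=1 and raise the error.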
> >
> > On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote:
> >
> >> 1. I agree with it; at least it won't be worse. Currently, for data that contains non-deterministic functions and has no UK, SUM cannot properly handle correcting out-of-order records.
> >>
> >> 2. It's fine if we don't support it at first. Let me add some more context. Today there are still many retract sources, such as some sources in the Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, some formats, etc. These can be further divided into two categories.
> >> One is like Debezium: there is only a single UPDATE record in the physical storage, and the corresponding Flink source connector further splits it into UA/UB. The other is where UA and UB are already two separate changelog records in the physical storage.
> >> For the former, we could generate a watermark boundary before the source, just like a checkpoint barrier, so that UB and UA are guaranteed to fall within the same boundary. This should actually be supportable. It's okay if we don't support it in the first version, but it may affect the overall design -- for example, how to generate the system watermark boundary.
> >> For the latter, it's probably more troublesome. I think it's also fine not to support it. What do you think?
> >>
> >> 3. Oh, what I meant is: how about renaming it to something like "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? Because I think it may not be a "watermark"; it's a compaction barrier, and this compaction can be 1) replaced by a watermark, or 2) replaced by a checkpoint, or 3) generated by the Flink system internally. What do you think?
> >>
> >> 4. I'm also wondering whether we don't even need the state about "a single result per key for a cross-watermark-boundary handover"?
> >>
> >> --
> >>
> >> Best!
> >> Xuyang
> >>
> >> On 2026-01-07 19:54:47, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>> 1. Without an upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the "disorder (2)" case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
> >>>
> >>> If you have non-deterministic functions like in your example, the retraction does not work anyhow. For a retraction to work when there is no primary key defined, we require all columns to be deterministic:
> >>> https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142
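> >>>
> >>> Schematically (the query is my own illustration, not from the FLIP), a plan like
> >>>
> >>>   INSERT INTO sink SELECT id, level, RAND() AS rand FROM s1;
> >>>
> >>> on an updating input without a primary key is rejected by that visitor, because a later -U could never be matched byte-for-byte against the +U it is supposed to cancel out.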
> >>>
> >>>> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.
> >>>
> >>> Yes, this assumption does not hold for retracting sources. So far we don't have any such sources. I'll introduce a check that would fail for the combination of a retracting source and DO ERROR/DO NOTHING.
> >>>
> >>>> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it shouldn't actually be called a "watermark interval", because it doesn't have the watermark semantics of dropping late data. It's actually a compaction barrier, right?
> >>>
> >>> I think I don't fully understand this point. Could you please explain it one more time? Are you suggesting a different name?
> >>>
> >>>> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore? More clearly, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)
> >>>
> >>> Yes, I think that is more or less correct. We need the "temporary" state within a boundary, plus a single result per key for a cross-watermark-boundary handover. I explain that in the FLIP.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:
> >>>
> >>>> Thank you for the explanation. I think I understand what you mean now. I have a few questions I'd like to confirm:
> >>>>
> >>>> 1. Without an upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the "disorder (2)" case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
> >>>>
> >>>> +U(id=1, level=20, attr='b1', rand='1.5')
> >>>> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly modified it
> >>>> -U(id=1, level=10, attr='a1', rand='2')
> >>>>
> >>>>> The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value.
> >>>>
> >>>> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.
> >>>>
> >>>> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it shouldn't actually be called a "watermark interval", because it doesn't have the watermark semantics of dropping late data. It's actually a compaction barrier, right?
> >>>>
> >>>> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore? More clearly, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)
> >>>>
> >>>> --
> >>>>
> >>>> Best!
> >>>> Xuyang
> >>>>
> >>>> On 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>> But I'd like to add a clarification. Take the "Changelog Disorder" example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let's look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> >>>>>
> >>>>> Let's first clarify this point, because I think it's vital for the understanding of the proposal, so we must be on the same page before we talk about other points.
> >>>>>
> >>>>> No, the example you pointed out would not throw an error with `ON CONFLICT ERROR`. As you pointed out yourself, those changes come from the same PK on table s1, therefore you will not have two active records with (id = 1) in the sink on the watermark boundary. The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value. Therefore you will only ever have a single value after the compaction. We would throw only if we tried to upsert into a single row in the sink from two rows of s1 with different ids, e.g. {source_id=1, sink_id=1}, {source_id=2, sink_id=1}.
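> >>>>>
> >>>>> As a sketch of that compaction (the values are invented for the example): within one watermark boundary the materializer sees
> >>>>>
> >>>>>   -U(id=1, level=10) // UB for the old value
> >>>>>   +U(id=1, level=20) // UA for the new value
> >>>>>
> >>>>> and emits only the single compacted result +U(id=1, level=20) at the boundary, so the sink never observes the intermediate retraction.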
> >>>>>
> >>>>>> Before I talk about 3, let me talk about 4 first. If I'm not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the "final" result.
> >>>>>
> >>>>> No, that's not what I understand as internal consistency. Internal consistency means that a single change in the source produces a single change in the sink, without incorrect intermediate states caused by processing UB separately from UA. I don't want to process multiple source changes in a single batch. I'd rather call that "external" consistency or snapshot processing, as you are suggesting, but in my mind this is an orthogonal topic to what I am trying to solve here.
> >>>>>
> >>>>> On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >>>>>
> >>>>>> Hi, Dawid. Thank you for your detailed explanation and the update. Let me share my thoughts.
> >>>>>>
> >>>>>> 1. In fact, I agree with what you said: for clearly problematic queries, we should fail fast -- for example, the case you mentioned (writing data from a source table whose PK is id into a sink table whose PK is name). We can fail like a traditional database would on a PK conflict. That's totally fine.
> >>>>>>
> >>>>>> 2. But I'd like to add a clarification. Take the "Changelog Disorder" example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let's look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> >>>>>>
> >>>>>> 4. Before I talk about 3, let me talk about 4 first. If I'm not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the "final" result. I think our disagreement is about where that "data boundary" is. In this FLIP, the boundary is described as: 1) a watermark, or 2) a checkpoint. But I think such a boundary is tied to the table, for example, "the creation of a specific historical snapshot version of a table." Since data in the snapshot is immutable, we can output results at that point. What do you think?
> >>>>>>
> >>>>>> 3. If we must choose one of the introduced options, I lean toward Option 1, because we already have a clear definition for watermarks defined on a table: "allowing for consistent results despite out-of-order or late events" (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time). This "drop late events" semantic does not exist for checkpoints. However, my concern is that in most scenarios a CDC source may produce multiple updates for the same PK over a long time span, so the watermark would have to be defined very large, which will cause the job to produce no output for a long time.
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> Best!
> >>>>>> Xuyang
> >>>>>>
> >>>>>> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>>> Hey Gustavo, Xuyang,
> >>>>>>> I tried incorporating your suggestions into the FLIP. Please take another look.
> >>>>>>> Best,
> >>>>>>> Dawid
> >>>>>>>
> >>>>>>> On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]> wrote:
> >>>>>>>
> >>>>>>>>> 1. The default behavior changes if no ON CONFLICT is defined. I am a little concerned that this may cause errors in a large number of existing cases.
> >>>>>>>>
> >>>>>>>> I can be convinced to leave the default behaviour as it is now. I am worried, though, that the current behaviour of SUM is very rarely what people actually want.
> >>>>>>>> As mentioned in the FLIP, I wholeheartedly believe there are very few, if any, real-world scenarios where you need the deduplicate behaviour. I try to elaborate a bit more in 2).
> >>>>>>>>
> >>>>>>>>> 2. Regarding ON CONFLICT ERROR, in the context of CDC streams, it is expected that the vast majority of cases cannot generate only one record with one primary key. The only solutions I can think of are append-only top1, deduplication, or aggregating the first row.
> >>>>>>>>
> >>>>>>>> I disagree with that statement. I don't think CDC streams change anything in that regard. Maybe there is some misunderstanding about what "one record" means in this context.
> >>>>>>>>
> >>>>>>>> I agree that there will almost certainly be a sequence of UA, UB for a single sink's primary key.
> >>>>>>>>
> >>>>>>>> My claim is that users almost never want a situation where they have more than one "active" upsert key/record for one sink's primary key. I tried to explain that in the FLIP, but let me try to give one more example here.
> >>>>>>>>
> >>>>>>>> Imagine two tables:
> >>>>>>>>
> >>>>>>>> CREATE TABLE source (
> >>>>>>>>   id bigint PRIMARY KEY,
> >>>>>>>>   name string,
> >>>>>>>>   `value` string
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> CREATE TABLE sink (
> >>>>>>>>   name string PRIMARY KEY,
> >>>>>>>>   `value` string
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> INSERT INTO sink SELECT name, `value` FROM source;
> >>>>>>>>
> >>>>>>>> === Input
> >>>>>>>> (1, "Apple", "ABC")
> >>>>>>>> (2, "Apple", "DEF")
> >>>>>>>>
> >>>>>>>> In the scenario above a SUM is inserted, which will deduplicate the rows and override the value for "Apple" with "DEF". In my opinion this is entirely wrong; instead, an exception should be thrown, because there is actually a constraint violation.
> >>>>>>>>
> >>>>>>>> I am absolutely more than happy to be proved wrong. If you do have a real-world scenario where the deduplication logic is actually correct and expected, please, please do share it. So far I have not seen one, nor was I able to come up with one. And yet I am not suggesting removing the deduplication logic entirely; users can still use it with ON CONFLICT DEDUPLICATE.
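> >>>>>>>>
> >>>>>>>> To make the two behaviours concrete (the exact grammar is what the FLIP defines; this is only a sketch of the intent, assuming for illustration that the clause attaches to the INSERT statement):
> >>>>>>>>
> >>>>>>>> INSERT INTO sink SELECT name, `value` FROM source ON CONFLICT ERROR;       -- fail on a second active record per PK
> >>>>>>>> INSERT INTO sink SELECT name, `value` FROM source ON CONFLICT DEDUPLICATE; -- keep today's override behaviour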
For most scenarios > >>>> that > >>>>>>>>> require real-time response, we can directly output intermediate > >>>> results > >>>>>>>>> promptly. > >>>>>>>> > >>>>>>>> > >>>>>>>> Why doesn't it solve it? It does. Given a pair of UB/UA we won't > >> emit > >>>>>> the > >>>>>>>> temporary state after processing the UB. > >>>>>>>> > >>>>>>>> 5. How can we compact data with the same custom watermark? If > >>>> detailed > >>>>>>>>> comparisons are necessary, I think we still need to preserve all > >> key > >>>>>> data; > >>>>>>>>> we would just be compressing this data further at time t. > >>>>>>>> > >>>>>>>> Yes, we need to preserve all key data, but only between two > >>>> watermarks. > >>>>>>>> Assuming frequent watermarks, that's for a very short time. > >>>>>>>> > >>>>>>>> 6. If neither this proposed solution nor the reject solution can > >>>> resolve > >>>>>>>>> internal consistency, we need to reconsider the differences > >> between > >>>>>> the two > >>>>>>>>> approaches. > >>>>>>>> > >>>>>>>> I'll copy the explanation why the rejected alternative should be > >>>>>> rejected > >>>>>>>> from the FLIP: > >>>>>>>> > >>>>>>>> The solution can help us to solve the changelog disorder problem, > >>>> but it > >>>>>>>> does not help with the *internal consistency *issue. If we want to > >>>> fix > >>>>>>>> that as well, we still need the compaction on watermarks. At the > >> same > >>>>>> time > >>>>>>>> it increases the size of all flowing records. Therefore it was > >>>> rejected > >>>>>> in > >>>>>>>> favour of simply compacting all records once on the progression of > >>>>>>>> watermarks. > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Dawid > >>>>>>>> > >>>>>> > >>>> > >> > > > >