@Timo

> 1) Will we support ON CONFLICT syntax also for append-only inputs?


That's a very good point! I think we should. In my opinion we should
support it so that:
1. ERROR -> fails if more than one row is inserted for the same PRIMARY KEY
2. NOTHING -> emits only the first value inserted for a key
3. DEDUPLICATE -> the current behaviour: we always overwrite the value.
With append-only inputs there is no disorder problem, so we don't need
the SUM (SinkUpsertMaterializer) operator
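
To make the three behaviours concrete, here is a minimal Python sketch of
how I picture them on an append-only input. It is only an illustration of
the semantics, not Flink code: ConflictAction, PrimaryKeyConflict and
apply_append_only are hypothetical names, and the "Pear" row is made up to
show a key without a conflict (the "Apple" rows are the example from
earlier in this thread).

```python
from enum import Enum


class ConflictAction(Enum):
    """Hypothetical stand-ins for the three ON CONFLICT actions."""
    ERROR = "ERROR"
    NOTHING = "NOTHING"
    DEDUPLICATE = "DEDUPLICATE"


class PrimaryKeyConflict(Exception):
    """Raised when ERROR sees a second insert for the same primary key."""


def apply_append_only(rows, action):
    """Apply (key, value) inserts in arrival order and return the sink contents."""
    sink = {}
    for key, value in rows:
        if key not in sink:
            # First insert for this key: always accepted.
            sink[key] = value
        elif action is ConflictAction.ERROR:
            raise PrimaryKeyConflict(f"second insert for primary key {key!r}")
        elif action is ConflictAction.NOTHING:
            # Keep the first value, ignore later inserts.
            continue
        else:
            # DEDUPLICATE: the last insert wins.
            sink[key] = value
    return sink


rows = [("Apple", "ABC"), ("Apple", "DEF"), ("Pear", "XYZ")]
first_wins = apply_append_only(rows, ConflictAction.NOTHING)
last_wins = apply_append_only(rows, ConflictAction.DEDUPLICATE)
```

With these rows, NOTHING keeps "ABC" for "Apple", DEDUPLICATE ends with
"DEF", and ERROR raises on the second "Apple" insert. No state beyond the
current value per key is needed in this sketch, because an append-only
input has no retractions to reorder.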

> 2) Regarding naming, I find the config option is too long.
> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"
> could we simplify it to something less internal? Maybe
> table.exec.sink.upserts.compaction-interval?


Fine by me. I'll update the FLIP to:
"table.exec.sink.upserts.compaction-interval"

@David
Thank you for the kind words! I'll consider turning it into a blog post.

If there are no further objections or comments, I'd like to start a vote on
this sometime next week.

On Thu, 8 Jan 2026 at 13:48, Timo Walther <[email protected]> wrote:

> Hi Dawid,
>
> thank you very much for working and proposing this FLIP. This is an
> excellent design document that shows the deep research you have
> conducted. Both the linked resources as well as the examples are very
> helpful to get the big picture.
>
> Sink upsert materializer is a long-standing problem for Flink SQL
> pipelines. Only a few people really understand why it exists and why it
> is so expensive. I also know that some users have simply disabled it
> because they are fine with the output results (potentially ignoring
> corner cases intentionally or by accident).
>
> In general the FLIP is in very good shape, and you definitely get my
> +1 on this. Just some last questions:
>
> 1) Will we support ON CONFLICT syntax also for append-only inputs?
>
> 2) Regarding naming, I find the config option is too long.
> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"
> could we simplify it to something less internal? Maybe
> table.exec.sink.upserts.compaction-interval?
>
> Cheers,
> Timo
>
>
> On 08.01.26 12:42, Dawid Wysakowicz wrote:
> >>
> >> Today there are still many retract sources, such as some sources in the
> >> Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
> >> formats, etc. These can be further divided into two categories.
> >> One is like Debezium: there is only a single UPDATE record in the
> physical
> >> storage, and the corresponding Flink source connector further splits it
> >> into UA/UB. The other is where UA and UB are already two separate
> changelog
> >> records in the physical storage.
> >> For the former, we could generate a watermark boundary before the source
> >> just like checkpoint barrier, so that UB and UA are guaranteed to fall
> >> within the same boundary. This should actually be supportable. It’s
> okay if
> >> we don’t support it in the first version, but it may affect the overall
> >> design—for example, how to generate the system watermark boundary.
> >> For the latter, it’s probably more troublesome. I think it’s also fine
> not
> >> to support it. What do you think?
> >
> >
> > When designing the FLIP I considered the Debezium case and it's actually
> > not so much of a problem as you correctly pointed out. The only
> requirement
> > is that the watermark is generated before the message split. I'd start
> > without support for those sources and we can improve on that later on.
> >
> > 3. Oh, what I meant is how about renaming it to something like
> >> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
> >> Because I think it may be not a “watermark”; it’s a compaction barrier,
> and
> >> this compaction can be 1) replaced by watermark, or 2) replaced by
> >> checkpoint, or 3) generated by the Flink system internally. What do you
> >> think?
> >
> >
> > Fine by me. We can call
> > it "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval".
> >
> > 4. I’m also wondering whether we don’t even need the state about “a
> single
> >> result per key for a cross-watermark-boundary handover”?
> >
> >
> > I am pretty sure we do in order to adhere to the ON CONFLICT ERROR
> > behaviour. Bear in mind two "active" keys may come as part of two
> separate
> > barriers.
> >
> > On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote:
> >
> >> 1. I agree; at least it won’t be worse. Currently, for data that
> >> contains non-deterministic functions without UK, SUM cannot properly
> handle
> >> correcting out-of-order records.
> >>
> >>
> >> 2. It’s fine if we don’t support it at first. Let me add some more context.
> >> Today there are still many retract sources, such as some sources in the
> >> Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
> >> formats, etc. These can be further divided into two categories.
> >> One is like Debezium: there is only a single UPDATE record in the
> physical
> >> storage, and the corresponding Flink source connector further splits it
> >> into UA/UB. The other is where UA and UB are already two separate
> changelog
> >> records in the physical storage.
> >> For the former, we could generate a watermark boundary before the source
> >> just like checkpoint barrier, so that UB and UA are guaranteed to fall
> >> within the same boundary. This should actually be supportable. It’s
> okay if
> >> we don’t support it in the first version, but it may affect the overall
> >> design—for example, how to generate the system watermark boundary.
> >> For the latter, it’s probably more troublesome. I think it’s also fine
> not
> >> to support it. What do you think?
> >>
> >>
> >> 3. Oh, what I meant is how about renaming it to something like
> >> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
> >> Because I think it may be not a “watermark”; it’s a compaction barrier,
> and
> >> this compaction can be 1) replaced by watermark, or 2) replaced by
> >> checkpoint, or 3) generated by the Flink system internally. What do you
> >> think?
> >>
> >>
> >> 4. I’m also wondering whether we don’t even need the state about “a
> single
> >> result per key for a cross-watermark-boundary handover”?
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >>      Best!
> >>      Xuyang
> >>
> >>
> >>
> >> At 2026-01-07 19:54:47, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>> 1. Without upsert key, how do we determine the order of multiple
> records
> >>> within the same watermark boundary when non-deterministic functions are
> >>> involved? For example, if we receive data like the “disorder (2)” case
> >>> below, and the upsert key is lost after a join, what will the final
> output
> >>> be (without internal consistency issues)?
> >>>
> >>> If you have non-deterministic functions like in your example the
> >> retraction
> >>> does not work anyhow. For a retraction to work if there is no primary
> key
> >>> defined we require all columns to be deterministic:
> >>>
> >>
> https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142
> >>>
> >>>> 2. To address internal consistency issues caused by emitting -U, we
> need
> >>> each watermark boundary to contain paired -U/+U. That means we have to
> >>> track what the source sends, and only emit the "watermark boundary"
> after
> >>> the +U has been sent, right? For an upsert source, this is easy
> because we
> >>> have ChangelogNormalize to output the -U/+U pairs. But for a retract
> >>> source, we may need to introduce a new stateful operator. This seems
> >>> unavoidable unless the source outputs -U and +U together.
> >>>
> >>> Yes, this assumption does not hold for retracting sources. So far we
> don't
> >>> have any such sources. I'll introduce a check that would fail for a
> >>> combination of a retracting source and DO ERROR/DO NOTHING.
> >>>
> >>>> 3. About
> >>> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval",
> it’s
> >>> not really a “watermark interval”, because it doesn't have
> >> watermark
> >>> semantics to drop late data. It’s actually a compaction barrier, right?
> >>>
> >>> I think I don't fully understand this point. Could you please explain
> it
> >>> one more time? Are you suggesting a different name?
> >>>
> >>>> 4. The state in SUM is used to handle rollback under out-of-order
> >>> scenarios. Since we resolve out-of-orderness within a watermark
> boundary,
> >>> does that mean we don’t need state anymore? More clearly, we only need
> a
> >>> "temporary" state that lives within each watermark boundary. (Or what I
> >> can
> >>> think of is: you’re using this persistent state to support the
> subsequent
> >>> `ON CONFLICT conflict_action`.)
> >>>
> >>> Yes, I think more or less that is correct. We need the "temporary"
> state
> >>> within boundary + a single result per key for a cross watermark
> boundary
> >>> handover. I explain that in the FLIP.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:
> >>>
> >>>> Thank you for the explanation. I think I understand what you mean
> now. I
> >>>> have a few questions I’d like to confirm:
> >>>> 1. Without upsert key, how do we determine the order of multiple
> records
> >>>> within the same watermark boundary when non-deterministic functions
> are
> >>>> involved? For example, if we receive data like the “disorder (2)” case
> >>>> below, and the upsert key is lost after a join, what will the final
> >> output
> >>>> be (without internal consistency issues)?
> >>>>
> >>>>
> >>>> +U(id=1, level=20, attr='b1', rand='1.5')
> >>>> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly
> >>>> modified it
> >>>> -U(id=1, level=10, attr='a1', rand='2')
> >>>>
> >>>>
> >>>>> The watermark is an internal consistency boundary. You will always
> >> have
> >>>> a UB for the old value and an UA for the new value.
> >>>> 2. To address internal consistency issues caused by emitting -U, we
> need
> >>>> each watermark boundary to contain paired -U/+U. That means we have to
> >>>> track what the source sends, and only emit the "watermark boundary"
> >> after
> >>>> the +U has been sent, right? For an upsert source, this is easy
> because
> >> we
> >>>> have ChangelogNormalize to output the -U/+U pairs. But for a retract
> >>>> source, we may need to introduce a new stateful operator. This seems
> >>>> unavoidable unless the source outputs -U and +U together.
> >>>>
> >>>>
> >>>> 3. About
> >>>> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval",
> >> it’s
> >>>> not really a “watermark interval”, because it doesn't have
> >> watermark
> >>>> semantics to drop late data. It’s actually a compaction barrier,
> right?
> >>>>
> >>>>
> >>>> 4. The state in SUM is used to handle rollback under out-of-order
> >>>> scenarios. Since we resolve out-of-orderness within a watermark
> >> boundary,
> >>>> does that mean we don’t need state anymore? More clearly, we only
> need a
> >>>> "temporary" state that lives within each watermark boundary. (Or what
> I
> >> can
> >>>> think of is: you’re using this persistent state to support the
> >> subsequent
> >>>> `ON CONFLICT conflict_action`.)
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>>      Best!
> >>>>      Xuyang
> >>>>
> >>>>
> >>>>
> >>>> At 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>> But I’d like to add a clarification. Take the “Changelog Disorder”
> >>>>> example described in the FLIP (
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> >>>> ).
> >>>>> Let’s look at disorder (2) and disorder (3). Under the default ON
> >> CONFLICT
> >>>>> ERROR, IIUC, the expected behavior is that Flink should fail.
> However,
> >>>>> those inserts and updates actually all come from the same PK on table
> >> s1
> >>>>> (id = 1). From a relational-algebra perspective, this does not
> violate
> >> the
> >>>>> PK constraint; it only happens because we shuffle by level and end up
> >> with
> >>>>> out-of-order issues when parallelism is greater than one. In other words, if we
> run
> >>>>> this SQL in batch, the pk conflict will not happen and ON CONFLICT
> >> ERROR
> >>>>> should not fail. If the streaming job fails, users will be confused.
> >> For
> >>>>> disorder issues introduced by Flink internally, I believe Flink
> should
> >>>>> handle them internally.
> >>>>>
> >>>>> Let's first clarify this point, because I think it's vital for the
> >>>>> understanding of the proposal so we must be on the same page before
> we
> >>>> talk
> >>>>> about other points.
> >>>>>
> >>>>> No, the example you pointed out would not throw an error in `ON
> >> CONFLICT
> >>>>> ERROR`. As you pointed out yourself those come from the same PK on
> >> table
> >>>>> S1, therefore you will not have two active records with (id = 1) in
> the
> >>>>> sink on the watermark boundary. The watermark is an internal
> >> consistency
> >>>>> boundary. You will always have a UB for the old value and an UA for
> the
> >>>> new
> >>>>> value. Therefore you will only ever have a single value after the
> >>>>> compaction. We would throw only if we try to upsert into a single row
> >> in
> >>>>> the sink from two rows from s1 with different ids e.g. {source_id=1,
> >>>>> sink_id=1}, {source_id=2, sink_id=1}.
> >>>>>
> >>>>>> Before I talk about 3, let me talk about 4 first. If I’m not
> >> mistaken,
> >>>> we
> >>>>> need a deterministic boundary to determine that upstream data will no
> >>>>> longer be updated, so that we can output the “final” result.
> >>>>>
> >>>>> No, that's not what I understand as internal consistency. Internal
> >>>>> consistency means that a single change in the source produces a single
> >> change
> >>>>> in the sink, without incorrect intermediate states caused by
> >> processing UB
> >>>>> separately from UA. I don't want to process multiple source changes
> in
> >> a
> >>>>> single batch. I'd rather call it "external" consistency or snapshot
> >>>>> processing as you are suggesting, but in my mind this is an
> orthogonal
> >>>>> topic from what I am trying to solve here.
> >>>>>
> >>>>> On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >>>>>
> >>>>>> Hi, Dawid. Thank you for your detailed explanation and the update.
> >> Let
> >>>> me
> >>>>>> share my thoughts.
> >>>>>> 1. In fact, I agree with what you said: for clearly problematic
> >> queries,
> >>>>>> we should fail fast — for example, the case you mentioned (writing
> >> data
> >>>>>> from a source table whose PK is id into a sink table whose PK is
> >> name).
> >>>> We
> >>>>>> can fail like a traditional database: PK conflict. That’s totally
> >> fine.
> >>>>>> 2. But I’d like to add a clarification. Take the “Changelog
> Disorder”
> >>>>>> example described in the FLIP (
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> >>>> ).
> >>>>>> Let’s look at disorder (2) and disorder (3). Under the default ON
> >>>> CONFLICT
> >>>>>> ERROR, IIUC, the expected behavior is that Flink should fail.
> >> However,
> >>>>>> those inserts and updates actually all come from the same PK on
> >> table s1
> >>>>>> (id = 1). From a relational-algebra perspective, this does not
> >> violate
> >>>> the
> >>>>>> PK constraint; it only happens because we shuffle by level and end
> up
> >>>> with
> >>>>>> out-of-order issues when parallelism is greater than one. In other words, if we
> >> run
> >>>>>> this SQL in batch, the pk conflict will not happen and ON CONFLICT
> >> ERROR
> >>>>>> should not fail. If the streaming job fails, users will be confused.
> >> For
> >>>>>> disorder issues introduced by Flink internally, I believe Flink
> >> should
> >>>>>> handle them internally.
> >>>>>> 4. Before I talk about 3, let me talk about 4 first. If I’m not
> >>>> mistaken,
> >>>>>> we need a deterministic boundary to determine that upstream data
> >> will no
> >>>>>> longer be updated, so that we can output the “final” result. I think
> >> our
> >>>>>> disagreement is about where that “data boundary” is. In this FLIP,
> >> the
> >>>>>> boundary is described as: 1) watermark or 2) checkpoint. But I think
> >>>> such a
> >>>>>> boundary is tied to the table, for example, “the creation of a
> >> specific
> >>>>>> historical snapshot version of a table.” Since data in the snapshot
> >> is
> >>>>>> immutable, we can output results at that point. What do you think?
> >>>>>> 3. If we must choose one of the introduced options, I lean toward
> >> Option
> >>>>>> 1, because we already have a clear definition for watermarks defined
> >> on
> >>>> a
> >>>>>> table: “allowing for consistent results despite out-of-order or late
> >>>>>> events” (
> >>>>>>
> >>>>
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time
> >>>> ).
> >>>>>> This “drop late events” semantic does not exist in checkpoint.
> >> However,
> >>>> my
> >>>>>> concern is that in most scenarios, a CDC source may produce multiple
> >>>>>> updates for the same PK over a long time span, so the watermark
> >> should
> >>>> be
> >>>>>> defined very large, which will cause the job to produce no output
> >> for a
> >>>>>> long time.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>>      Best!
> >>>>>>      Xuyang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]>
> >>>> wrote:
> >>>>>>> Hey Gustavo, Xuyang
> >>>>>>> I tried incorporating your suggestions into the FLIP. Please take
> >>>> another
> >>>>>>> look.
> >>>>>>> Best,
> >>>>>>> Dawid
> >>>>>>>
> >>>>>>> On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <
> >> [email protected]
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> 1. The default behavior changes if no ON CONFLICT is defined. I
> >> am a
> >>>>>>>>> little concerned that this may cause errors in a large number of
> >>>>>> existing
> >>>>>>>>> cases.
> >>>>>>>>
> >>>>>>>> I can be convinced to leave the default behaviour as it is now. I
> >> am
> >>>>>>>> worried, though, that the current behaviour of SUM is very rarely what
> >>>> people
> >>>>>>>> actually want. As mentioned in the FLIP I wholeheartedly believe
> >>>> there
> >>>>>> are
> >>>>>>>> very few, if any, real-world scenarios where you need the
> >>>> deduplicate
> >>>>>>>> behaviour. I try to elaborate a bit more in 2)
> >>>>>>>>
> >>>>>>>> 2. Regarding On Conflict Errors, in the context of CDC streams,
> >> it is
> >>>>>>>>> expected that the vast majority of cases cannot generate only one
> >>>>>> record
> >>>>>>>>> with one primary key. The only solutions I can think of are
> >>>> append-only
> >>>>>>>>> top1, deduplication, or aggregating the first row.
> >>>>>>>>
> >>>>>>>> I disagree with that statement. I don't think CDC streams change
> >>>>>> anything
> >>>>>>>> in that regard. Maybe there is some misunderstanding about what a
> >> one
> >>>>>>>> record means in this context.
> >>>>>>>>
> >>>>>>>> I agree almost certainly there will be a sequence of UA, UB for a
> >>>> single
> >>>>>>>> sink's primary key.
> >>>>>>>>
> >>>>>>>> My claim is that users almost never want a situation where they
> >> have
> >>>>>> more
> >>>>>>>> than one "active" upsert key/record for one sink's primary key. I
> >>>> tried
> >>>>>> to
> >>>>>>>> explain that in the FLIP, but let me try to give one more example
> >>>> here.
> >>>>>>>>
> >>>>>>>> Imagine two tables:
> >>>>>>>> CREATE TABLE source (
> >>>>>>>>    id bigint PRIMARY KEY,
> >>>>>>>>    name string,
> >>>>>>>>    value string
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> CREATE TABLE sink (
> >>>>>>>>    name string PRIMARY KEY,
> >>>>>>>>    value string
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> INSERT INTO sink SELECT name, value FROM source;
> >>>>>>>>
> >>>>>>>> === Input
> >>>>>>>> (1, "Apple", "ABC")
> >>>>>>>> (2, "Apple", "DEF")
> >>>>>>>>
> >>>>>>>> In the scenario above a SUM is inserted which will deduplicate the
> >>>> rows
> >>>>>>>> and override the value for "Apple" with "DEF". In my opinion it's
> >>>>>> entirely
> >>>>>>>> wrong, instead an exception should be thrown that there is
> >> actually a
> >>>>>>>> constraint violation.
> >>>>>>>>
> >>>>>>>> I am absolutely more than happy to be proved wrong. If you do
> >> have a
> >>>>>> real
> >>>>>>>> world scenario where the deduplication logic is actually correct
> >> and
> >>>>>>>> expected please, please do share. So far I have not seen one, nor
> >>>> was I
> >>>>>>>> able to come up with one. And yet I am not suggesting to remove
> >> the
> >>>>>>>> deduplication logic entirely, users can still use it with ON
> >> CONFLICT
> >>>>>>>> DEDUPLICATE.
> >>>>>>>>
> >>>>>>>> 3. The special watermark generation interval affects the
> >> visibility
> >>>> of
> >>>>>>>>> results. How can users configure this generation interval?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> That's a fair question I'll try to elaborate on in the FLIP. I can
> >>>> see
> >>>>>> two
> >>>>>>>> options:
> >>>>>>>> 1. We piggyback on existing watermarks in the query, if there are
> >> no
> >>>>>>>> watermarks (tables don't have a watermark definition) we fail
> >> during
> >>>>>>>> planning
> >>>>>>>> 2. We add a new parameter option for a specialized generalized
> >>>> watermark
> >>>>>>>>
> >>>>>>>> Let me think some more about that and I'll come back with a more
> >>>>>> concrete
> >>>>>>>> proposal.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> 4. I believe that resolving out-of-order issues and addressing
> >>>> internal
> >>>>>>>>> consistency are two separate problems. As I understand the
> >> current
> >>>>>>>>> solution, it does not really resolve the internal consistency
> >>>> issue.
> >>>>>> We
> >>>>>>>>> could first resolve the out-of-order problem. For most scenarios
> >>>> that
> >>>>>>>>> require real-time response, we can directly output intermediate
> >>>> results
> >>>>>>>>> promptly.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Why doesn't it solve it? It does. Given a pair of UB/UA we won't
> >> emit
> >>>>>> the
> >>>>>>>> temporary state after processing the UB.
> >>>>>>>>
> >>>>>>>> 5. How can we compact data with the same custom watermark? If
> >>>> detailed
> >>>>>>>>> comparisons are necessary, I think we still need to preserve all
> >> key
> >>>>>> data;
> >>>>>>>>> we would just be compressing this data further at time t.
> >>>>>>>>
> >>>>>>>> Yes, we need to preserve all key data, but only between two
> >>>> watermarks.
> >>>>>>>> Assuming frequent watermarks, that's for a very short time.
> >>>>>>>>
> >>>>>>>> 6. If neither this proposed solution nor the rejected solution can
> >>>> resolve
> >>>>>>>>> internal consistency, we need to reconsider the differences
> >> between
> >>>>>> the two
> >>>>>>>>> approaches.
> >>>>>>>>
> >>>>>>>> I'll copy the explanation why the rejected alternative should be
> >>>>>> rejected
> >>>>>>>> from the FLIP:
> >>>>>>>>
> >>>>>>>> The solution can help us to solve the changelog disorder problem,
> >>>> but it
> >>>>>>>> does not help with the *internal consistency* issue. If we want to
> >>>> fix
> >>>>>>>> that as well, we still need the compaction on watermarks. At the
> >> same
> >>>>>> time
> >>>>>>>> it increases the size of all flowing records. Therefore it was
> >>>> rejected
> >>>>>> in
> >>>>>>>> favour of simply compacting all records once on the progression of
> >>>>>>>> watermarks.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Dawid
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> >
>
>
