> But I’d like to add a clarification. Take the “Changelog Disorder”
example described in the FLIP (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder).
Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
ERROR, IIUC, the expected behavior is that Flink should fail. However,
those inserts and updates actually all come from the same PK on table s1
(id = 1). From a relational-algebra perspective, this does not violate the
PK constraint; it only happens because we shuffle by level and end up with
out-of-order issues under multiple parallelisms. In other words, if we run
this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR
should not fail. If the streaming job fails, users will be confused. For
disorder issues introduced by Flink internally, I believe Flink should
handle them internally.
Let's clarify this point first, because I think it's vital for understanding
the proposal, and we must be on the same page here before we talk about the
other points.
No, the example you pointed out would not throw an error under `ON CONFLICT
ERROR`. As you pointed out yourself, those changes come from the same PK on
table s1, therefore you will not have two active records with (id = 1) in
the sink at the watermark boundary. The watermark is an internal consistency
boundary: you will always have a UB for the old value and a UA for the new
value, so you will only ever have a single value after the compaction. We
would throw only if we tried to upsert into a single row in the sink from
two rows of s1 with different ids, e.g. {source_id=1, sink_id=1},
{source_id=2, sink_id=1}.
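To make this concrete, here is a minimal sketch in the spirit of the example
from my earlier mail quoted below (the table and column names are
illustrative, and the behaviour described in the comments is the one proposed
in the FLIP, so treat it as a sketch rather than final syntax):

CREATE TABLE source (
  id bigint PRIMARY KEY NOT ENFORCED,
  name string,
  val string
);

CREATE TABLE sink (
  name string PRIMARY KEY NOT ENFORCED,
  val string
);

INSERT INTO sink SELECT name, val FROM source;

-- Case A (no error): all changes belong to the same source PK (id = 1), e.g.
--   +I(1, 'Apple', 'ABC')  then  -U(1, 'Apple', 'ABC') +U(1, 'Apple', 'DEF')
-- After compaction on the watermark boundary there is a single active record
-- for the sink PK 'Apple', so the default ON CONFLICT ERROR does not fire.
--
-- Case B (error): two different source PKs map to the same sink PK, e.g.
--   +I(1, 'Apple', 'ABC')  and  +I(2, 'Apple', 'DEF')
-- Two active records compete for the sink PK 'Apple'; that is a genuine
-- constraint violation and ON CONFLICT ERROR throws.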
> Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we
need a deterministic boundary to determine that upstream data will no
longer be updated, so that we can output the “final” result.
No, that's not what I understand as internal consistency. Internal
consistency means that a single change in the source produces a single
change in the sink, without incorrect intermediate states caused by
processing a UB separately from its UA. I don't want to process multiple
source changes in a single batch. What you are suggesting I'd rather call
"external" consistency or snapshot processing, and in my mind it is a topic
orthogonal to what I am trying to solve here.
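To illustrate with the source/sink tables from the sketch above (the
changelog encoding is only illustrative):

=== A single change in the source, for id = 1
(1, "Apple", "ABC")  ->  (1, "Apple", "DEF")

=== What arrives at the sink for the PK "Apple"
-U("Apple", "ABC")
+U("Apple", "DEF")

If the UB were processed and emitted on its own, the sink could briefly
observe an intermediate state ("Apple" retracted) that never existed in the
source. Compacting the pair on the watermark boundary emits a single change
for "Apple" (ABC -> DEF): one source change, one sink change. That is all I
mean by internal consistency here.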
On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> Hi, Dawid. Thank you for your detailed explanation and the update. Let me
> share my thoughts.
> 1. In fact, I agree with what you said: for clearly problematic queries,
> we should fail fast — for example, the case you mentioned (writing data
> from a source table whose PK is id into a sink table whose PK is name). We
> can fail like a traditional database: PK conflict. That’s totally fine.
> 2. But I’d like to add a clarification. Take the “Changelog Disorder”
> example described in the FLIP (
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder).
> Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
> ERROR, IIUC, the expected behavior is that Flink should fail. However,
> those inserts and updates actually all come from the same PK on table s1
> (id = 1). From a relational-algebra perspective, this does not violate the
> PK constraint; it only happens because we shuffle by level and end up with
> out-of-order issues under multiple parallelisms. In other words, if we run
> this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR
> should not fail. If the streaming job fails, users will be confused. For
> disorder issues introduced by Flink internally, I believe Flink should
> handle them internally.
> 4. Before I talk about 3, let me talk about 4 first. If I’m not mistaken,
> we need a deterministic boundary to determine that upstream data will no
> longer be updated, so that we can output the “final” result. I think our
> disagreement is about where that “data boundary” is. In this FLIP, the
> boundary is described as: 1) watermark or 2) checkpoint. But I think such a
> boundary is tied to the table, for example, “the creation of a specific
> historical snapshot version of a table.” Since data in the snapshot is
> immutable, we can output results at that point. What do you think?
> 3. If we must choose one of the introduced options, I lean toward Option
> 1, because we already have a clear definition for watermarks defined on a
> table: “allowing for consistent results despite out-of-order or late
> events” (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time).
> This “drop late events” semantic does not exist in checkpoint. However, my
> concern is that in most scenarios, a CDC source may produce multiple
> updates for the same PK over a long time span, so the watermark should be
> defined very large, which will cause the job to produce no output for a
> long time.
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
> >Hey Gustavo, Xuyang
> >I tried incorporating your suggestions into the FLIP. Please take another
> >look.
> >Best,
> >Dawid
> >
> >On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]>
> >wrote:
> >
> >>> 1. The default behavior changes if no ON CONFLICT is defined. I am a
> >>> little concerned that this may cause errors in a large number of
> >>> existing cases.
> >>
> >> I can be convinced to leave the default behaviour as it is now. I am
> >> worried, though, that the current behaviour of SUM is very rarely what
> >> people actually want. As mentioned in the FLIP, I wholeheartedly believe
> >> there are very few, if any, real-world scenarios where you need the
> >> deduplicate behaviour. I try to elaborate a bit more in 2).
> >>
> >>> 2. Regarding On Conflict Errors, in the context of CDC streams, it is
> >>> expected that the vast majority of cases cannot generate only one
> >>> record with one primary key. The only solutions I can think of are
> >>> append-only top1, deduplication, or aggregating the first row.
> >>
> >> I disagree with that statement. I don't think CDC streams change anything
> >> in that regard. Maybe there is some misunderstanding about what one
> >> record means in this context.
> >>
> >> I agree that almost certainly there will be a sequence of UA, UB for a
> >> single sink's primary key.
> >>
> >> My claim is that users almost never want a situation where they have more
> >> than one "active" upsert key/record for one sink's primary key. I tried to
> >> explain that in the FLIP, but let me try to give one more example here.
> >>
> >> Imagine two tables:
> >> CREATE TABLE source (
> >> id bigint PRIMARY KEY NOT ENFORCED,
> >> name string,
> >> value string
> >> )
> >>
> >> CREATE TABLE sink (
> >> name string PRIMARY KEY NOT ENFORCED,
> >> value string
> >> )
> >>
> >> INSERT INTO sink SELECT name, value FROM source;
> >>
> >> === Input
> >> (1, "Apple", "ABC")
> >> (2, "Apple", "DEF")
> >>
> >> In the scenario above a SUM is inserted, which will deduplicate the rows
> >> and override the value for "Apple" with "DEF". In my opinion this is
> >> entirely wrong; instead, an exception should be thrown, because there is
> >> actually a constraint violation.
> >>
> >> I am absolutely more than happy to be proved wrong. If you do have a
> real
> >> world scenario where the deduplication logic is actually correct and
> >> expected please, please do share. So far I have not seen one, nor was I
> >> able to come up with one. And yet I am not suggesting to remove the
> >> deduplication logic entirely, users can still use it with ON CONFLICT
> >> DEDUPLICATE.
> >>
> >>> 3. The special watermark generation interval affects the visibility of
> >>> results. How can users configure this generation interval?
> >>
> >>
> >> That's a fair question; I'll try to elaborate on it in the FLIP. I can
> >> see two options:
> >> 1. We piggyback on existing watermarks in the query; if there are no
> >> watermarks (the tables don't have a watermark definition), we fail during
> >> planning.
> >> 2. We add a new option for a specialized generalized watermark.
> >>
> >> Let me think some more on that and I'll come back with a more concrete
> >> proposal.
> >>
> >>
> >>> 4. I believe that resolving out-of-order issues and addressing internal
> >>> consistency are two separate problems. As I understand the current
> >>> solution, it does not really resolve the internal consistency issue. We
> >>> could first resolve the out-of-order problem. For most scenarios that
> >>> require real-time response, we can directly output intermediate results
> >>> promptly.
> >>
> >>
> >> Why doesn't it solve it? It does. Given a pair of UB/UA, we won't emit the
> >> temporary state after processing the UB.
> >>
> >>> 5. How can we compact data with the same custom watermark? If detailed
> >>> comparisons are necessary, I think we still need to preserve all key
> >>> data; we would just be compressing this data further at time t.
> >>
> >> Yes, we need to preserve all key data, but only between two watermarks.
> >> Assuming frequent watermarks, that's for a very short time.
> >>
> >>> 6. If neither this proposed solution nor the rejected solution can
> >>> resolve internal consistency, we need to reconsider the differences
> >>> between the two approaches.
> >>
> >> I'll copy the explanation of why the rejected alternative should be
> >> rejected from the FLIP:
> >>
> >> The solution can help us solve the changelog disorder problem, but it
> >> does not help with the *internal consistency* issue. If we want to fix
> >> that as well, we still need the compaction on watermarks. At the same
> >> time it increases the size of all flowing records. Therefore it was
> >> rejected in favour of simply compacting all records once on the
> >> progression of watermarks.
> >>
> >> Best,
> >> Dawid
> >>
>