Thank you for the explanation. I think I understand what you mean now. I have a
few questions I’d like to confirm:
1. Without an upsert key, how do we determine the order of multiple records
within the same watermark boundary when non-deterministic functions are
involved? For example, if we receive data like the “disorder (2)” case below,
and the upsert key is lost after a join, what will the final output be
(without internal consistency issues)?
+U(id=1, level=20, attr='b1', rand='1.5')
+U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly modified it
-U(id=1, level=10, attr='a1', rand='2')
> The watermark is an internal consistency boundary. You will always have a UB
> for the old value and a UA for the new value.
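To make sure I read the compaction semantics right, here is a minimal sketch of how I picture the per-boundary compaction (all names and the change-tuple shape are hypothetical, not the FLIP's actual operator; it counts adds/retracts per upsert key and errors only when two different upsert keys stay active for one sink PK):

```python
from collections import defaultdict

def compact_on_watermark(changes):
    """Sketch of compaction at a watermark boundary (hypothetical model).

    Each change is (kind, sink_pk, upsert_key, row), where '+I'/'+U' add
    and '-U'/'-D' retract. At the boundary, at most one upsert key may
    remain active per sink PK; otherwise ON CONFLICT ERROR would fire.
    """
    counts = defaultdict(lambda: defaultdict(int))  # sink_pk -> upsert_key -> net count
    latest = {}                                     # (sink_pk, upsert_key) -> last added row
    for kind, pk, uk, row in changes:
        if kind in ('+I', '+U'):
            counts[pk][uk] += 1
            latest[(pk, uk)] = row
        else:
            counts[pk][uk] -= 1
    result = {}
    for pk, uks in counts.items():
        active = [uk for uk, c in uks.items() if c > 0]
        if len(active) > 1:
            raise ValueError(f"ON CONFLICT ERROR: sink PK {pk} has rows from {active}")
        if active:
            result[pk] = latest[(pk, active[0])]
    return result
```

Under this model, the three records above net out to a single active row for id=1 (matching "a single value after the compaction"), while two rows with different source ids targeting one sink PK raise an error. Which row image survives when `rand` differs non-deterministically is exactly what I am asking in question 1.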
2. To address internal consistency issues caused by emitting -U, we need each
watermark boundary to contain paired -U/+U. That means we have to track what
the source sends, and only emit the "watermark boundary" after the +U has been
sent, right? For an upsert source, this is easy because we have
ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may
need to introduce a new stateful operator. This seems unavoidable unless the
source outputs -U and +U together.
3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval":
it isn’t really a “watermark” interval, because it doesn’t carry watermark
semantics such as dropping late data. It’s actually a compaction barrier,
right?
4. The state in SUM is used to handle rollback under out-of-order scenarios.
Since we resolve out-of-orderness within a watermark boundary, does that mean
we don’t need state anymore? Put more precisely: we would only need a
"temporary" state that lives within each watermark boundary. (Or perhaps you
are keeping this persistent state to support the subsequent `ON CONFLICT
conflict_action`.)
--
Best!
Xuyang
On 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
>> But I’d like to add a clarification. Take the “Changelog Disorder”
>example described in the FLIP (
>https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder).
>Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
>ERROR, IIUC, the expected behavior is that Flink should fail. However,
>those inserts and updates actually all come from the same PK on table s1
>(id = 1). From a relational-algebra perspective, this does not violate the
>PK constraint; it only happens because we shuffle by level and end up with
>out-of-order issues when running with multiple parallel subtasks. In other
>words, if we run this SQL in batch, the PK conflict will not happen and ON
>CONFLICT ERROR should not fail. If the streaming job fails, users will be
>confused. For
>disorder issues introduced by Flink internally, I believe Flink should
>handle them internally.
>
>Let's first clarify this point, because I think it's vital for the
>understanding of the proposal so we must be on the same page before we talk
>about other points.
>
>No, the example you pointed out would not throw an error in `ON CONFLICT
>ERROR`. As you pointed out yourself, those records come from the same PK on
>table s1; therefore, you will not have two active records with (id = 1) in
>the sink at the watermark boundary. The watermark is an internal consistency
>boundary. You will always have a UB for the old value and a UA for the new
>value. Therefore you will only ever have a single value after the
>compaction. We would throw only if we tried to upsert into a single row in
>the sink from two rows of s1 with different ids, e.g. {source_id=1,
>sink_id=1}, {source_id=2, sink_id=1}.
>
>> Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we
>need a deterministic boundary to determine that upstream data will no
>longer be updated, so that we can output the “final” result.
>
>No, that's not what I understand as internal consistency. Internal
>consistency means that a single change in the source produces a single
>change in the sink, without incorrect intermediate states caused by
>processing the UB separately from the UA. I don't want to process multiple
>source changes in a single batch. I'd rather call that "external"
>consistency or snapshot processing, as you are suggesting, but in my mind
>this is an orthogonal topic from what I am trying to solve here.
>
>On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
>
>> Hi, Dawid. Thank you for your detailed explanation and the update. Let me
>> share my thoughts.
>> 1. In fact, I agree with what you said: for clearly problematic queries,
>> we should fail fast — for example, the case you mentioned (writing data
>> from a source table whose PK is id into a sink table whose PK is name). We
>> can fail like a traditional database: PK conflict. That’s totally fine.
>> 2. But I’d like to add a clarification. Take the “Changelog Disorder”
>> example described in the FLIP (
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder).
>> Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
>> ERROR, IIUC, the expected behavior is that Flink should fail. However,
>> those inserts and updates actually all come from the same PK on table s1
>> (id = 1). From a relational-algebra perspective, this does not violate the
>> PK constraint; it only happens because we shuffle by level and end up with
>> out-of-order issues when running with multiple parallel subtasks. In other
>> words, if we run this SQL in batch, the PK conflict will not happen and ON
>> CONFLICT ERROR should not fail. If the streaming job fails, users will be
>> confused. For
>> disorder issues introduced by Flink internally, I believe Flink should
>> handle them internally.
>> 4. Before I talk about 3, let me talk about 4 first. If I’m not mistaken,
>> we need a deterministic boundary to determine that upstream data will no
>> longer be updated, so that we can output the “final” result. I think our
>> disagreement is about where that “data boundary” is. In this FLIP, the
>> boundary is described as: 1) watermark or 2) checkpoint. But I think such a
>> boundary is tied to the table, for example, “the creation of a specific
>> historical snapshot version of a table.” Since data in the snapshot is
>> immutable, we can output results at that point. What do you think?
>> 3. If we must choose one of the introduced options, I lean toward Option
>> 1, because we already have a clear definition for watermarks defined on a
>> table: “allowing for consistent results despite out-of-order or late
>> events” (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time).
>> This “drop late events” semantic does not exist for checkpoints. However,
>> my concern is that in most scenarios a CDC source may produce multiple
>> updates for the same PK over a long time span, so the watermark delay would
>> have to be very large, which would cause the job to produce no output for a
>> long time.
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
>> >Hey Gustavo, Xuyang
>> >I tried incorporating your suggestions into the FLIP. Please take another
>> >look.
>> >Best,
>> >Dawid
>> >
>> >On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]>
>> >wrote:
>> >
>> >>> 1. The default behavior changes if no ON CONFLICT is defined. I am a
>> >>> little concerned that this may cause errors in a large number of
>> >>> existing cases.
>> >>
>> >> I can be convinced to leave the default behaviour as it is now. I am
>> >> worried, though, that the current behaviour of SUM is very rarely what
>> >> people actually want. As mentioned in the FLIP, I wholeheartedly believe
>> >> there are very few, if any, real-world scenarios where you need the
>> >> deduplicate behaviour. I try to elaborate a bit more in 2).
>> >>
>> >>> 2. Regarding ON CONFLICT ERROR, in the context of CDC streams, it is
>> >>> expected that in the vast majority of cases we cannot generate only one
>> >>> record per primary key. The only solutions I can think of are append-only
>> >>> top1, deduplication, or aggregating the first row.
>> >>
>> >> I disagree with that statement. I don't think CDC streams change
>> >> anything in that regard. Maybe there is some misunderstanding about what
>> >> "one record" means in this context.
>> >>
>> >> I agree there will almost certainly be a sequence of UA, UB for a single
>> >> sink's primary key.
>> >>
>> >> My claim is that users almost never want a situation where they have
>> >> more than one "active" upsert key/record for one sink's primary key. I
>> >> tried to explain that in the FLIP, but let me try to give one more
>> >> example here.
>> >>
>> >> Imagine two tables:
>> >> CREATE TABLE source (
>> >> id bigint PRIMARY KEY,
>> >> name string,
>> >> value string
>> >> )
>> >>
>> >> CREATE TABLE sink (
>> >> name string PRIMARY KEY,
>> >> value string
>> >> )
>> >>
>> >> INSERT INTO sink SELECT name, value FROM source;
>> >>
>> >> === Input
>> >> (1, "Apple", "ABC")
>> >> (2, "Apple", "DEF")
>> >>
>> >> In the scenario above a SUM is inserted, which will deduplicate the rows
>> >> and override the value for "Apple" with "DEF". In my opinion that is
>> >> entirely wrong; instead, an exception should be thrown, because there is
>> >> actually a constraint violation.
>> >>
>> >> I am absolutely more than happy to be proved wrong. If you do have a
>> >> real-world scenario where the deduplication logic is actually correct and
>> >> expected, please do share it. So far I have not seen one, nor was I able
>> >> to come up with one. And yet I am not suggesting to remove the
>> >> deduplication logic entirely; users can still use it with ON CONFLICT
>> >> DEDUPLICATE.
>> >>
>> >>> 3. The special watermark generation interval affects the visibility of
>> >>> results. How can users configure this generation interval?
>> >>
>> >>
>> >> That's a fair question; I'll try to elaborate on it in the FLIP. I can
>> >> see two options:
>> >> 1. We piggyback on existing watermarks in the query; if there are no
>> >> watermarks (the tables don't have a watermark definition), we fail during
>> >> planning.
>> >> 2. We add a new parameter option for a specialized generalized watermark.
>> >>
>> >> Let me think on that some more and I'll come back with a more concrete
>> >> proposal.
>> >>
>> >>
>> >>> 4. I believe that resolving out-of-order issues and addressing internal
>> >>> consistency are two separate problems. As I understand the current
>> >>> solution, it does not really resolve the internal consistency issue. We
>> >>> could first resolve the out-of-order problem. For most scenarios that
>> >>> require real-time response, we can directly output intermediate results
>> >>> promptly.
>> >>
>> >>
>> >> Why doesn't it solve it? It does. Given a pair of UB/UA we won't emit
>> >> the temporary state after processing the UB.
>> >>
>> >>> 5. How can we compact data with the same custom watermark? If detailed
>> >>> comparisons are necessary, I think we still need to preserve all key
>> >>> data; we would just be compressing this data further at time t.
>> >>
>> >> Yes, we need to preserve all key data, but only between two watermarks.
>> >> Assuming frequent watermarks, that's for a very short time.
>> >>
>> >>> 6. If neither this proposed solution nor the rejected solution can
>> >>> resolve internal consistency, we need to reconsider the differences
>> >>> between the two approaches.
>> >>
>> >> I'll copy the explanation of why the rejected alternative should be
>> >> rejected from the FLIP:
>> >>
>> >> The solution can help us to solve the changelog disorder problem, but it
>> >> does not help with the *internal consistency* issue. If we want to fix
>> >> that as well, we still need the compaction on watermarks. At the same
>> >> time it increases the size of all flowing records. Therefore it was
>> >> rejected in favour of simply compacting all records once on the
>> >> progression of watermarks.
>> >>
>> >> Best,
>> >> Dawid
>> >>
>>