Thank you for the constructive discussions! I'll start a vote in a separate thread.
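For readers skimming the quoted thread below, the conflict-handling clause under discussion could look roughly as follows. This is an illustrative sketch only: the exact keywords and grammar, and where the clause attaches, are defined in FLIP-558 and may differ; the table names are borrowed from the example later in the thread.

```sql
-- Sketch of the three conflict-handling modes discussed below
-- (FLIP-558 proposal; final syntax may differ).

-- Fail the job if two "active" records map to the same sink primary key:
INSERT INTO sink SELECT name, `value` FROM source ON CONFLICT ERROR;

-- Keep only the first value written for each primary key:
INSERT INTO sink SELECT name, `value` FROM source ON CONFLICT NOTHING;

-- Today's behaviour: later writes overwrite earlier ones:
INSERT INTO sink SELECT name, `value` FROM source ON CONFLICT DEDUPLICATE;
```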
On Mon, 12 Jan 2026 at 13:34, Timo Walther <[email protected]> wrote:
> Thanks for the clarification, Dawid.
>
> +1 for voting.
>
> Cheers,
> Timo
>
> On 12.01.26 03:09, Xuyang wrote:
> > Thank you for the explanation. Big +1 for this.
> >
> > --
> >
> > Best!
> > Xuyang
> >
> > At 2026-01-09 19:16:52, "Dawid Wysakowicz" <[email protected]> wrote:
> >> @Timo
> >>
> >>> 1) Will we support ON CONFLICT syntax also for append-only inputs?
> >>
> >> That's a very good point! I think we should. In my opinion we should support it so that:
> >> 1. ERROR -> checks that we don't insert multiple times into the same PRIMARY KEY
> >> 2. NOTHING -> emits only the first value that's inserted
> >> 3. DEDUPLICATE -> the current behaviour; we would always overwrite the value. We don't have a disorder problem here, though, thus we don't need the SUM operator.
> >>
> >>> 2) Regarding naming, I find the config option is too long.
> >>> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"
> >>> could we simplify it to something less internal? Maybe
> >>> table.exec.sink.upserts.compaction-interval?
> >>
> >> Fine by me. I'll update the FLIP to:
> >> "table.exec.sink.upserts.compaction-interval"
> >>
> >> @David
> >> Thank you for the kind words! I'll think about turning it into a blog post.
> >>
> >> If there are no further objections/comments, I'd like to start a vote on this some time next week.
> >>
> >> On Thu, 8 Jan 2026 at 13:48, Timo Walther <[email protected]> wrote:
> >>
> >>> Hi Dawid,
> >>>
> >>> thank you very much for working on and proposing this FLIP. This is an excellent design document that shows the deep research you have conducted. Both the linked resources and the examples are very helpful for getting the big picture.
> >>>
> >>> Sink upsert materializer is a long-standing problem for Flink SQL pipelines. Only a few people really understand why it exists and why it is so expensive.
> >>> I also know that some users have simply disabled it because they are fine with the output results (potentially ignoring corner cases intentionally or by accident).
> >>>
> >>> In general the FLIP is in very good shape, and you definitely get my +1 on this. Just some last questions:
> >>>
> >>> 1) Will we support ON CONFLICT syntax also for append-only inputs?
> >>>
> >>> 2) Regarding naming, I find the config option is too long. "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval" - could we simplify it to something less internal? Maybe table.exec.sink.upserts.compaction-interval?
> >>>
> >>> Cheers,
> >>> Timo
> >>>
> >>> On 08.01.26 12:42, Dawid Wysakowicz wrote:
> >>>>> Today there are still many retract sources, such as some sources in the Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some formats, etc. These can be further divided into two categories.
> >>>>> One is like Debezium: there is only a single UPDATE record in the physical storage, and the corresponding Flink source connector further splits it into UA/UB. The other is where UA and UB are already two separate changelog records in the physical storage.
> >>>>> For the former, we could generate a watermark boundary before the source, just like a checkpoint barrier, so that UB and UA are guaranteed to fall within the same boundary. This should actually be supportable. It's okay if we don't support it in the first version, but it may affect the overall design; for example, how to generate the system watermark boundary.
> >>>>> For the latter, it's probably more troublesome. I think it's also fine not to support it. What do you think?
> >>>>
> >>>> When designing the FLIP I considered the Debezium case, and it's actually not so much of a problem, as you correctly pointed out.
> >>>> The only requirement is that the watermark is generated before the message split. I'd start without support for those sources and we can improve on that later on.
> >>>>
> >>>>> 3. Oh, what I meant is how about renaming it to something like "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? Because I think it may not be a "watermark"; it's a compaction barrier, and this compaction can be 1) replaced by a watermark, or 2) replaced by a checkpoint, or 3) generated by the Flink system internally. What do you think?
> >>>>
> >>>> Fine by me. We can call it "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval".
> >>>>
> >>>>> 4. I'm also wondering whether we don't even need the state about "a single result per key for a cross-watermark-boundary handover"?
> >>>>
> >>>> I am pretty sure we do, in order to adhere to the ON CONFLICT ERROR behaviour. Bear in mind two "active" keys may come as part of two separate barriers.
> >>>>
> >>>> On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote:
> >>>>
> >>>>> 1. I agree with it; at least it won't be worse. Currently, for data that contains non-deterministic functions without a UK, SUM cannot properly handle correcting out-of-order records.
> >>>>>
> >>>>> 2. It's fine if we don't support it at first. Let me add some more context. Today there are still many retract sources, such as some sources in the Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some formats, etc. These can be further divided into two categories.
> >>>>> One is like Debezium: there is only a single UPDATE record in the physical storage, and the corresponding Flink source connector further splits it into UA/UB.
> >>>>> The other is where UA and UB are already two separate changelog records in the physical storage.
> >>>>> For the former, we could generate a watermark boundary before the source, just like a checkpoint barrier, so that UB and UA are guaranteed to fall within the same boundary. This should actually be supportable. It's okay if we don't support it in the first version, but it may affect the overall design; for example, how to generate the system watermark boundary.
> >>>>> For the latter, it's probably more troublesome. I think it's also fine not to support it. What do you think?
> >>>>>
> >>>>> 3. Oh, what I meant is how about renaming it to something like "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? Because I think it may not be a "watermark"; it's a compaction barrier, and this compaction can be 1) replaced by a watermark, or 2) replaced by a checkpoint, or 3) generated by the Flink system internally. What do you think?
> >>>>>
> >>>>> 4. I'm also wondering whether we don't even need the state about "a single result per key for a cross-watermark-boundary handover"?
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Best!
> >>>>> Xuyang
> >>>>>
> >>>>> At 2026-01-07 19:54:47, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>>> 1. Without upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the "disorder (2)" case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
> >>>>>>
> >>>>>> If you have non-deterministic functions like in your example, the retraction does not work anyhow.
> >>>>>> For a retraction to work when there is no primary key defined, we require all columns to be deterministic:
> >>>>>> https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142
> >>>>>>
> >>>>>>> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.
> >>>>>>
> >>>>>> Yes, this assumption does not hold for retracting sources. So far we don't have any such sources. I'll introduce a check that would fail for a combination of a retracting source and DO ERROR/DO NOTHING.
> >>>>>>
> >>>>>>> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it's not actually a "watermark interval", because it doesn't have watermark semantics to drop late data. It's actually a compaction barrier, right?
> >>>>>>
> >>>>>> I think I don't fully understand this point. Could you please explain it one more time? Are you suggesting a different name?
> >>>>>>
> >>>>>>> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore? More clearly, we only need a "temporary" state that lives within each watermark boundary.
> >>>>>>> (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)
> >>>>>>
> >>>>>> Yes, I think more or less that is correct. We need the "temporary" state within a boundary + a single result per key for a cross-watermark-boundary handover. I explain that in the FLIP.
> >>>>>>
> >>>>>> Best,
> >>>>>> Dawid
> >>>>>>
> >>>>>> On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:
> >>>>>>
> >>>>>>> Thank you for the explanation. I think I understand what you mean now. I have a few questions I'd like to confirm:
> >>>>>>> 1. Without upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the "disorder (2)" case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
> >>>>>>>
> >>>>>>> +U(id=1, level=20, attr='b1', rand='1.5')
> >>>>>>> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly modified it
> >>>>>>> -U(id=1, level=10, attr='a1', rand='2')
> >>>>>>>
> >>>>>>>> The watermark is an internal consistency boundary. You will always have a UB for the old value and an UA for the new value.
> >>>>>>>
> >>>>>>> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator.
> >>>>>>> This seems unavoidable unless the source outputs -U and +U together.
> >>>>>>>
> >>>>>>> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it's not actually a "watermark interval", because it doesn't have watermark semantics to drop late data. It's actually a compaction barrier, right?
> >>>>>>>
> >>>>>>> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore? More clearly, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>> Best!
> >>>>>>> Xuyang
> >>>>>>>
> >>>>>>> At 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>>>>> But I'd like to add a clarification. Take the "Changelog Disorder" example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let's look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms.
> >>>>>>>>> In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> >>>>>>>>
> >>>>>>>> Let's first clarify this point, because I think it's vital for the understanding of the proposal, so we must be on the same page before we talk about other points.
> >>>>>>>>
> >>>>>>>> No, the example you pointed out would not throw an error with `ON CONFLICT ERROR`. As you pointed out yourself, those come from the same PK on table s1, therefore you will not have two active records with (id = 1) in the sink on the watermark boundary. The watermark is an internal consistency boundary. You will always have a UB for the old value and an UA for the new value. Therefore you will only ever have a single value after the compaction. We would throw only if we tried to upsert into a single row in the sink from two rows from s1 with different ids, e.g. {source_id=1, sink_id=1}, {source_id=2, sink_id=1}.
> >>>>>>>>
> >>>>>>>>> Before I talk about 3, let me talk about 4 first. If I'm not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the "final" result.
> >>>>>>>>
> >>>>>>>> No, that's not what I understand as internal consistency. Internal consistency means that a single change in the source produces a single change in the sink, without incorrect intermediate states caused by processing UB separately from UA.
> >>>>>>>> I don't want to process multiple source changes in a single batch. I'd rather call that "external" consistency or snapshot processing, as you are suggesting, but in my mind this is an orthogonal topic to what I am trying to solve here.
> >>>>>>>>
> >>>>>>>> On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi, Dawid. Thank you for your detailed explanation and the update. Let me share my thoughts.
> >>>>>>>>> 1. In fact, I agree with what you said: for clearly problematic queries, we should fail fast; for example, the case you mentioned (writing data from a source table whose PK is id into a sink table whose PK is name). We can fail like a traditional database: PK conflict. That's totally fine.
> >>>>>>>>> 2. But I'd like to add a clarification. Take the "Changelog Disorder" example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let's look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail.
> >>>>>>>>> If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> >>>>>>>>> 4. Before I talk about 3, let me talk about 4 first. If I'm not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the "final" result. I think our disagreement is about where that "data boundary" is. In this FLIP, the boundary is described as: 1) watermark or 2) checkpoint. But I think such a boundary is tied to the table, for example, "the creation of a specific historical snapshot version of a table." Since data in the snapshot is immutable, we can output results at that point. What do you think?
> >>>>>>>>> 3. If we must choose one of the introduced options, I lean toward Option 1, because we already have a clear definition for watermarks defined on a table: "allowing for consistent results despite out-of-order or late events" (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time). This "drop late events" semantic does not exist for checkpoints. However, my concern is that in most scenarios, a CDC source may produce multiple updates for the same PK over a long time span, so the watermark would have to be defined very large, which will cause the job to produce no output for a long time.
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> Best!
> >>>>>>>>> Xuyang
> >>>>>>>>>
> >>>>>>>>> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
> >>>>>>>>>> Hey Gustavo, Xuyang,
> >>>>>>>>>> I tried incorporating your suggestions into the FLIP. Please take another look.
> >>>>>>>>>> Best,
> >>>>>>>>>> Dawid
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> 1. The default behavior changes if no ON CONFLICT is defined. I am a little concerned that this may cause errors in a large number of existing cases.
> >>>>>>>>>>>
> >>>>>>>>>>> I can be convinced to leave the default behaviour as it is now. I am worried, though, that the current behaviour of SUM is very rarely what people actually want. As mentioned in the FLIP, I wholeheartedly believe there are very few, if any, real-world scenarios where you need the deduplicate behaviour. I try to elaborate a bit more in 2).
> >>>>>>>>>>>
> >>>>>>>>>>>> 2. Regarding ON CONFLICT errors, in the context of CDC streams, it is expected that the vast majority of cases cannot generate only one record with one primary key. The only solutions I can think of are append-only top1, deduplication, or aggregating the first row.
> >>>>>>>>>>>
> >>>>>>>>>>> I disagree with that statement. I don't think CDC streams change anything in that regard. Maybe there is some misunderstanding about what "one record" means in this context.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree almost certainly there will be a sequence of UA, UB for a single sink's primary key.
> >>>>>>>>>>> My claim is that users almost never want a situation where they have more than one "active" upsert key/record for one sink's primary key. I tried to explain that in the FLIP, but let me try to give one more example here.
> >>>>>>>>>>>
> >>>>>>>>>>> Imagine two tables:
> >>>>>>>>>>> CREATE TABLE source (
> >>>>>>>>>>>   id bigint PRIMARY KEY,
> >>>>>>>>>>>   name string,
> >>>>>>>>>>>   value string
> >>>>>>>>>>> )
> >>>>>>>>>>>
> >>>>>>>>>>> CREATE TABLE sink (
> >>>>>>>>>>>   name string PRIMARY KEY,
> >>>>>>>>>>>   value string
> >>>>>>>>>>> )
> >>>>>>>>>>>
> >>>>>>>>>>> INSERT INTO sink SELECT name, value FROM source;
> >>>>>>>>>>>
> >>>>>>>>>>> === Input
> >>>>>>>>>>> (1, "Apple", "ABC")
> >>>>>>>>>>> (2, "Apple", "DEF")
> >>>>>>>>>>>
> >>>>>>>>>>> In the scenario above a SUM is inserted, which will deduplicate the rows and override the value for "Apple" with "DEF". In my opinion this is entirely wrong; instead, an exception should be thrown, because there is actually a constraint violation.
> >>>>>>>>>>>
> >>>>>>>>>>> I am absolutely more than happy to be proved wrong. If you do have a real-world scenario where the deduplication logic is actually correct and expected, please do share it. So far I have not seen one, nor was I able to come up with one. And yet I am not suggesting removing the deduplication logic entirely; users can still use it with ON CONFLICT DEDUPLICATE.
> >>>>>>>>>>>
> >>>>>>>>>>>> 3. The special watermark generation interval affects the visibility of results. How can users configure this generation interval?
> >>>>>>>>>>>
> >>>>>>>>>>> That's a fair question; I'll try to elaborate on it in the FLIP.
> >>>>>>>>>>> I can see two options:
> >>>>>>>>>>> 1. We piggyback on existing watermarks in the query; if there are no watermarks (the tables don't have a watermark definition), we fail during planning.
> >>>>>>>>>>> 2. We add a new config option for a specialized generalized watermark.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me think some more on that and I'll come back with a more concrete proposal.
> >>>>>>>>>>>
> >>>>>>>>>>>> 4. I believe that resolving out-of-order issues and addressing internal consistency are two separate problems. As I understand the current solution, it does not really resolve the internal consistency issue. We could first resolve the out-of-order problem. For most scenarios that require a real-time response, we can directly output intermediate results promptly.
> >>>>>>>>>>>
> >>>>>>>>>>> Why doesn't it solve it? It does. Given a pair of UB/UA, we won't emit the temporary state after processing the UB.
> >>>>>>>>>>>
> >>>>>>>>>>>> 5. How can we compact data with the same custom watermark? If detailed comparisons are necessary, I think we still need to preserve all key data; we would just be compressing this data further at time t.
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, we need to preserve all key data, but only between two watermarks. Assuming frequent watermarks, that's for a very short time.
> >>>>>>>>>>>
> >>>>>>>>>>>> 6. If neither this proposed solution nor the rejected solution can resolve internal consistency, we need to reconsider the differences between the two approaches.
> >>>>>>>>>>> I'll copy the explanation of why the rejected alternative should be rejected from the FLIP:
> >>>>>>>>>>>
> >>>>>>>>>>> The solution can help us solve the changelog disorder problem, but it does not help with the *internal consistency* issue. If we want to fix that as well, we still need the compaction on watermarks. At the same time, it increases the size of all flowing records. Therefore it was rejected in favour of simply compacting all records once on the progression of watermarks.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Dawid
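To summarize the naming discussion in the thread: configuring the agreed-upon option in a SQL client session might look like this (a sketch only; the option name follows the renaming agreed above, and the interval value shown is an arbitrary example, not a proposed default):

```sql
-- Hypothetical session configuration for the renamed compaction interval;
-- '1 s' is an arbitrary example value, not a proposed default.
SET 'table.exec.sink.upserts.compaction-interval' = '1 s';
```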
