Hi John,

Thank you for your valuable input. It sounds reasonable to me.
From this point of view, exactly-once is used to guarantee transactional
semantics rather than to avoid duplicates/upserts. This is similar to the
JDBC connector, which already supports eventual consistency with idempotent
updates, yet we still added support for 2PC[1].

Best,
Jark

[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

On Wed, 12 Apr 2023 at 10:36, John Roesler <vvcep...@apache.org> wrote:

> Hi Jark,
>
> I hope you don’t mind if I chime in.
>
> You have a good point that the sequence of upserts will eventually
> converge to the correct value under the at-least-once delivery guarantee,
> but it can still be important to avoid passing on uncommitted results.
> Some thoughts, numbered for reference:
>
> 1. Most generally, if some result R is written to the sink topic, but
> then the job fails before a checkpoint, rolls back, and reprocesses,
> producing R’, then it is incorrect to call R an “upsert”. In fact, as far
> as the system is concerned, R never happened at all (because it was part
> of a rolled-back batch of processing).
>
> 2. Readers may reasonably wish to impose some meaning on the sequence of
> upserts itself, so including aborted results can lead to wrong semantics
> downstream. E.g.: “how many times has ‘x’ been updated today?”
>
> 3. Note that processing may not be deterministic over failures, and,
> building on (2), readers may have an expectation that every record in the
> topic corresponds to a real value that was associated with that key at
> some point. E.g., if we start with x=1, checkpoint, then produce x=99,
> crash, restart, and produce x=2. Under at-least-once, the history of x is
> [1,99,2], while exactly-once would give the correct history of [1,2]. If
> we set up an alert that fires whenever the value of x is greater than 10,
> then at-least-once will erroneously alert us, while exactly-once will not.
>
> 4. Sending results from failed processing can also cause operational
> problems: if you’re processing a high volume of data and you get into a
> crash loop, you can create a flood of repeated results. I’ve seen this
> case cause real-world pain for people, and it’s nice to have a way to
> avoid it.
>
> I hope some of these examples show why a user might reasonably want to
> configure the connector with the exactly-once guarantee.
>
> Thanks!
> -John
>
> On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > Hi Alexander,
> >
> > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> > records in case of producer retries or failovers. But as I explained
> > above, it can’t avoid intentionally duplicated records. Actually, I
> > would like to call them “upsert records” instead of “duplicates”;
> > that’s why the connector is named “upsert-kafka”: to make Kafka work
> > like a database that supports updating and deleting by key.
> >
> > For example, consider this SQL query:
> >
> > SELECT URL, COUNT(*) AS page_views
> > FROM access_logs
> > GROUP BY URL;
> >
> > This is a continuous query[1] that emits a new <url, page_views> record
> > every time a new access entry for a URL is received. The same URLs may
> > be far apart in the log and processed in different checkpoints.
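> >
> > To make the upsert nature concrete, here is a minimal sketch of how
> > such a query is typically wired to an upsert-kafka table (the table,
> > topic, and formats below are illustrative, not from this thread):
> >
> >   CREATE TABLE page_views_per_url (
> >     url STRING,
> >     page_views BIGINT,
> >     PRIMARY KEY (url) NOT ENFORCED
> >   ) WITH (
> >     'connector' = 'upsert-kafka',
> >     'topic' = 'page_views',
> >     'properties.bootstrap.servers' = 'localhost:9092',
> >     'key.format' = 'json',
> >     'value.format' = 'json'
> >   );
> >
> >   -- Each new access_logs row for a URL emits an updated count under
> >   -- the same key: an upsert record, not an accidental duplicate.
> >   INSERT INTO page_views_per_url
> >   SELECT URL, COUNT(*) AS page_views
> >   FROM access_logs
> >   GROUP BY URL;
> >
> > The sink writes updates as plain records keyed by url and deletions as
> > tombstones, which is what lets Kafka behave like an updatable table.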
> >
> > It’s easy to make upsert-kafka support the exactly-once delivery
> > guarantee, but as we discussed above, it’s unnecessary to support it,
> > and we intend to expose as few configurations to users as possible.
> >
> > Best,
> > Jark
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> >
> > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > <asorokou...@confluent.io.invalid> wrote:
> >
> >> Hi Jark,
> >>
> >> To my knowledge, Kafka’s EXACTLY_ONCE transactions together with
> >> idempotent producers prevent duplicated records[1], at least in the
> >> cases when the upstream does not produce them intentionally and across
> >> checkpoints.
> >>
> >> Could you please elaborate, or point me to the docs that explain the
> >> reason for duplicated records upstream and across checkpoints? I am
> >> relatively new to Flink and not aware of it. According to the kafka
> >> connector documentation, it does support exactly-once semantics when
> >> configured with 'sink.delivery-guarantee'='exactly-once'[2]. It is not
> >> clear to me why we can’t make upsert-kafka configurable in the same
> >> way to support this delivery guarantee.
> >>
> >> Thank you,
> >> Alexander
> >>
> >> 1. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >> 2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> >>
> >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> > Hi Alexander,
> >> >
> >> > I’m not sure I fully understand the reasons. I left my comments
> >> > inline.
> >> >
> >> > > 1. There might be other non-Flink topic consumers that would
> >> > > rather not have duplicated records.
> >> >
> >> > Exactly-once can’t avoid producing duplicated records, because the
> >> > upstream produces duplicated records intentionally and across
> >> > checkpoints. Exactly-once can’t recognize such records as duplicates
> >> > and drop them, so duplicated records are written into topics even if
> >> > exactly-once mode is enabled.
> >> >
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> >> > > to previous values.
> >> >
> >> > Sorry, I don’t understand how exactly-once can prevent this rollback
> >> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go
> >> > to a5, then b5, then back to a5 if the jobs perform checkpoints
> >> > after producing the records.
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> > On 5 Apr 2023, at 09:39, Alexander Sorokoumov
> >> > <asorokou...@confluent.io.INVALID> wrote:
> >> > >
> >> > > Hello Flink community,
> >> > >
> >> > > I would like to discuss whether it is worth adding EXACTLY_ONCE
> >> > > delivery semantics to the upsert-kafka connector. According to the
> >> > > upsert-kafka docs[1] and the ReducingUpsertSink javadoc[2], the
> >> > > connector is correct even with duplicate records under
> >> > > AT_LEAST_ONCE, because the records are idempotent and the read
> >> > > path de-duplicates them. However, there are at least two reasons
> >> > > to configure the connector with EXACTLY_ONCE:
> >> > >
> >> > > 1. There might be other non-Flink topic consumers that would
> >> > > rather not have duplicated records.
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> >> > > to previous values. Consider a scenario where two producing jobs,
> >> > > A and B, write to the same topic with AT_LEAST_ONCE, and a
> >> > > consuming job reads from the topic. Both producers write unique,
> >> > > monotonically increasing sequences to the same key.
> >> > > Job A writes x=a1,a2,a3,a4,a5,… and Job B writes
> >> > > x=b1,b2,b3,b4,b5,…. With this setup, we can have the following
> >> > > sequence:
> >> > >
> >> > > 1. Job A produces x=a5.
> >> > > 2. Job B produces x=b5.
> >> > > 3. Job A produces the duplicate write x=a5.
> >> > >
> >> > > The consuming job would observe x going to a5, then to b5, then
> >> > > back to a5. EXACTLY_ONCE would prevent this behavior.
> >> > >
> >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a
> >> > > WIP patch to add EXACTLY_ONCE to upsert-kafka, but I would like to
> >> > > know what the community thinks about it before moving forward.
> >> > >
> >> > > Thanks,
> >> > > Alexander
> >> > >
> >> > > 1. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> >> > > 2. https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
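> >> > >
> >> > > For reference, a minimal sketch of the table configuration the
> >> > > patch aims to enable, mirroring the plain kafka connector’s
> >> > > documented 'sink.delivery-guarantee' option (the upsert-kafka
> >> > > options below are proposed behavior, not something released, and
> >> > > the table and topic names are illustrative):
> >> > >
> >> > >   CREATE TABLE output_table (
> >> > >     k STRING,
> >> > >     v BIGINT,
> >> > >     PRIMARY KEY (k) NOT ENFORCED
> >> > >   ) WITH (
> >> > >     'connector' = 'upsert-kafka',
> >> > >     'topic' = 'output',
> >> > >     'properties.bootstrap.servers' = 'localhost:9092',
> >> > >     'key.format' = 'json',
> >> > >     'value.format' = 'json',
> >> > >     -- proposed: same semantics as the kafka connector’s option
> >> > >     'sink.delivery-guarantee' = 'exactly-once',
> >> > >     -- Kafka transactions require a transactional id; the kafka
> >> > >     -- connector exposes it as 'sink.transactional-id-prefix'
> >> > >     'sink.transactional-id-prefix' = 'upsert-kafka-example'
> >> > >   );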