Hi Alexander,

Yes, Kafka's exactly-once semantics avoid duplicated records in case of
producer retries or failovers. But as I explained above, they can't avoid
intentionally duplicated records. Actually, I would prefer to call them
"upsert records" instead of "duplicates"; that's why the connector is named
"upsert-kafka": it makes Kafka work like a database that supports updating
and deleting by key.
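As a rough sketch, a sink table for the kind of aggregation query below could
be declared like this (the table name, topic, and broker address are made up
for illustration; the WITH options are the documented upsert-kafka ones):

  CREATE TABLE page_views_per_url (
    url STRING,
    page_views BIGINT,
    PRIMARY KEY (url) NOT ENFORCED  -- becomes the Kafka message key; later records upsert earlier ones
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'page_views_per_url',                     -- hypothetical topic name
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker address
    'key.format' = 'json',
    'value.format' = 'json'
  );

On the read path, a record with a null value is interpreted as a DELETE for
that key, which is what makes the topic behave like a keyed table.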
For example, there is a SQL query:

  SELECT URL, COUNT(*) page_views FROM access_logs GROUP BY URL;

This is a continuous query[1] that continuously emits a new <url, page_views>
record once a new URL access entry is received. Entries for the same URL may
be far apart in the log and be processed in different checkpoints.

It would be easy to make upsert-kafka support an exactly-once delivery
guarantee, but as we discussed above, it is unnecessary, and we intend to
expose as few configuration options to users as possible.

Best,
Jark

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/

On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hi Jark,
>
> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> idempotent producers prevent duplicated records[1], at least in the cases
> when upstream does not produce them intentionally and across checkpoints.
>
> Could you please elaborate or point me to the docs that explain the
> reason for duplicated records upstream and across checkpoints? I am
> relatively new to Flink and not aware of it. According to the kafka
> connector documentation, it does support exactly-once semantics by
> configuring 'sink.delivery-guarantee'='exactly-once'[2]. It is not clear
> to me why we can't make upsert-kafka configurable in the same way to
> support this delivery guarantee.
>
> Thank you,
> Alexander
>
> 1. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> 2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
>
> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Alexander,
> >
> > I'm not sure I fully understand the reasons. I left my comments inline.
> >
> > > 1. There might be other non-Flink topic consumers that would rather
> > > not have duplicated records.
> >
> > Exactly-once can't avoid producing duplicated records, because the
> > upstream produces duplicated records intentionally and across
> > checkpoints. Exactly-once can't recognize duplicated records and drop
> > the duplicates. That means duplicated records are written into topics
> > even if exactly-once mode is enabled.
> >
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > > previous values.
> >
> > Sorry, I don't understand how exactly-once can prevent this rollback
> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go to
> > a5, then b5, then back to a5 if the jobs perform checkpoints after
> > producing the records.
> >
> > Best,
> > Jark
> >
> > On Apr 5, 2023, at 09:39, Alexander Sorokoumov
> > <asorokou...@confluent.io.INVALID> wrote:
> > >
> > > Hello Flink community,
> > >
> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> > > semantics to the upsert-kafka connector. According to the
> > > upsert-kafka docs[1] and the ReducingUpsertSink javadoc[2], the
> > > connector is correct even with duplicate records under AT_LEAST_ONCE
> > > because the records are idempotent, and the read path de-duplicates
> > > them. However, there are at least 2 reasons to configure the
> > > connector with EXACTLY_ONCE:
> > >
> > > 1. There might be other non-Flink topic consumers that would rather
> > > not have duplicated records.
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > > previous values.
> > > Consider a scenario where 2 producing jobs A and B write to the same
> > > topic with AT_LEAST_ONCE, and a consuming job reads from the topic.
> > > Both producers write unique, monotonically increasing sequences to
> > > the same key. Job A writes x=a1,a2,a3,a4,a5,... Job B writes
> > > x=b1,b2,b3,b4,b5,... With this setup, we can have the following
> > > sequence:
> > >
> > > 1. Job A produces x=a5.
> > > 2. Job B produces x=b5.
> > > 3. Job A produces the duplicate write x=a5.
> > >
> > > The consuming job would observe x going to a5, then to b5, then back
> > > to a5. EXACTLY_ONCE would prevent this behavior.
> > >
> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> > > patch to add EXACTLY_ONCE to upsert-kafka, but would like to know
> > > what the community thinks about it before moving forward with it.
> > >
> > > Thanks,
> > > Alexander
> > >
> > > 1. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > > 2. https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
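For reference, the per-table option Alexander mentions above on the regular
kafka connector is set roughly like this (table, topic, and prefix names are
made up for illustration; 'sink.transactional-id-prefix' is the documented
companion option required when exactly-once is enabled):

  CREATE TABLE access_log_sink (
    url STRING,
    ts TIMESTAMP(3)
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'access-logs',                            -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical brokers
    'format' = 'json',
    'sink.delivery-guarantee' = 'exactly-once',         -- the option under discussion
    'sink.transactional-id-prefix' = 'access-log-sink'  -- keeps transactions of different jobs from clashing
  );

The question in this thread is whether upsert-kafka should expose the same
pair of options.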