Hi Jark, To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent producers prevent duplicated records[1], at least in the cases when upstream does not produce them intentionally and across checkpoints.
Could you please elaborate or point me to the docs that explain the reason for duplicated records upstream and across checkpoints? I am relatively new to Flink and not aware of it. According to the kafka connector documentation, it does support exactly once semantics by configuring ' sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we can't make upsert-kafka configurable in the same way to support this delivery guarantee. Thank you, Alexander 1. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/ 2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote: > Hi Alexander, > > I’m not sure I fully understand the reasons. I left my comments inline. > > > 1. There might be other non-Flink topic consumers that would rather not > have duplicated records. > > Exactly once can’t avoid producing duplicated records. Because the upstream > produces duplicated records intentionally and across checkpoints. Exactly > once > can’t recognize duplicated records and drop duplications. That means > duplicated > records are written into topics even if exactly-once mode is enabled. > > > > 2. Multiple upsert-kafka producers might cause keys to roll back to > previous values. > > Sorry, I don’t understand how exactly once can prevent this rollback > behavior. > Even in your example with EXACTLY_ONCE enabled, the x will go to a5, and > b5, > then back a5 if jobs perform checkpoints after producing records. > > > Best, > Jark > > > > 2023年4月5日 09:39,Alexander Sorokoumov <asorokou...@confluent.io.INVALID> > 写道: > > > > Hello Flink community, > > > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery > > semantics to upsert-kafka connector. According to upsert-kafka docs[1] > and > > ReducingUpsertSink javadoc[2], the connector is correct even with > duplicate > > records under AT_LEAST_ONCE because the records are idempotent, and the > > read path de-duplicates them. However, there are at least 2 reasons to > > configure the connector with EXACTLY_ONCE: > > > > 1. There might be other non-Flink topic consumers that would rather not > > have duplicated records. > > 2. Multiple upsert-kafka producers might cause keys to roll back to > > previous values. Consider a scenario where 2 producing jobs A and B write > > to the same topic with AT_LEAST_ONCE, and a consuming job reads from the > > topic. Both producers write unique, monotonically increasing sequences to > > the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes > > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following > sequence: > > > > 1. Job A produces x=a5. > > 2. Job B produces x=b5. > > 3. Job A produces the duplicate write x=5. > > > > The consuming job would observe x going to a5, then to b5, then back a5. > > EXACTLY_ONCE would prevent this behavior. > > > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP > patch > > to add EXACTLY_ONCE to upsert-kafka, but would like to know what the > > community thinks about it before moving forward with it. > > > > Thanks, > > Alexander > > > > 1. > > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees > > 2. > > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37 > >