Hello Flink community,

I would like to discuss whether it is worth adding EXACTLY_ONCE delivery semantics to the upsert-kafka connector. According to the upsert-kafka docs [1] and the ReducingUpsertSink javadoc [2], the connector is correct even with duplicate records under AT_LEAST_ONCE, because the records are idempotent and the read path de-duplicates them. However, there are at least two reasons to configure the connector with EXACTLY_ONCE:

1. There might be other non-Flink consumers of the topic that would rather not see duplicated records.

2. Multiple upsert-kafka producers might cause keys to roll back to previous values. Consider a scenario where two producing jobs, A and B, write to the same topic with AT_LEAST_ONCE, and a consuming job reads from that topic. Both producers write unique, monotonically increasing sequences to the same key: job A writes x=a1,a2,a3,a4,a5,... and job B writes x=b1,b2,b3,b4,b5,.... With this setup, the following sequence is possible:

  1. Job A produces x=a5.
  2. Job B produces x=b5.
  3. Job A produces the duplicate write x=a5 (e.g., a replay after a failure, which AT_LEAST_ONCE permits).

The consuming job would observe x going to a5, then to b5, then back to a5. EXACTLY_ONCE would prevent this rollback.
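To make the proposal concrete, here is a rough sketch of what a table definition could look like if upsert-kafka exposed the same sink options as the regular kafka connector. Note that 'sink.delivery-guarantee' and 'sink.transactional-id-prefix' are assumptions borrowed from the kafka connector; upsert-kafka does not support them today, and the final option names in the patch may differ:

CREATE TABLE orders (
  order_id STRING,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- Hypothetical options, mirroring the regular kafka connector;
  -- not supported by upsert-kafka yet.
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'upsert-orders'
);

For the guarantee to be visible downstream, consumers would also have to read with Kafka's isolation.level set to read_committed; otherwise they would still observe uncommitted writes.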
I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP patch that adds EXACTLY_ONCE support to upsert-kafka, but I would like to hear what the community thinks before moving forward with it.

Thanks,
Alexander

1. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
2. https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37