Hi,

In exactly-once mode, Flink writes its processing results to Kafka inside a transaction and only commits that transaction once the checkpoint succeeds; otherwise, the transaction is aborted. So re-reading and reprocessing the same records on recovery should not create duplicates in the committed output.
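For reference, here is a minimal sketch of how the exactly-once sink is typically wired up on Flink 1.13.x with FlinkKafkaProducer. The topic names, bootstrap servers and the inline serialization schema are placeholders, not your actual job:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Transactions are only committed on successful checkpoints,
        // so checkpointing must be enabled in EXACTLY_ONCE mode.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        // Should not exceed the broker's transaction.max.timeout.ms
        // (15 minutes by default), otherwise the producer fails on start-up.
        producerProps.setProperty("transaction.timeout.ms", "900000");

        // Placeholder serialization schema: writes the string payload as the record value.
        KafkaSerializationSchema<String> schema =
                (element, timestamp) ->
                        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        FlinkKafkaProducer<String> sink =
                new FlinkKafkaProducer<>(
                        "output-topic",                          // default topic (placeholder)
                        schema,
                        producerProps,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // Stand-in for your actual pipeline's result stream.
        env.fromElements("a", "b", "c").addSink(sink);

        env.execute("exactly-once-sink-sketch");
    }
}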
You're probably seeing duplicates because the downstream consumer is reading uncommitted data from Kafka. Please make sure to set isolation.level=read_committed in the downstream consumer config; a minimal config sketch follows below the quoted message. For production, I'd also recommend tuning Flink and Kafka according to [1].

[1] https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs

Regards,
Roman

On Wed, Jan 12, 2022 at 5:44 PM <patrick.eif...@sony.com> wrote:
>
> Hi,
>
> we are working on a Flink pipeline and running into duplicates in case of
> checkpoint failures.
>
> The pipeline is running on Flink 1.13.2 and uses the source and sink classes
> from the Flink Kafka connector library.
>
> Checkpointing is set to exactly-once, and we care about correctness of the
> data and not so much about throughput.
>
> We observe that upon recovery Flink rereads all records from the offset
> stored in the last successful checkpoint. Thus, the same records are
> replayed as were generated between the last checkpoint and the failure.
>
> How can we achieve an end-to-end exactly-once guarantee in our pipeline
> (Kafka -> Flink -> Kafka), avoid duplicate records, and avoid data loss?
>
> Many thanks in advance!
>
> Patrick
>
> --
> Patrick Eifler
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure
> Sony Interactive Entertainment LLC
>
> Wilhelmstraße 118, 10963 Berlin
> Germany
>
> E: patrick.eif...@sony.com
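As mentioned above, here is a minimal sketch of the downstream consumer configuration; bootstrap servers, group id and topic are placeholders. The key line is isolation.level=read_committed, which hides records from transactions that Flink has not committed yet (with the default read_uncommitted, the consumer also sees records from transactions that are later aborted, and the same records re-emitted after recovery then look like duplicates):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        props.setProperty("group.id", "downstream-consumer");  // placeholder
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        // Only return records from committed transactions.
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}

If the downstream consumer is itself another Flink job using the Kafka source, the same isolation.level property can be passed in that source's consumer properties.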