Hi Banu,

This behavior of the source is expected. The RMQSource guarantee is exactly-once, which is achieved by acknowledging envelopes on checkpoints; hence the source will never re-read a message after a checkpoint, even if that message was still inside the pipeline and had not yet reached the sink. Acknowledging eagerly would risk data loss on a failure and restore from a previous checkpoint, breaking all delivery guarantees. Conceptually, there is no guarantee that a Flink pipeline achieves end-to-end exactly-once without an exactly-once sink as well (which is not the case for RMQSink). In your case, reprocessing is bounded by the checkpoint interval, which is 5 minutes; you can make it tighter if that suits your case better.
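For illustration, a minimal sketch of tightening the interval; the 30-second value and the min-pause setting below are assumptions for the example, not recommendations, so size them against your state and load:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TighterCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 30 seconds instead of 5 minutes, so unacked
        // messages are redelivered over a much smaller window after a failure.
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

        // Leave some breathing room so the job is not checkpointing back to back.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000L);

        // ... build the RMQ source -> window -> RMQ sink pipeline here, then:
        // env.execute("rmq-job");
    }
}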
Best Regards
Ahmed Hamdy

On Thu, 18 Jul 2024 at 11:37, banu priya <banuke...@gmail.com> wrote:

> Hi All,
>
> Gentle reminder about the above query.
>
> Thanks
> Banu
>
> On Tue, 9 Jul, 2024, 1:42 pm banu priya, <banuke...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a Flink job with an RMQ source, tumbling windows (firing every
>> 2s), an aggregator, then an RMQ sink. Incremental RocksDB checkpointing
>> is enabled with an interval of 5 minutes.
>>
>> I was trying to understand Flink failure recovery. Checkpoint X has
>> started, and I have sent one event to my source. As windows are triggered
>> every 2s, my sink is updated with the aggregated result. But when I
>> checked the RabbitMQ console, my source queue still had unacked messages.
>> (This is expected and as per design:
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>> )
>>
>> Then I restarted my task manager. Since the restart happened within the
>> same checkpoint interval and checkpoint X had not yet completed, the
>> message was not acknowledged and was redelivered, so duplicate processing
>> of events occurred.
>>
>> How can I avoid these duplicates?
>>
>> Thanks
>>
>> Banu
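For reference, a minimal sketch of the pipeline described above (RMQ source, 2s tumbling windows, aggregation, RMQ sink, incremental RocksDB checkpoints every 5 minutes). Hosts, credentials, queue names, the key selector, and the reduce function are placeholder assumptions:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RmqWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB state backend, checkpoint every 5 minutes.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(300_000L, CheckpointingMode.EXACTLY_ONCE);

        RMQConnectionConfig rmq = new RMQConnectionConfig.Builder()
                .setHost("localhost").setPort(5672)
                .setUserName("guest").setPassword("guest")
                .setVirtualHost("/")
                .build();

        // usesCorrelationId = true: exactly-once additionally requires the
        // publisher to set a unique correlation ID on every message.
        env.addSource(new RMQSource<>(rmq, "input-queue", true,
                        new SimpleStringSchema()))
                // RMQSource only provides exactly-once as a non-parallel source.
                .setParallelism(1)
                .keyBy(value -> value)                 // placeholder key selector
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .reduce((a, b) -> a + "," + b)         // placeholder aggregation
                .addSink(new RMQSink<>(rmq, "output-queue",
                        new SimpleStringSchema()));

        env.execute("rmq-window-job");
    }
}

Even with this setup, the sink side remains at-least-once, which is why the duplicates observed above can occur on restore.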