Hi Banu,
This behavior of the source is expected. The RMQSource guarantee is
exactly-once, which is achieved by acknowledging envelopes on checkpoint
completion; hence the source will never re-read a message after a completed
checkpoint, even if that message is still inside the pipeline and has not
yet reached the sink. Acknowledging eagerly would risk data loss when a
failure forces a restore from a previous checkpoint, hence breaking all
delivery guarantees.
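
For reference, a minimal sketch of the source setup the linked docs
describe: exactly-once on the RMQSource requires checkpointing enabled,
correlation IDs set by the publisher, and a non-parallel source. Host,
credentials, and queue name below are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Envelopes are acknowledged only when a checkpoint completes.
env.enableCheckpointing(300_000, CheckpointingMode.EXACTLY_ONCE); // 5 minutes

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")            // placeholder
        .setPort(5672)
        .setUserName("guest")            // placeholder
        .setPassword("guest")            // placeholder
        .setVirtualHost("/")
        .build();

DataStream<String> stream = env
        .addSource(new RMQSource<String>(
                connectionConfig,
                "source-queue",           // placeholder queue name
                true,                     // usesCorrelationId: required for exactly-once
                new SimpleStringSchema()))
        .setParallelism(1);               // non-parallel source required for exactly-once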
Conceptually, there is no guarantee that a Flink pipeline achieves
end-to-end exactly-once without an exactly-once sink as well (which is not
the case for RMQSink).
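
For completeness, the sink side looks roughly like the sketch below;
RMQSink has no transactional variant, so after restoring from a checkpoint
it may re-publish records it had already written (at-least-once). The
queue name is a placeholder:

import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;

// At-least-once: records emitted between the last completed checkpoint
// and a failure are re-published after a restore, hence possible duplicates.
stream.addSink(new RMQSink<String>(
        connectionConfig,
        "sink-queue",                     // placeholder queue name
        new SimpleStringSchema()));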
In your case, the amount of reprocessing is bounded by the checkpoint
interval, which is 5 minutes; you can tighten it if that suits your case
better, as in the snippet below.
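Assuming the interval is set programmatically, tightening it is a one-line
change; 30 seconds here is only an illustrative value, and the right number
depends on your state size and checkpoint duration:

// Smaller interval => less to replay after a failure,
// at the cost of more frequent checkpoints.
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE); // 30s instead of 5 min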

Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 11:37, banu priya <banuke...@gmail.com> wrote:

> Hi All,
>
> Gentle reminder about the below query.
>
> Thanks
> Banu
>
> On Tue, 9 Jul, 2024, 1:42 pm banu priya, <banuke...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a Flink job with an RMQ source, tumbling windows (firing every
>> 2s), an aggregator, and then an RMQ sink. Incremental RocksDB
>> checkpointing is enabled with an interval of 5 minutes.
>>
>> I was trying to understand Flink failure recovery. Checkpoint X has
>> started, and I have sent one event to my source. As windows are triggered
>> every 2s, my sink is updated with the aggregated result. But when I checked
>> the RabbitMQ console, my source queue still had unacked messages. (This is
>> expected and as per design:
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>> ).
>>
>> Now I restarted my task manager. Since the restart happens within the same
>> checkpoint interval and checkpoint X has not yet completed, the message is
>> not acknowledged and is sent again, so duplicate processing of events
>> happens.
>>
>> How to avoid these duplicates?
>>
>>
>> Thanks
>>
>> Banu
>>
>
