In short, using savepoints is encouraged because of the following situation:

* You start processing data from offset 0.
* Two savepoints are created, one at offset 10 and another at offset 20.
* At this point Kafka has offset 20 committed, since that is the last one processed.
* At offset 30 you realize that the data processed between offsets 10 and 30 is faulty because of broken job logic.

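To make the difference concrete, here is a rough toy model of the situation above. It is plain Python, not Flink or Kafka API code, and every name in it (`SAVEPOINT_OFFSETS`, `BROKER_COMMITTED_OFFSET`, `unrepaired_offsets`) is made up for illustration. It also assumes the faulty range is offsets 10 up to, but not including, 30.

```python
# Toy model of the scenario above; NOT Flink or Kafka API code.
# All names are hypothetical and exist only for this illustration.

SAVEPOINT_OFFSETS = [10, 20]     # offsets captured by the two savepoints
BROKER_COMMITTED_OFFSET = 20     # the offset Kafka itself remembers
FAULTY_RANGE = range(10, 30)     # assumed: records handled by the broken job logic


def unrepaired_offsets(restart_offset):
    """Return the faulty offsets that are NOT reprocessed when the job
    restarts reading at restart_offset."""
    return [offset for offset in FAULTY_RANGE if offset < restart_offset]


# Option A: restart from the savepoint taken at offset 10.
from_savepoint = unrepaired_offsets(SAVEPOINT_OFFSETS[0])

# Option B: rely only on the offset committed in the Kafka broker.
from_broker = unrepaired_offsets(BROKER_COMMITTED_OFFSET)

print("left unrepaired via savepoint:", from_savepoint)
print("left unrepaired via broker offset:", from_broker)
```

In this model the savepoint restart leaves nothing unrepaired, while the broker-offset restart skips offsets 10 to 19, which were processed by the broken logic and never get corrected.
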
Restoring offsets from a savepoint is relatively easy: just restart the job from the offset 10 savepoint. When Kafka is the source of truth, you need to do some manual offset juggling to roll back the Kafka offsets, and you most probably have no idea how far to roll back.

The RabbitMQ source (consumer) reads from a queue and acknowledges messages on checkpoints. With checkpointing enabled it can guarantee exactly-once processing semantics, subject to the conditions described in [1] (for example, messages carrying unique correlation IDs). Please see [1] for further details.

Hope this helps.

[1] https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76

BR,
G

On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com> wrote:

> Hello,
>
> Is there any further clarification or explanation regarding the subject,
> please?
>
> Best regards.
>
> On Wed, Mar 19, 2025 at 15:02, mejri houssem <mejrihousse...@gmail.com>
> wrote:
>
>> Hello,
>>
>> So if I understand you well, I cannot rely on the Kafka broker offset to
>> achieve an at-least-once guarantee. Without checkpoints/savepoints enabled,
>> that would not be possible.
>>
>> Best regards
>>
>> On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Hi Mejri,
>>> Not exactly. You can still rely on a savepoint to restart/redeploy the job
>>> from the latest offset recorded in Flink; my reply was about your question
>>> of whether you can replace that and depend only on the committed offsets
>>> in the Kafka broker. For at-least-once semantics, savepoints and
>>> checkpoints keep track of the offsets for the Flink job after
>>> initialization; the config I mentioned only configures the initialization
>>> of the consumers. If you start the job without a savepoint and it falls
>>> back to the config (which may be using the broker's committed offset), that
>>> might achieve the semantics, but it doesn't guarantee them.
>>>
>>> For example, assume you restore from a savepoint and the job completes a
>>> couple of checkpoints, so the committed offset is updated in Kafka. Then,
>>> for some reason, you discover a bug. If you depend only on the Kafka
>>> broker's committed offset, you would probably break the semantics, whereas
>>> if you use savepoints you can redeploy from the savepoint of the last
>>> correct version and reprocess the data that was processed by the buggy job.
>>>
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>>
>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>> wrote:
>>>
>>>> Hello Ahmed,
>>>>
>>>> Thanks for the response.
>>>>
>>>> Does that mean checkpoints and savepoints have nothing to do with the
>>>> at-least-once guarantee, since it depends solely on the starting offset
>>>> configuration?
>>>>
>>>> Best Regards
>>>>
>>>> On Tue, 18 Mar 2025 at 23:59, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>
>>>>> Hi Mejri
>>>>>
>>>>> > I’m wondering if this is strictly necessary, since the Kafka broker
>>>>> > itself keeps track of offsets (if I am not mistaken). In other words,
>>>>> > if we redeploy the job, will it automatically resume from the last
>>>>> > Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>> > mechanism to ensure correct offset recovery?
>>>>>
>>>>> This depends on the starting offset you set in the source config [1].
>>>>> You can configure it to start from the earliest offset, the last
>>>>> committed offset, the latest offset, or a specific offset.
>>>>>
>>>>> I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack
>>>>> read messages, unlike Kafka.
>>>>>
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>> Best Regards
>>>>> Ahmed Hamdy
>>>>>
>>>>>
>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <mejrihousse...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in the
>>>>>> event of a restart, Flink can restore from the last committed offset
>>>>>> stored in a successful checkpoint. Now we’re considering enabling
>>>>>> savepoints for our production deployment.
>>>>>>
>>>>>> I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>> itself keeps track of offsets (if I am not mistaken). In other words,
>>>>>> if we redeploy the job, will it automatically resume from the last
>>>>>> Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>>> mechanism to ensure correct offset recovery?
>>>>>>
>>>>>> Additionally, we have another job that uses a RabbitMQ source with
>>>>>> checkpoints enabled to manage manual acknowledgments. Does the same
>>>>>> logic apply in that case as well?
>>>>>>
>>>>>> Thanks in advance for any guidance!
>>>>>>
>>>>>>
>>>>>> Best Regards.
>>>>>>
>>>>>