Hello,

In this paragraph of the documentation [1], it is mentioned that "Kafka source does not rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring."
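For reference, here is a minimal sketch of the kind of source setup the cited section describes, with checkpointing on and offset committing left at its default; the broker address, topic, group id and string schema are illustrative only, not the actual job.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaOffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // On recovery the source restores the offsets stored in the checkpoint/savepoint,
        // not the offsets that were committed back to Kafka.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // illustrative
                .setTopics("events")                  // illustrative
                .setGroupId("my-flink-job")           // illustrative
                // Default is "true": the source commits its offsets to Kafka when a checkpoint
                // completes, purely so that consumer-group progress shows up in monitoring tools.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-offset-commit-sketch");
    }
}

Even with committing enabled, a restart from a checkpoint or savepoint ignores whatever was committed to the broker.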
Can someone please explain why the Kafka source does not rely on the committed offset for recovery, even though the offset stored in a checkpoint/savepoint is the same as the one committed to Kafka?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing

Best Regards,

On Wed, Mar 26, 2025 at 2:27 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> > Are they two different things?
> There are no consumer and broker offsets; there are offsets which belong to a topic + partition pair.
>
> > And which offset is saved in the checkpoint/savepoint?
> Which Flink thinks is processed already.
>
> Regarding the PROD deploy, now you know the risks, so feel free to pick one.
>
> BR,
> G
>
> On Wed, Mar 26, 2025 at 2:11 PM mejri houssem <mejrihousse...@gmail.com> wrote:
>
>> Hello Gabor,
>>
>> Thanks for your response.
>>
>> I just want to clarify one thing: is there any difference between the Kafka source offset and the Kafka broker offset? Are they two different things? And which offset is saved in the checkpoint/savepoint?
>>
>> For our use case, we intend to take a savepoint only once before updating the job in production. This means we stop the job with a savepoint, make some updates, and then restart from the savepoint we have taken.
>>
>> Best Regards,
>>
>> On Wed, Mar 26, 2025 at 7:18 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> In short, it's encouraged to use savepoints because of the following situation:
>>> * You start processing data from offset 0
>>> * 2 savepoints are created, one with offset 10, another with 20
>>> * In this timeframe Kafka has offset 20, since that's the last processed
>>> * At offset 30 you realize that the data processed between 10 and 30 is just faulty because of broken job logic
>>>
>>> Reading offsets from a savepoint is relatively easy: just restart the job from the offset-10 savepoint.
>>> When Kafka is the source of truth, you need to do some mumbo-jumbo to cut back the Kafka offsets, and you've most probably no idea where to cut back.
>>>
>>> The RabbitMQ source (consumer) reads from a queue and acknowledges messages on checkpoints.
>>> When checkpointing is enabled, it guarantees exactly-once processing semantics. Please see [1] for further details.
>>>
>>> Hope this helps.
>>>
>>> [1] https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>>>
>>> BR,
>>> G
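To make the RabbitMQ part of the reply above concrete, here is a minimal sketch of an RMQSource wired up to checkpointing. The connection settings, queue name and string schema are illustrative only; per the connector's documentation, exactly-once additionally requires correlation IDs from the publisher and a non-parallel source, otherwise the guarantee degrades to at-least-once.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RabbitMqAckSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Messages are acknowledged to RabbitMQ only when the enclosing checkpoint completes.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("rabbitmq-host")   // illustrative
                .setPort(5672)
                .setUserName("guest")       // illustrative
                .setPassword("guest")       // illustrative
                .setVirtualHost("/")
                .build();

        env.addSource(new RMQSource<>(
                        connectionConfig,
                        "my-queue",                // illustrative queue name
                        true,                      // use correlation IDs (needed for exactly-once)
                        new SimpleStringSchema()))
                .setParallelism(1)                 // exactly-once requires a non-parallel source
                .print();

        env.execute("rabbitmq-ack-sketch");
    }
}

With checkpointing disabled, messages are acknowledged automatically on receipt and there is no delivery guarantee.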
>>> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Is there any further clarification or explanation regarding the subject, please?
>>>>
>>>> Best regards.
>>>>
>>>> On Wed, Mar 19, 2025 at 3:02 PM mejri houssem <mejrihousse...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> So if I understand you well, I cannot rely on the Kafka broker offset to achieve the at-least-once guarantee. Without checkpoints/savepoints enabled, that would not be possible.
>>>>>
>>>>> Best regards
>>>>>
>>>>> On Wed, Mar 19, 2025 at 12:00 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>>
>>>>>> Hi Mejri,
>>>>>> Not exactly, you can still rely on a savepoint to restart/redeploy the job from the latest offset recorded in Flink; my reply was about whether you can replace that and just depend on the committed offsets in the Kafka broker.
>>>>>> For at-least-once semantics, savepoints and checkpoints book-keep the offset for the Flink job after the initialization; the config I mentioned only configures the initialization of the consumers. If you start the job without a savepoint and it falls back to the config (which may be using the broker committed offset), that might achieve the semantic, but it doesn't guarantee it.
>>>>>> For example, assume you restore from a savepoint and the job completes a couple of checkpoints, so the committed offset is updated in Kafka; then for some reason you discover a bug. If you only depend on the Kafka broker committed offset you would probably break the semantic, while if you use savepoints you can redeploy from the savepoint of the last correct version and reprocess the data that was processed by the buggy job.
>>>>>>
>>>>>> Best Regards
>>>>>> Ahmed Hamdy
>>>>>>
>>>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Ahmed,
>>>>>>>
>>>>>>> Thanks for the response.
>>>>>>>
>>>>>>> Does that mean checkpoints and savepoints have nothing to do with the at-least-once guarantee, since it depends solely on the starting offset configuration?
>>>>>>>
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Tue, Mar 18, 2025 at 11:59 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Mejri
>>>>>>>>
>>>>>>>> > I'm wondering if this is strictly necessary, since the Kafka broker itself keeps track of offsets (if I am not mistaken). In other words, if we redeploy the job, will it automatically resume from the last Kafka offset, or should we still rely on Flink's checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>>
>>>>>>>> This depends on the starting offset you set in the source config [1]. You can configure it to start from earliest, last committed, latest, or a specific offset.
>>>>>>>>
>>>>>>>> I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack read messages, unlike Kafka.
>>>>>>>>
>>>>>>>> 1- https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> Ahmed Hamdy
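To spell out the starting-offset options mentioned in the reply above, here is a minimal sketch of the corresponding builder calls; the broker address, topic and group id are illustrative, only one initializer would be used in practice, and all of them apply only when the job starts without a checkpoint or savepoint to restore from.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class StartingOffsetSketch {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // illustrative
                .setTopics("events")                  // illustrative
                .setGroupId("my-flink-job")           // illustrative
                // Alternatives, pick exactly one:
                // .setStartingOffsets(OffsetsInitializer.earliest())                 // beginning of the topic
                // .setStartingOffsets(OffsetsInitializer.latest())                   // end of the topic
                // .setStartingOffsets(OffsetsInitializer.timestamp(1742342400000L))  // from an epoch-millis timestamp
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) // group's committed offset, else earliest
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}

Once the job is restored from a checkpoint or savepoint, the offsets recorded there take precedence over whichever initializer was configured.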
>>>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <mejrihousse...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> We have a stateless Flink job that uses a Kafka source with at-least-once guarantees. We've enabled checkpoints so that, in the event of a restart, Flink can restore from the last committed offset stored in a successful checkpoint. Now we're considering enabling savepoints for our production deployment.
>>>>>>>>>
>>>>>>>>> I'm wondering if this is strictly necessary, since the Kafka broker itself keeps track of offsets (if I am not mistaken). In other words, if we redeploy the job, will it automatically resume from the last Kafka offset, or should we still rely on Flink's checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>>>
>>>>>>>>> Additionally, we have another job that uses a RabbitMQ source with checkpoints enabled to manage manual acknowledgments. Does the same logic apply in that case as well?
>>>>>>>>>
>>>>>>>>> Thanks in advance for any guidance!
>>>>>>>>>
>>>>>>>>> Best Regards.
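For completeness, here is a minimal sketch of the checkpoint settings such an at-least-once job would typically enable; the interval is illustrative, the fromElements call is only a placeholder for the real Kafka/RabbitMQ sources, and enableExternalizedCheckpoints is the spelling matching the 1.13-era docs cited above (newer releases expose the same retention setting under a renamed setter).

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // At-least-once checkpoints are enough when the only relevant state is the source offsets.
        env.enableCheckpointing(30_000, CheckpointingMode.AT_LEAST_ONCE);

        // Retain the last completed checkpoint after cancellation, so a redeploy can restore
        // offsets from it even when no savepoint was taken.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder for the real Kafka/RabbitMQ source and the rest of the pipeline.
        env.fromElements("a", "b", "c").print();

        env.execute("at-least-once-checkpoint-sketch");
    }
}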