I learned about this a couple of years ago when I was investigating the feasibility of a "Kafka stretch" cluster. From that email thread <https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m>:
> I got excited about this possibility, only to learn that Flink's KafkaSource does not use Kafka for consumer assignment. I think I understand why it does this: the Source API can do a lot more than Kafka, so having some kind of state management (offsets) and task assignment (Kafka consumer balance protocol) outside of the usual Flink Source would be pretty weird. Implementing offset and task assignment inside of the KafkaSource allows it to work like any other Source implementation.

Basically, other Sources don't have watermark state management like Kafka does, so keeping watermark management inside the Flink framework makes it easier to swap Sources around, enabling things like Hybrid Source <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/>.

On Thu, Mar 27, 2025 at 5:07 AM mejri houssem <mejrihousse...@gmail.com> wrote:

> Hello,
>
> In this paragraph of the documentation [1], it is mentioned that <<*Kafka
> source does not rely on committed offsets for fault tolerance.
> Committing offset is only for exposing the progress of consumer and
> consuming group for monitoring*>>.
>
> Can someone please explain why the Kafka source does not rely on the
> committed offset for recovery, even though the offset stored in the
> checkpoint/savepoint is the same as the one committed to Kafka?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing
>
> Best Regards,
>
> On Wed, Mar 26, 2025 at 14:27, Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> > Are they two different things?
>> There are no consumer and broker offsets, there are offsets which belong
>> to a topic + partition pair.
>>
>> > And which offset is saved in the checkpoint/savepoint?
>> The one Flink thinks is already processed.
>>
>> Regarding the PROD deploy, now you know the risks, so feel free to pick one.
>>
>> BR,
>> G
>>
>>
>> On Wed, Mar 26, 2025 at 2:11 PM mejri houssem <mejrihousse...@gmail.com>
>> wrote:
>>
>>> Hello Gabor,
>>>
>>> Thanks for your response.
>>>
>>> I just want to clarify one thing: is there any difference between the
>>> Kafka source offset and the Kafka broker offset? Are they two different
>>> things? And which offset is saved in the checkpoint/savepoint?
>>>
>>> For our use case, we intend to take a savepoint only once before
>>> updating the job in production. This means we stop the job with a
>>> savepoint, make some updates, and then restart from the savepoint we have
>>> taken.
>>>
>>> Best Regards,
>>>
>>> On Wed, Mar 26, 2025 at 07:18, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> In short, it's encouraged to use savepoints because of the following
>>>> situation:
>>>> * You start processing data from offset 0
>>>> * 2 savepoints are created, one with offset 10, another with 20
>>>> * In this timeframe Kafka has offset 20, since that's the last one processed
>>>> * At offset 30 you realize that the data processed between 10 and 30 is
>>>> just faulty because of broken job logic
>>>>
>>>> Reading offsets from a savepoint is relatively easy: just restart the job
>>>> from the offset 10 savepoint.
>>>> When Kafka is the source of truth, you need to do some mumbo-jumbo
>>>> to cut back the Kafka offsets, and you most probably have no idea
>>>> where to cut back.
>>>>
>>>> RabbitMQ source (consumer) reads from a queue and acknowledges messages
>>>> on checkpoints.
>>>> When checkpointing is enabled, it guarantees exactly-once processing
>>>> semantics. Please see [1] for further details.
>>>>
>>>> Hope this helps.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Is there any further clarification or explanation regarding the
>>>>> subject, please?
>>>>>
>>>>> Best regards.
>>>>>
>>>>> On Wed, Mar 19, 2025 at 15:02, mejri houssem <mejrihousse...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> So if I understand you well, I cannot rely on the Kafka broker
>>>>>> offset to achieve an at-least-once guarantee. Without checkpoint/savepoint
>>>>>> enabled, that would not be possible.
>>>>>>
>>>>>> Best regards
>>>>>>
>>>>>> On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy <hamdy10...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Mejri,
>>>>>>> Not exactly, you can still rely on a savepoint to restart/redeploy the
>>>>>>> job from the latest offset recorded in Flink. My reply was regarding
>>>>>>> your question of whether you can replace that and just depend on the
>>>>>>> committed offsets in the Kafka broker. For at-least-once semantics,
>>>>>>> savepoints and checkpoints book-keep the offset for the Flink job after
>>>>>>> the initialization; the config I mentioned only configures the
>>>>>>> initialization of the consumers. If you start the job without a
>>>>>>> savepoint and it falls back to the config (which may be using the
>>>>>>> broker committed offset), that might achieve the semantics, but it
>>>>>>> doesn't guarantee that.
>>>>>>> For example, assume you restore from a savepoint and the job completes a
>>>>>>> couple of checkpoints, hence the committed offset is updated in Kafka.
>>>>>>> Then for some reason you figure out a bug. If you only depend on the
>>>>>>> Kafka broker committed offset, you would probably break the semantics,
>>>>>>> while if you use savepoints you can redeploy from the last correct
>>>>>>> version savepoint and reprocess the data that was processed by the
>>>>>>> buggy job.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Ahmed Hamdy
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Ahmed,
>>>>>>>>
>>>>>>>> Thanks for the response.
>>>>>>>>
>>>>>>>> Does that mean checkpoints and savepoints have nothing to do with
>>>>>>>> the at-least-once guarantee, since it depends solely on the starting
>>>>>>>> offset configuration?
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Tue, Mar 18, 2025 at 23:59, Ahmed Hamdy <hamdy10...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Mejri
>>>>>>>>>
>>>>>>>>> > I’m wondering if this is strictly necessary, since the Kafka
>>>>>>>>> > broker itself keeps track of offsets (if I am not mistaken). In
>>>>>>>>> > other words, if we redeploy the job, will it automatically resume
>>>>>>>>> > from the last Kafka offset, or should we still rely on Flink’s
>>>>>>>>> > checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>>>
>>>>>>>>> This depends on the starting offset you set in the source
>>>>>>>>> config [1]. You can configure it to start from the earliest, the last
>>>>>>>>> committed, the latest, or a specific offset.
>>>>>>>>>
>>>>>>>>> I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack
>>>>>>>>> read messages, unlike Kafka.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>>>> Best Regards
>>>>>>>>> Ahmed Hamdy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <mejrihousse...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hello everyone,
>>>>>>>>>>
>>>>>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in the
>>>>>>>>>> event of a restart, Flink can restore from the last committed offset
>>>>>>>>>> stored in a successful checkpoint. Now we’re considering enabling
>>>>>>>>>> savepoints for our production deployment.
>>>>>>>>>>
>>>>>>>>>> I’m wondering if this is strictly necessary, since the Kafka
>>>>>>>>>> broker itself keeps track of offsets (if I am not mistaken). In other
>>>>>>>>>> words, if we redeploy the job, will it automatically resume from the
>>>>>>>>>> last Kafka offset, or should we still rely on Flink’s
>>>>>>>>>> checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>>>>
>>>>>>>>>> Additionally, we have another job that uses a RabbitMQ source
>>>>>>>>>> with checkpoints enabled to manage manual acknowledgments. Does the
>>>>>>>>>> same logic apply in that case as well?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for any guidance!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best Regards.
>>>>>>>>>>
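
The rollback scenario from Gabor's message in the quoted thread (savepoints at offsets 10 and 20, bug noticed at offset 30) can be made concrete with a toy model. This is only a sketch in plain Python: `ToyKafkaJob` and its methods are hypothetical names, not Flink or Kafka APIs, and it assumes a single partition where taking a savepoint also commits the same offset to the broker.

```python
from dataclasses import dataclass, field


@dataclass
class ToyKafkaJob:
    """Toy model of one job reading one partition (not Flink's implementation)."""
    position: int = 0          # next offset the job will read
    broker_committed: int = 0  # the single committed offset the broker keeps
    savepoints: dict = field(default_factory=dict)  # savepoint name -> offset

    def process_to(self, offset: int) -> None:
        self.position = offset

    def take_savepoint(self, name: str) -> None:
        # Each savepoint keeps its own copy of the offset...
        self.savepoints[name] = self.position
        # ...while the broker-side commit overwrites the previous value.
        self.broker_committed = self.position

    def restart_from_broker(self) -> None:
        self.position = self.broker_committed

    def restore_from_savepoint(self, name: str) -> None:
        self.position = self.savepoints[name]


job = ToyKafkaJob()
job.process_to(10)
job.take_savepoint("sp-10")
job.process_to(20)
job.take_savepoint("sp-20")
job.process_to(30)  # bug noticed here: everything after offset 10 is faulty

job.restart_from_broker()
from_broker = job.position      # the broker only remembers the latest commit

job.restore_from_savepoint("sp-10")
from_savepoint = job.position   # the savepoint history allows a precise rewind

print(from_broker, from_savepoint)  # prints: 20 10
```

Restarting from the broker's committed offset resumes at 20 and skips the faulty range 10 to 20, while restoring the older savepoint rewinds to 10 so the whole faulty range is reprocessed.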