> Are they two different things?

There are no consumer and broker offsets; there are only offsets which belong
to a topic + partition pair.
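As an illustration (a minimal sketch using the plain Kafka client; the broker
address, group id, and topic below are made up, not from this thread), the
committed offsets on the broker side are always looked up per topic + partition:

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092"); // made-up address
props.setProperty("group.id", "my-flink-job");         // made-up group id
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition("events", 0); // made-up topic
    // The broker's view: the last committed offset for this group, keyed by
    // (topic, partition); the entry is null if nothing was committed yet.
    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Set.of(tp));
    System.out.println(committed.get(tp));
}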
> And which offset is saved in the checkpoint/savepoint?

The one which Flink thinks is already processed.

Regarding the PROD deploy: now you know the risks, so feel free to pick one.

BR,
G

On Wed, Mar 26, 2025 at 2:11 PM mejri houssem <mejrihousse...@gmail.com> wrote:

> Hello Gabor,
>
> Thanks for your response.
>
> I just want to clarify one thing: is there any difference between the
> Kafka source offset and the Kafka broker offset? Are they two different
> things? And which offset is saved in the checkpoint/savepoint?
>
> For our use case, we intend to take a savepoint only once, before updating
> the job in production. This means we stop the job with a savepoint, make
> some updates, and then restart from the savepoint we have taken.
>
> Best Regards,
>
> On Wed, Mar 26, 2025 at 07:18, Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> In short, it's encouraged to use savepoints because of situations like
>> the following:
>> * You start processing data from offset 0.
>> * Two savepoints are created, one with offset 10, another with offset 20.
>> * In this timeframe Kafka has offset 20 committed, since that's the last
>> one processed.
>> * At offset 30 you realize that the data processed between offsets 10
>> and 30 is faulty because of broken job logic.
>>
>> Recovering offsets from a savepoint is relatively easy: just restart the
>> job from the offset-10 savepoint.
>> When Kafka is the source of truth, you need to do some mumbo-jumbo to
>> cut back the Kafka offsets, and you most probably have no idea where to
>> cut back to.
>>
>> The RabbitMQ source (consumer) reads from a queue and acknowledges
>> messages on checkpoints.
>> When checkpointing is enabled, it guarantees exactly-once processing
>> semantics. Please see [1] for further details.
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>>
>> BR,
>> G
>>
>>
>> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Is there any further clarification or explanation regarding the
>>> subject, please?
>>>
>>> Best regards.
>>>
>>> On Wed, Mar 19, 2025 at 15:02, mejri houssem <mejrihousse...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> So if I understand you well, I cannot rely on the Kafka broker offset
>>>> to achieve an at-least-once guarantee; without checkpoints/savepoints
>>>> enabled, that would not be possible.
>>>>
>>>> Best regards
>>>>
>>>> On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy <hamdy10...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Mejri,
>>>>> Not exactly. You can still rely on a savepoint to restart/redeploy
>>>>> the job from the latest offset recorded in Flink; my reply was about
>>>>> your question of whether you can replace that and just depend on the
>>>>> offsets committed to the Kafka broker. For at-least-once semantics,
>>>>> savepoints and checkpoints book-keep the offsets for the Flink job
>>>>> after initialization; the config I mentioned only controls how the
>>>>> consumers are initialized. If you start the job without a savepoint
>>>>> and it falls back to the config (which may be using the
>>>>> broker-committed offsets), that might achieve the semantic, but it
>>>>> doesn't guarantee it.
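>>>>> To make that concrete, here is a minimal sketch of such a source
>>>>> setup (against the KafkaSource builder API; the broker address, topic,
>>>>> and group id are placeholders, not from this thread). The
>>>>> starting-offsets initializer below is consulted only on a fresh start,
>>>>> i.e. when there is no checkpoint/savepoint to restore from:
>>>>>
>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>> import org.apache.flink.connector.kafka.source.KafkaSource;
>>>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>>>> import org.apache.kafka.clients.consumer.OffsetResetStrategy;
>>>>>
>>>>> KafkaSource<String> source = KafkaSource.<String>builder()
>>>>>     .setBootstrapServers("broker:9092") // placeholder
>>>>>     .setTopics("events")                // placeholder
>>>>>     .setGroupId("my-flink-job")         // placeholder
>>>>>     // Used only when there is no snapshot to restore from: fall back
>>>>>     // to the broker-committed offsets, or to earliest if none exist.
>>>>>     .setStartingOffsets(
>>>>>         OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>>>     .setValueOnlyDeserializer(new SimpleStringSchema())
>>>>>     .build();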
>>>>> For example, assume you restore from a savepoint and the job
>>>>> completes a couple of checkpoints, so the committed offset in Kafka is
>>>>> updated; then for some reason you discover a bug. If you only depend
>>>>> on the Kafka broker's committed offset, you would probably break the
>>>>> semantic, while if you use savepoints you can redeploy from the
>>>>> savepoint of the last correct version and reprocess the data that was
>>>>> processed by the buggy job.
>>>>>
>>>>> Best Regards
>>>>> Ahmed Hamdy
>>>>>
>>>>>
>>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Ahmed,
>>>>>>
>>>>>> Thanks for the response.
>>>>>>
>>>>>> Does that mean checkpoints and savepoints have nothing to do with
>>>>>> the at-least-once guarantee, since it depends solely on the
>>>>>> starting-offset configuration?
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> On Tue, Mar 18, 2025 at 23:59, Ahmed Hamdy <hamdy10...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Mejri,
>>>>>>>
>>>>>>> > I’m wondering if this is strictly necessary, since the Kafka
>>>>>>> > broker itself keeps track of offsets (if I am not mistaken). In
>>>>>>> > other words, if we redeploy the job, will it automatically resume
>>>>>>> > from the last Kafka offset, or should we still rely on Flink’s
>>>>>>> > checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>
>>>>>>> This depends on the starting offset you set in the source
>>>>>>> config [1]. You can configure it to start from the earliest, the
>>>>>>> last committed, the latest, or a specific offset.
>>>>>>>
>>>>>>> I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack
>>>>>>> read messages, unlike Kafka.
>>>>>>>
>>>>>>>
>>>>>>> 1-
>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>> Best Regards
>>>>>>> Ahmed Hamdy
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem
>>>>>>> <mejrihousse...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello everyone,
>>>>>>>>
>>>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in
>>>>>>>> the event of a restart, Flink can restore from the last committed
>>>>>>>> offset stored in a successful checkpoint. Now we’re considering
>>>>>>>> enabling savepoints for our production deployment.
>>>>>>>>
>>>>>>>> I’m wondering if this is strictly necessary, since the Kafka
>>>>>>>> broker itself keeps track of offsets (if I am not mistaken). In
>>>>>>>> other words, if we redeploy the job, will it automatically resume
>>>>>>>> from the last Kafka offset, or should we still rely on Flink’s
>>>>>>>> checkpoint/savepoint mechanism to ensure correct offset recovery?
>>>>>>>>
>>>>>>>> Additionally, we have another job that uses a RabbitMQ source with
>>>>>>>> checkpoints enabled in order to activate manual acknowledgments.
>>>>>>>> Does the same logic apply in that case as well?
>>>>>>>>
>>>>>>>> Thanks in advance for any guidance!
>>>>>>>>
>>>>>>>> Best Regards.
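>>>>>>>> P.S. For context, the RabbitMQ side is wired roughly like this (a
>>>>>>>> sketch against the RMQSource API from flink-connector-rabbitmq;
>>>>>>>> the host, credentials, and queue name are placeholders). Messages
>>>>>>>> are acknowledged only when a checkpoint completes:
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>> import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
>>>>>>>> import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> // Without checkpointing the source would auto-ack on receipt;
>>>>>>>> // with it, acks are deferred to checkpoint completion.
>>>>>>>> env.enableCheckpointing(10_000);
>>>>>>>>
>>>>>>>> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>>>>>>>>     .setHost("rabbitmq-host") // placeholder
>>>>>>>>     .setPort(5672)
>>>>>>>>     .setUserName("user")      // placeholder
>>>>>>>>     .setPassword("password")  // placeholder
>>>>>>>>     .setVirtualHost("/")
>>>>>>>>     .build();
>>>>>>>>
>>>>>>>> DataStream<String> stream = env.addSource(new RMQSource<>(
>>>>>>>>     connectionConfig,
>>>>>>>>     "my-queue",               // placeholder queue name
>>>>>>>>     false,                    // no correlation ids: at-least-once
>>>>>>>>     new SimpleStringSchema()));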