Hello,

In this paragraph of the documentation [1], it is mentioned that "Kafka
source does NOT rely on committed offsets for fault tolerance. Committing
offset is only for exposing the progress of consumer and consuming group
for monitoring".

Can someone please explain why the Kafka source does not rely on the
committed offset for recovery, even though the offset stored in the
checkpoint/savepoint is the same as the one committed to Kafka?
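
For context, here is a minimal sketch of how I understand such a source is
wired up (the topic, group id, servers, and checkpoint interval below are
placeholders, not our actual setup):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are committed back to Kafka only when a checkpoint completes.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // placeholder
                .setTopics("input-topic")            // placeholder
                .setGroupId("my-consumer-group")     // placeholder
                // The committed offset is consulted only here, to pick the
                // starting position when there is no checkpoint/savepoint
                // to restore from.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Committing on checkpoint is on by default; it only exposes
                // consumer-group progress for monitoring.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();
        env.execute("kafka-source-sketch");
    }
}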

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing

Best Regards,

On Wed, Mar 26, 2025 at 2:27 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> > Are they two different things?
> There are no separate consumer and broker offsets; there are offsets that
> belong to a topic + partition pair.
>
> > And which offset is saved in the checkpoint/savepoint?
> The one Flink thinks has already been processed.
>
> Regarding the PROD deployment: now you know the risks, so feel free to pick one.
>
> BR,
> G
>
>
> On Wed, Mar 26, 2025 at 2:11 PM mejri houssem <mejrihousse...@gmail.com>
> wrote:
>
>> Hello Gabor,
>>
>> Thanks for your response.
>>
>> I just want to clarify one thing: is there any difference between the
>> Kafka source offset and the Kafka broker offset? Are they two different
>> things? And which offset is saved in the checkpoint/savepoint?
>>
>> For our use case, we intend to take a savepoint only once before updating
>> the job in production. This means we stop the job with a savepoint, make
>> some updates, and then restart from the savepoint we have taken.
>>
>> Best Regards,
>>
>> On Wed, Mar 26, 2025 at 7:18 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> In short, it's encouraged to use savepoints because of the following
>>> situation:
>>> * You start processing data from offset 0
>>> * 2 savepoints are created, one with offset 10, another with offset 20
>>> * In this timeframe Kafka has committed offset 20, since that's the last
>>> one processed
>>> * At offset 30 you realize that the data processed between offsets 10 and
>>> 30 is just faulty because of broken job logic
>>>
>>> Reading offsets from a savepoint is relatively easy: just restart the job
>>> from the offset-10 savepoint.
>>> When Kafka is the source of truth, you need to do some mumbo-jumbo to roll
>>> back the Kafka offsets, and you most probably have no idea where to roll
>>> back to.
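>>>
>>> To make that concrete, restoring from a savepoint is a single CLI call; a
>>> minimal sketch, where the savepoint directory, job id and jar name are
>>> placeholders:
>>>
>>>   # stop the running job and take a savepoint first
>>>   ./bin/flink stop --savepointPath /savepoints <jobId>
>>>
>>>   # later, restart the (possibly fixed) job from the chosen savepoint
>>>   ./bin/flink run -s /savepoints/savepoint-xxxx-yyyy my-job.jar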
>>>
>>> RabbitMQ source (consumer) reads from a queue and acknowledges messages
>>> on checkpoints.
>>> When checkpointing is enabled, it guarantees exactly-once processing
>>> semantics. Please see [1] for further details.
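>>>
>>> As an illustration, a minimal sketch of such a RabbitMQ source (host,
>>> credentials and queue name are placeholder assumptions, not a reference
>>> setup):
>>>
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
>>> import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
>>>
>>> public class RmqSourceSketch {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         // Checkpointing must be enabled; messages are acked on completed
>>>         // checkpoints, which is what gives the exactly-once guarantee.
>>>         env.enableCheckpointing(60_000);
>>>
>>>         RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>>>                 .setHost("rabbitmq-host")   // placeholder
>>>                 .setPort(5672)
>>>                 .setUserName("guest")       // placeholder
>>>                 .setPassword("guest")       // placeholder
>>>                 .setVirtualHost("/")
>>>                 .build();
>>>
>>>         env.addSource(new RMQSource<>(
>>>                         connectionConfig,
>>>                         "input-queue",           // placeholder queue name
>>>                         true,                    // use correlation ids for exactly-once
>>>                         new SimpleStringSchema()))
>>>                 .setParallelism(1)               // source must be non-parallel for exactly-once
>>>                 .print();
>>>
>>>         env.execute("rmq-source-sketch");
>>>     }
>>> }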
>>>
>>> Hope this helps.
>>>
>>> [1]
>>> https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Is there any further clarification or explanation regarding the
>>>> subject, please?
>>>>
>>>> Best regards.
>>>>
>>>> On Wed, Mar 19, 2025 at 3:02 PM mejri houssem <mejrihousse...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> So if I understand you correctly, I cannot rely on the Kafka broker
>>>>> offset to achieve the at-least-once guarantee. Without checkpoints or
>>>>> savepoints enabled, that would not be possible.
>>>>>
>>>>> Best regards
>>>>>
>>>>> On Wed, Mar 19, 2025 at 12:00 PM Ahmed Hamdy <hamdy10...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Mejri,
>>>>>> Not exactly. You can still rely on a savepoint to restart/redeploy the
>>>>>> job from the latest offset recorded in Flink; my reply was regarding
>>>>>> your question of whether you can replace that and just depend on the
>>>>>> committed offsets in the Kafka broker. For the at-least-once semantic,
>>>>>> savepoints and checkpoints book-keep the offset for the Flink job after
>>>>>> initialization; the config I mentioned only configures the
>>>>>> initialization of the consumers. If you start the job without a
>>>>>> savepoint and it falls back to the config (which may be using the
>>>>>> broker-committed offset), that might achieve the semantic, but it
>>>>>> doesn't guarantee it.
>>>>>> For example, assume you restore from a savepoint and the job completes
>>>>>> a couple of checkpoints, so the committed offset is updated in Kafka;
>>>>>> then for some reason you discover a bug. If you only depend on the
>>>>>> Kafka broker committed offset, you would probably break the semantic,
>>>>>> while if you use savepoints you can redeploy from the last correct
>>>>>> savepoint and reprocess the data that was processed by the buggy job.
>>>>>>
>>>>>> Best Regards
>>>>>> Ahmed Hamdy
>>>>>>
>>>>>>
>>>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Ahmed,
>>>>>>>
>>>>>>> Thanks for the response.
>>>>>>>
>>>>>>> Does that mean checkpoints and savepoints have nothing to do with the
>>>>>>> at-least-once guarantee, since it depends solely on the starting offset
>>>>>>> configuration?
>>>>>>>
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Tue, Mar 18, 2025 at 11:59 PM Ahmed Hamdy <hamdy10...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Mejri
>>>>>>>>
>>>>>>>> > I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>>>> itself keeps track of offsets (if I am not mistaken). In other words,
>>>>>>>> if we redeploy the job, will it automatically resume from the last
>>>>>>>> Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>>>>> mechanism to ensure correct offset recovery?
>>>>>>>>
>>>>>>>> This depends on the starting offset you set in the source config [1].
>>>>>>>> You can configure it to start from the earliest, the last committed,
>>>>>>>> the latest, or a specific offset.
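>>>>>>>>
>>>>>>>> A rough sketch of how those options map onto the KafkaSource builder
>>>>>>>> (a hypothetical helper; the timestamp value is just a placeholder):
>>>>>>>>
>>>>>>>> import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
>>>>>>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>>>>>>> import org.apache.kafka.clients.consumer.OffsetResetStrategy;
>>>>>>>>
>>>>>>>> class StartingOffsetExamples {
>>>>>>>>     static void applyStartingOffset(KafkaSourceBuilder<String> builder, String mode) {
>>>>>>>>         switch (mode) {
>>>>>>>>             case "earliest":
>>>>>>>>                 builder.setStartingOffsets(OffsetsInitializer.earliest());
>>>>>>>>                 break;
>>>>>>>>             case "latest":
>>>>>>>>                 builder.setStartingOffsets(OffsetsInitializer.latest());
>>>>>>>>                 break;
>>>>>>>>             case "committed":
>>>>>>>>                 // broker-committed offset; fall back to earliest if none exists
>>>>>>>>                 builder.setStartingOffsets(
>>>>>>>>                         OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
>>>>>>>>                 break;
>>>>>>>>             case "timestamp":
>>>>>>>>                 // start from records at/after this epoch-millis timestamp (placeholder)
>>>>>>>>                 builder.setStartingOffsets(OffsetsInitializer.timestamp(1710720000000L));
>>>>>>>>                 break;
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Whichever initializer is chosen only matters when the job does not
>>>>>>>> restore from a checkpoint or savepoint.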
>>>>>>>>
>>>>>>>> I am not 100% sure about RabbitMQ; IIRC it uses checkpoints to ack
>>>>>>>> read messages, unlike Kafka.
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>>> Best Regards
>>>>>>>> Ahmed Hamdy
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <
>>>>>>>> mejrihousse...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in the
>>>>>>>>> event of a restart, Flink can restore from the last committed offset
>>>>>>>>> stored in a successful checkpoint. Now we’re considering enabling
>>>>>>>>> savepoints for our production deployment.
>>>>>>>>>
>>>>>>>>> I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>>>>> itself keeps track of offsets (if I am not mistaken). In other words,
>>>>>>>>> if we redeploy the job, will it automatically resume from the last
>>>>>>>>> Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>>>>>> mechanism to ensure correct offset recovery?
>>>>>>>>>
>>>>>>>>> Additionally, we have another job that uses a RabbitMQ source with
>>>>>>>>> checkpoints enabled to manage manual acknowledgments. Does the same
>>>>>>>>> logic apply in that case as well?
>>>>>>>>>
>>>>>>>>> Thanks in advance for any guidance!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best Regards.
>>>>>>>>>
>>>>>>>>
