> Are they two different things?
There are no consumer and broker offsets, there are offsets which belong to
a topic + partition pair.

> And which offset is saved in the checkpoint/savepoint?
Which Flink thinks is processed already.

Regarding the PROD deploy now you know the risks so feel free to pick one.

BR,
G


On Wed, Mar 26, 2025 at 2:11 PM mejri houssem <mejrihousse...@gmail.com>
wrote:

> Hello Gabor,
>
> Thanks for your response.
>
> I just want to clarify one thing: is there any difference between the
> Kafka source offset and the Kafka broker offset? Are they two different
> things? And which offset is saved in the checkpoint/savepoint?
>
> For our use case, we intend to take a savepoint only once before updating
> the job in production. This means we stop the job with a savepoint, make
> some updates, and then restart from the savepoint we have taken.
>
> Best Regards,
>
> Le mer. 26 mars 2025 à 07:18, Gabor Somogyi <gabor.g.somo...@gmail.com> a
> écrit :
>
>> In short it's encouraged to use savepoint because of the following
>> situation:
>> * You start processing data from offset 0
>> * 2 savepoints created, one with offset 10, another with 20
>> * This timeframe Kafka has offset 20 since that's the last processed
>> * At offset 30 you realize that data processed between 10 and 30 are just
>> faulty because of broken job logic
>>
>> Reading offsets from savepoint is relatively easy, just restart the job
>> from offset 10 savepoint.
>> When Kafka is the source of truth then you need to do some mumbo-jumbo to
>> cut back the Kafka offsets + you've
>> most probably no idea where to cut back.
>>
>> RabbitMQ source (consumer) reads from a queue and acknowledges messages
>> on checkpoints.
>> When checkpointing is enabled, it guarantees exactly-once processing
>> semantics. Please see [1] for further details.
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>>
>> BR,
>> G
>>
>>
>> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Is there any further clarification or explanation regarding the subject,
>>> please?
>>>
>>> Best regards.
>>>
>>> Le mer. 19 mars 2025 à 15:02, mejri houssem <mejrihousse...@gmail.com>
>>> a écrit :
>>>
>>>> Hello,
>>>>
>>>> So if I understand you well,  I cannot rely on the kafka broker offset
>>>> to achieve at-least-once guarantee. Without checkpoint/savepoint enabled,
>>>> that would not be possible.
>>>>
>>>> Best regards
>>>>
>>>> Le mer. 19 mars 2025 à 12:00, Ahmed Hamdy <hamdy10...@gmail.com> a
>>>> écrit :
>>>>
>>>>> Hi Mejri,
>>>>> Not exactly, you can still rely on savepoint to restart/redeploy the
>>>>> job from the latest offset recorded in Flink, my reply was regarding your
>>>>> question if you can replace that and just depend on the committed offsets
>>>>> in the kafka broker. For at-least-once semantic savepoints and checkpoints
>>>>> book-keep the offset for the Flink job after the initialization, the 
>>>>> config
>>>>> I mentioned only configures the initialization of the consumers. If you
>>>>> start the job without savepoint and it falls back to the config (which may
>>>>> be using the broker committed offset) that might achieve the semantic but
>>>>> it doesn't guarantee that.
>>>>> For example assume you restore from save point, job completes a couple
>>>>> of checkpoints hence the offset committed is updated in kafka then for 
>>>>> some
>>>>> reason you figure out a bug, if you only depend on Kafka broker committed
>>>>> offset you would probably break the semantic while if you use savepoints
>>>>> you can redeploy from the last correct version savepoint and reprocess the
>>>>> data that was processed by the buggy job.
>>>>>
>>>>> Best Regards
>>>>> Ahmed Hamdy
>>>>>
>>>>>
>>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Ahmed,
>>>>>>
>>>>>> Thanks for the response.
>>>>>>
>>>>>> Does that mean checkpoints and savepoints have nothing to do with the
>>>>>> at-least-once guarantee, since it depends solely on the starting offset
>>>>>> configuration?
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Le mar. 18 mars 2025 à 23:59, Ahmed Hamdy <hamdy10...@gmail.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> Hi Mejri
>>>>>>>
>>>>>>> > I’m wondering if this is strictly necessary, since the Kafka
>>>>>>> broker itself keeps track of offsets (i am not mistaken). In other 
>>>>>>> words,
>>>>>>> if we redeploy the job, will it automatically resume from the last Kafka
>>>>>>> offset, or should we still rely on Flink’s checkpoint/savepoint 
>>>>>>> mechanism
>>>>>>> to ensure correct offset recovery?
>>>>>>>
>>>>>>> This depends on the starting offset you set in the source config[1].
>>>>>>> you can configure it to start from earliest or last committed or latest 
>>>>>>> or
>>>>>>> at specific offset.
>>>>>>>
>>>>>>> I am not 100% sure about RabbitMQ, IIRC it uses checkpoints to ack
>>>>>>> read messages unlike Kafka.
>>>>>>>
>>>>>>>
>>>>>>> 1-
>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>> Best Regards
>>>>>>> Ahmed Hamdy
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <
>>>>>>> mejrihousse...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hello everyone,
>>>>>>>>
>>>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in the 
>>>>>>>> event
>>>>>>>> of a restart, Flink can restore from the last committed offset stored 
>>>>>>>> in a
>>>>>>>> successful checkpoint. Now we’re considering enabling savepoints for 
>>>>>>>> our
>>>>>>>> production deployment.
>>>>>>>>
>>>>>>>> I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>>>> itself keeps track of offsets (i am not mistaken). In other words, if 
>>>>>>>> we
>>>>>>>> redeploy the job, will it automatically resume from the last Kafka 
>>>>>>>> offset,
>>>>>>>> or should we still rely on Flink’s checkpoint/savepoint mechanism to 
>>>>>>>> ensure
>>>>>>>> correct offset recovery?
>>>>>>>>
>>>>>>>> Additionally, we have another job that uses a RabbitMQ source with
>>>>>>>> checkpoints enabled to manage manual acknowledgments. Does the same 
>>>>>>>> logic
>>>>>>>> apply in that case as well?
>>>>>>>>
>>>>>>>> Thanks in advance for any guidance!point enabled in order to
>>>>>>>> activate manual ack. Does this apply to this job also?
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards.
>>>>>>>>
>>>>>>>

Reply via email to