Hello Gabor,

Thanks for your response.

I just want to clarify one thing: is there any difference between the Kafka
source offset and the Kafka broker offset? Are they two different things?
And which offset is saved in the checkpoint/savepoint?

For our use case, we intend to take a savepoint only once before updating
the job in production. This means we stop the job with a savepoint, make
some updates, and then restart from the savepoint we have taken.
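
To make the question concrete, here is a toy Python model of how I currently
picture the two offsets. It is only a sketch, not Flink or Kafka API code: the
class `ToyJob` and its methods are hypothetical names, and the behaviour
(each snapshot stores the source position and also commits it to the broker)
is my assumption based on this thread.

```python
class ToyJob:
    """Toy model of a job reading one Kafka partition (hypothetical, not Flink API)."""

    def __init__(self, start_offset=0):
        self.position = start_offset   # next offset the source would read
        self.broker_committed = None   # last offset committed back to the broker
        self.snapshots = {}            # snapshot name -> offset stored in that snapshot

    def process(self, count):
        # Advance the source position by `count` records.
        self.position += count

    def snapshot(self, name):
        # Assumption: the offset is written into the snapshot state and,
        # once the snapshot completes, also committed to the broker.
        self.snapshots[name] = self.position
        self.broker_committed = self.position

    def restore(self, name):
        # Restart from the offset stored in the chosen snapshot.
        self.position = self.snapshots[name]


job = ToyJob()
job.process(10)
job.snapshot("savepoint-10")
job.process(10)
job.snapshot("savepoint-20")
job.process(10)  # position is now 30; the bug is noticed here

# The broker only remembers the most recent commit.
resume_from_broker = job.broker_committed

# A savepoint lets us choose an older, known-good position.
job.restore("savepoint-10")
resume_from_savepoint = job.position
```

In this model the broker can only hand back its latest committed offset, while
the savepoints keep every position that was snapshotted, so a restart can go
back to the one taken before the faulty logic ran.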

Best Regards,

On Wed, Mar 26, 2025 at 07:18, Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> In short, using savepoints is encouraged because of the following
> situation:
> * You start processing data from offset 0
> * 2 savepoints are created, one at offset 10, another at offset 20
> * At this point Kafka holds offset 20, since that is the last processed
> offset
> * At offset 30 you realize that the data processed between 10 and 30 is
> faulty because of broken job logic
>
> Reading offsets from a savepoint is relatively easy: just restart the job
> from the savepoint taken at offset 10.
> When Kafka is the source of truth, you need to do some mumbo-jumbo to
> cut back the Kafka offsets, and you most probably have no idea where to
> cut back to.
>
> RabbitMQ source (consumer) reads from a queue and acknowledges messages on
> checkpoints.
> When checkpointing is enabled, it guarantees exactly-once processing
> semantics. Please see [1] for further details.
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink-connector-rabbitmq/blob/66e323a3e79befc08ae03f2789a8aa94b343d504/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L49-L76
>
> BR,
> G
>
>
> On Wed, Mar 26, 2025 at 1:05 AM mejri houssem <mejrihousse...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Is there any further clarification or explanation regarding the subject,
>> please?
>>
>> Best regards.
>>
>> On Wed, Mar 19, 2025 at 15:02, mejri houssem <mejrihousse...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> So if I understand you correctly, I cannot rely on the Kafka broker
>>> offset to achieve the at-least-once guarantee. Without
>>> checkpoints/savepoints enabled, that would not be possible.
>>>
>>> Best regards
>>>
>>> On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy <hamdy10...@gmail.com>
>>> wrote:
>>>
>>>> Hi Mejri,
>>>> Not exactly. You can still rely on a savepoint to restart/redeploy the
>>>> job from the latest offset recorded in Flink. My reply was about your
>>>> question of whether you can replace that and depend only on the offsets
>>>> committed to the Kafka broker. For at-least-once semantics, savepoints
>>>> and checkpoints keep track of the offsets for the Flink job after
>>>> initialization; the config I mentioned only controls how the consumers
>>>> are initialized. If you start the job without a savepoint, it falls back
>>>> to that config (which may use the broker's committed offsets). That might
>>>> achieve the semantics, but it does not guarantee them.
>>>> For example, assume you restore from a savepoint and the job completes a
>>>> couple of checkpoints, so the committed offset in Kafka is updated. Then,
>>>> for some reason, you discover a bug. If you depend only on the offset
>>>> committed to the Kafka broker, you would probably break the semantics,
>>>> whereas with savepoints you can redeploy from the savepoint of the last
>>>> correct version and reprocess the data that the buggy job processed.
>>>>
>>>> Best Regards
>>>> Ahmed Hamdy
>>>>
>>>>
>>>> On Wed, 19 Mar 2025 at 00:54, mejri houssem <mejrihousse...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Ahmed,
>>>>>
>>>>> Thanks for the response.
>>>>>
>>>>> Does that mean checkpoints and savepoints have nothing to do with the
>>>>> at-least-once guarantee, since it depends solely on the starting offset
>>>>> configuration?
>>>>>
>>>>> Best Regards
>>>>>
>>>>> On Tue, Mar 18, 2025 at 23:59, Ahmed Hamdy <hamdy10...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Mejri
>>>>>>
>>>>>> > I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>> > itself keeps track of offsets (if I am not mistaken). In other words,
>>>>>> > if we redeploy the job, will it automatically resume from the last
>>>>>> > Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>>> > mechanism to ensure correct offset recovery?
>>>>>>
>>>>>> This depends on the starting offset you set in the source config [1].
>>>>>> You can configure it to start from the earliest offset, the last
>>>>>> committed offset, the latest offset, or a specific offset.
>>>>>>
>>>>>> I am not 100% sure about RabbitMQ; IIRC, unlike Kafka, it uses
>>>>>> checkpoints to ack the messages it has read.
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>>
>>>>>> Best Regards
>>>>>> Ahmed Hamdy
>>>>>>
>>>>>>
>>>>>> On Tue, 18 Mar 2025 at 22:20, mejri houssem <mejrihousse...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> We have a stateless Flink job that uses a Kafka source with
>>>>>>> at-least-once guarantees. We’ve enabled checkpoints so that, in the
>>>>>>> event of a restart, Flink can restore from the last committed offset
>>>>>>> stored in a successful checkpoint. Now we’re considering enabling
>>>>>>> savepoints for our production deployment.
>>>>>>>
>>>>>>> I’m wondering if this is strictly necessary, since the Kafka broker
>>>>>>> itself keeps track of offsets (if I am not mistaken). In other words,
>>>>>>> if we redeploy the job, will it automatically resume from the last
>>>>>>> Kafka offset, or should we still rely on Flink’s checkpoint/savepoint
>>>>>>> mechanism to ensure correct offset recovery?
>>>>>>>
>>>>>>> Additionally, we have another job that uses a RabbitMQ source with
>>>>>>> checkpoints enabled to manage manual acknowledgments. Does the same
>>>>>>> logic apply in that case as well?
>>>>>>>
>>>>>>> Thanks in advance for any guidance!
>>>>>>>
>>>>>>>
>>>>>>> Best Regards.
>>>>>>>
>>>>>>
