Thank you for the information.  That still does not answer my question,
though.  How do I configure Flink 1.12, using the KafkaSourceBuilder, so
that the consumer commits offsets back to Kafka on checkpoints?

FlinkKafkaConsumer has a method for this: setCommitOffsetsOnCheckpoints(boolean).

But now that I am using KafkaSourceBuilder, how do I configure that
behavior so that offsets get committed on checkpoints?  Or is that the
default behavior with checkpoints?
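
For reference, here is a minimal sketch of what the 1.14 documentation quoted
below seems to describe: putting commit.offsets.on.checkpoint into the
properties handed to the builder. The class name, the method name, and the
"my-group" group id are hypothetical, and whether Flink 1.12 honors this key
is an assumption this thread does not confirm. The Flink call itself appears
only in a comment so that the snippet runs with the JDK alone.

```java
import java.util.Properties;

// Sketch only: assembles the properties that the quoted 1.14 docs say
// KafkaSource accepts. Class and method names are hypothetical.
final class CommitOnCheckpointProps {

    // Property key as quoted from the Flink 1.14 KafkaSource documentation.
    static final String COMMIT_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    static Properties build(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Standard Kafka consumer property; committed offsets are stored per group.
        props.setProperty("group.id", groupId);
        props.setProperty(COMMIT_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-group", true);
        // Assumed usage, not verified against Flink 1.12:
        //   KafkaSource.<String>builder().setProperties(props) ... .build();
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same key could presumably be set with a single
setProperty(String, String) call on the builder instead of a full Properties
object, since the quoted documentation mentions both methods.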

-Marco

On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> The Flink 1.14 release notes cover this. See [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 7:12 AM:
>
>> Hi everybody,
>>
>> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
>> to using the KafkaSourceBuilder.
>>
>> FlinkKafkaConsumer has the method
>>
>>> /**
>>>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>>>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>>>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>>>  * settings will be used.
>>>  */
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>
>>
>> How do I set up that parameter when using the KafkaSourceBuilder? If I
>> already have checkpointing configured, is it necessary to set up "commit
>> offsets on checkpoints"?
>>
>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>> 1.14 documentation says little about it.
>>
>>  For example, the Flink 1.14 documentation states:
>>
>>> Additional Properties
>>> In addition to properties described above, you can set arbitrary
>>> properties for KafkaSource and KafkaConsumer by using
>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>> following options for configuration:
>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>> offsets to Kafka brokers on checkpoint
>>
>>
>> And the 1.12 documentation states:
>>
>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
>>> records from a topic and periodically checkpoint all its Kafka offsets,
>>> together with the state of other operations. In case of a job failure,
>>> Flink will restore the streaming program to the state of the latest
>>> checkpoint and re-consume the records from Kafka, starting from the offsets
>>> that were stored in the checkpoint.
>>> The interval of drawing checkpoints therefore defines how much the
>>> program may have to go back at most, in case of a failure. To use fault
>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>> in the job.
>>> If checkpointing is disabled, the Kafka consumer will periodically
>>> commit the offsets to Zookeeper.
>>
>>
>> Thank you.
>>
>> Marco
>>
>>
>>
