Thank you for the information. That still does not answer my question though. How do I configure Flink in 1.12 using the KafkaSourceBuilder so that consumer should commit offsets back to Kafka on checkpoints?
FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method. But now that I am using KafkaSourceBuilder, how do I configure that behavior so that offsets get committed on checkpoints? Or is that the default behavior with checkpoints? -Marco On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > Flink 1.14 release note states about this. See [1]. > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer > > Marco Villalobos <mvillalo...@kineteque.com> 于2021年11月30日周二 上午7:12写道: > >> Hi everybody, >> >> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer >> to using the KafkaSourceBuilder. >> >> FlinkKafkaConsumer has the method >> >> /** >>> * Specifies whether or not the consumer should commit offsets back to >>> Kafka on checkpoints. >>> * This setting will only have effect if checkpointing is enabled for >>> the job. If checkpointing isn't >>> * enabled, only the "auto.commit.enable" (for 0.8) / >>> "enable.auto.commit" (for 0.9+) property >>> * settings will be used. >>> */ >>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) >> >> >> How do I setup that parameter when using the KafkaSourceBuilder? If I >> already have checkpointing configured, is it necessary to setup "commit >> offsets on checkpoints"? >> >> The Flink 1.12 documentation does not discuss this topic, and the Flink >> 1.14 documentation says little about it. >> >> For example, the Flink 1.14 documentation states: >> >> Additional Properties >>> In addition to properties described above, you can set arbitrary >>> properties for KafkaSource and KafkaConsumer by using >>> setProperties(Properties) and setProperty(String, String). KafkaSource has >>> following options for configuration: >>> commit.offsets.on.checkpoint specifies whether to commit consuming >>> offsets to Kafka brokers on checkpoint >> >> >> And the 1.12 documentation states: >> >> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume >>> records from a topic and periodically checkpoint all its Kafka offsets, >>> together with the state of other operations. In case of a job failure, >>> Flink will restore the streaming program to the state of the latest >>> checkpoint and re-consume the records from Kafka, starting from the offsets >>> that were stored in the checkpoint. >>> The interval of drawing checkpoints therefore defines how much the >>> program may have to go back at most, in case of a failure. To use fault >>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled >>> in the job. >>> If checkpointing is disabled, the Kafka consumer will periodically >>> commit the offsets to Zookeeper. >> >> >> Thank you. >> >> Marco >> >> >>