Hi, Maybe you can write like this : builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");
Other additional properties could be found here : https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties Marco Villalobos <mvillalo...@kineteque.com> 于2021年11月30日周二 上午11:08写道: > Thank you for the information. That still does not answer my question > though. How do I configure Flink in 1.12 using the KafkaSourceBuilder so > that consumer should commit offsets back to Kafka on checkpoints? > > FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method. > > But now that I am using KafkaSourceBuilder, how do I configure that > behavior so that offsets get committed on checkpoints? Or is that the > default behavior with checkpoints? > > -Marco > > On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> Flink 1.14 release note states about this. See [1]. >> >> [1] >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer >> >> Marco Villalobos <mvillalo...@kineteque.com> 于2021年11月30日周二 上午7:12写道: >> >>> Hi everybody, >>> >>> I am using Flink 1.12 and migrating my code from using >>> FlinkKafkaConsumer to using the KafkaSourceBuilder. >>> >>> FlinkKafkaConsumer has the method >>> >>> /** >>>> * Specifies whether or not the consumer should commit offsets back to >>>> Kafka on checkpoints. >>>> * This setting will only have effect if checkpointing is enabled for >>>> the job. If checkpointing isn't >>>> * enabled, only the "auto.commit.enable" (for 0.8) / >>>> "enable.auto.commit" (for 0.9+) property >>>> * settings will be used. >>>> */ >>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) >>> >>> >>> How do I setup that parameter when using the KafkaSourceBuilder? If I >>> already have checkpointing configured, is it necessary to setup "commit >>> offsets on checkpoints"? >>> >>> The Flink 1.12 documentation does not discuss this topic, and the Flink >>> 1.14 documentation says little about it. >>> >>> For example, the Flink 1.14 documentation states: >>> >>> Additional Properties >>>> In addition to properties described above, you can set arbitrary >>>> properties for KafkaSource and KafkaConsumer by using >>>> setProperties(Properties) and setProperty(String, String). KafkaSource has >>>> following options for configuration: >>>> commit.offsets.on.checkpoint specifies whether to commit consuming >>>> offsets to Kafka brokers on checkpoint >>> >>> >>> And the 1.12 documentation states: >>> >>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will >>>> consume records from a topic and periodically checkpoint all its Kafka >>>> offsets, together with the state of other operations. In case of a job >>>> failure, Flink will restore the streaming program to the state of the >>>> latest checkpoint and re-consume the records from Kafka, starting from the >>>> offsets that were stored in the checkpoint. >>>> The interval of drawing checkpoints therefore defines how much the >>>> program may have to go back at most, in case of a failure. To use fault >>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled >>>> in the job. >>>> If checkpointing is disabled, the Kafka consumer will periodically >>>> commit the offsets to Zookeeper. >>> >>> >>> Thank you. >>> >>> Marco >>> >>> >>>