Thank you. One last question. What is an RP? Where can I read it?

Marco
> On Nov 30, 2021, at 11:06 PM, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi,
>
> In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing is
> enabled is the default behavior of KafkaSourceBuilder, and it cannot be
> changed through KafkaSourceBuilder.
>
> With FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277> we can
> change that behavior. The problem will be fixed in 1.12.6, which does not
> seem to be the version you are on.
>
> Reading the RP will be helpful for you to understand the behavior.
>
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Wed, Dec 1, 2021 at 3:43 AM:
>
> Thanks!
>
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does
> not exist in Flink 1.12.
>
> Is that property supported with the string "commit.offsets.on.checkpoints"?
>
> How do I configure that behavior so that offsets get committed on checkpoints
> in Flink 1.12 when using the KafkaSourceBuilder? Or is that the default
> behavior with checkpoints?
>
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi,
>
> Maybe you can write it like this:
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");
>
> Other additional properties can be found here:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
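For readers landing on this thread: here is a minimal sketch of the KafkaSource
setup that the one-line setProperty(...) above plugs into, assuming Flink 1.14
(per the thread, 1.12.0-1.12.5 do not expose this option). The bootstrap
servers, topic, and group id are placeholders, and the option key resolves to
"commit.offsets.on.checkpoint"; offsets are only committed when a checkpoint
completes, so checkpointing must be enabled on the job.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Offsets are committed back to Kafka only when a checkpoint completes.
    env.enableCheckpointing(60_000L);

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")              // placeholder brokers
            .setTopics("my-topic")                              // placeholder topic
            .setGroupId("my-consumer-group")                    // placeholder group id
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // Key resolves to "commit.offsets.on.checkpoint"; per the thread,
            // committing on checkpoint is already the default for KafkaSource.
            .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");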
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 11:08 AM:
>
> Thank you for the information. That still does not answer my question,
> though. How do I configure Flink 1.12, using the KafkaSourceBuilder, so that
> the consumer commits offsets back to Kafka on checkpoints?
>
> FlinkKafkaConsumer has the method
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) for this.
>
> But now that I am using KafkaSourceBuilder, how do I configure that behavior
> so that offsets get committed on checkpoints? Or is that the default behavior
> with checkpoints?
>
> -Marco
>
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
> Hi!
>
> The Flink 1.14 release notes cover this. See [1].
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 7:12 AM:
>
> Hi everybody,
>
> I am using Flink 1.12 and migrating my code from FlinkKafkaConsumer to the
> KafkaSourceBuilder.
>
> FlinkKafkaConsumer has the method
>
> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka
>  * on checkpoints. This setting will only have effect if checkpointing is
>  * enabled for the job. If checkpointing isn't enabled, only the
>  * "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>  * settings will be used.
>  */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>
> How do I set up that parameter when using the KafkaSourceBuilder? If I
> already have checkpointing configured, is it necessary to set up "commit
> offsets on checkpoints"?
>
> The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14
> documentation says little about it.
>
> For example, the Flink 1.14 documentation states:
>
> Additional Properties
> In addition to the properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using setProperties(Properties)
> and setProperty(String, String). KafkaSource has the following options for
> configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets to
> Kafka brokers on checkpoint
>
> And the 1.12 documentation states:
>
> With Flink's checkpointing enabled, the Flink Kafka Consumer will consume
> records from a topic and periodically checkpoint all its Kafka offsets,
> together with the state of other operations. In case of a job failure, Flink
> will restore the streaming program to the state of the latest checkpoint and
> re-consume the records from Kafka, starting from the offsets that were stored
> in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program
> may have to go back at most, in case of a failure. To use fault-tolerant
> Kafka Consumers, checkpointing of the topology needs to be enabled in the job.
> If checkpointing is disabled, the Kafka consumer will periodically commit the
> offsets to Zookeeper.
>
> Thank you.
>
> Marco
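Since the thread never spells out the Flink 1.12 fallback in full, here is a
minimal sketch of the legacy route Marco's original email quotes: staying on
FlinkKafkaConsumer until a release that honors commit.offsets.on.checkpoint in
KafkaSourceBuilder (1.12.6 per FLINK-24277 above). The topic, group id, and
bootstrap servers are placeholders.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // offsets are committed when a checkpoint completes

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder brokers
    props.setProperty("group.id", "my-consumer-group");         // placeholder group id

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // The legacy switch the thread asks about; as the Javadoc quoted above
    // explains, it only has an effect when checkpointing is enabled.
    consumer.setCommitOffsetsOnCheckpoints(true);

    env.addSource(consumer).print();
    env.execute("kafka-offsets-on-checkpoints-demo");

Either way, as the 1.12 documentation quoted above notes, recovery after a
failure uses the offsets stored in Flink's own checkpoint, so this setting
mainly controls what shows up as committed offsets on the Kafka side.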