Thank you. One last question.  What is an RP? Where can I read it?

Marco

> On Nov 30, 2021, at 11:06 PM, Hang Ruan <ruanhang1...@gmail.com> wrote:
> 
> Hi,
> 
> In versions 1.12.0 through 1.12.5, committing offsets to Kafka when 
> checkpointing is enabled is the default behavior of KafkaSourceBuilder, and 
> it cannot be changed there. 
> 
> With FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>, the 
> behavior becomes configurable. The fix will be in 1.12.6, so it does not 
> seem to be included in your version.  
> 
> Reading the RP will be helpful for you to understand the behavior.
>  
> 
> On Wed, Dec 1, 2021 at 3:43 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> Thanks! 
> 
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does 
> not exist in Flink 1.12.
> 
> Is that property supported with the string "commit.offsets.on.checkpoint"?
> 
> How do I configure that behavior so that offsets get committed on checkpoints 
> in Flink 1.12 when using the KafkaSourceBuilder? Or is that the default 
> behavior with checkpoints?
> 
> 
> 
> 
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> Hi, 
> 
> Maybe you can write it like this:
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");
> 
> Other additional properties can be found here:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>  
> On Tue, Nov 30, 2021 at 11:08 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> Thank you for the information. That still does not answer my question, 
> though. How do I configure Flink 1.12, using the KafkaSourceBuilder, so 
> that the consumer commits offsets back to Kafka on checkpoints?
> 
> FlinkKafkaConsumer has the method setCommitOffsetsOnCheckpoints(boolean) for this. 
> 
> But now that I am using KafkaSourceBuilder, how do I configure that behavior 
> so that offsets get committed on checkpoints?  Or is that the default 
> behavior with checkpoints?
> 
> -Marco
> 
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:
> Hi!
> 
> The Flink 1.14 release notes cover this. See [1].
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>  
> On Tue, Nov 30, 2021 at 7:12 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> Hi everybody,
> 
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer to 
> using the KafkaSourceBuilder.
> 
> FlinkKafkaConsumer has the method 
> 
> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>  * settings will be used.
>  */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
> 
> How do I set up that parameter when using the KafkaSourceBuilder? If I already 
> have checkpointing configured, is it necessary to set up "commit offsets on 
> checkpoints"?
> 
> The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14 
> documentation says little about it.
> 
>  For example, the Flink 1.14 documentation states:
> 
> Additional Properties
> In addition to properties described above, you can set arbitrary properties 
> for KafkaSource and KafkaConsumer by using setProperties(Properties) and 
> setProperty(String, String). KafkaSource has following options for 
> configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets to 
> Kafka brokers on checkpoint
> 
> And the 1.12 documentation states:
> 
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume 
> records from a topic and periodically checkpoint all its Kafka offsets, 
> together with the state of other operations. In case of a job failure, Flink 
> will restore the streaming program to the state of the latest checkpoint and 
> re-consume the records from Kafka, starting from the offsets that were stored 
> in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program 
> may have to go back at most, in case of a failure. To use fault tolerant 
> Kafka Consumers, checkpointing of the topology needs to be enabled in the job.
> If checkpointing is disabled, the Kafka consumer will periodically commit the 
> offsets to Zookeeper.
> 
> Thank you.
> 
> Marco
> 
> 
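
For reference, the setting discussed in this thread can be sketched with plain java.util.Properties. This is only an illustration under stated assumptions: the class name CommitOffsetsProps, the method build, and the group id "my-group" are hypothetical and not part of Flink. The key "commit.offsets.on.checkpoint" is the one quoted from the Flink 1.14 documentation above. According to the replies in this thread, the behavior is not configurable in 1.12.0 through 1.12.5, so the property may have no effect on those versions. The hand-off to the builder appears only as a comment, so the sketch runs without Flink on the classpath.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Flink.
final class CommitOffsetsProps {

    // Option key as quoted from the Flink 1.14 documentation in this thread.
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Builds the additional properties intended for a KafkaSource.
    static Properties build(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Standard Kafka consumer setting: committed offsets are tracked per consumer group.
        props.setProperty("group.id", groupId);
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        // "my-group" is a placeholder group id.
        Properties props = build("my-group", true);
        // With Flink on the classpath, these properties would be passed to the
        // builder via setProperties(Properties), as described in the quoted docs.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Collecting the options in a Properties object keeps them in one place, whether they are later applied with setProperties(Properties) or one at a time with setProperty(String, String).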
