[ https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773719#comment-17773719 ]
Martijn Visser commented on FLINK-32038:
----------------------------------------

[~pritam.agarwala] [~tzulitai] What is the conclusion of this ticket?

> OffsetCommitMode.Kafka_periodic with checkpointing enabled
> -----------------------------------------------------------
>
>                 Key: FLINK-32038
>                 URL: https://issues.apache.org/jira/browse/FLINK-32038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.14.6
>            Reporter: Pritam Agarwala
>            Priority: Major
>
> I need the Kafka consumer lag to build a graph, and the lag depends on the
> offsets committed to Kafka. Flink updates the committed offsets only after
> checkpointing, to keep them consistent.
>
> Default behaviour as per the docs:
> If checkpointing is enabled but {{consumer.setCommitOffsetsOnCheckpoints}} is
> set to false, offsets are not committed at all, even if
> {{enable.auto.commit}} is set to true.
>
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false,
> *shouldn't it fall back on {{enable.auto.commit}} to commit offsets regularly,
> since in any case Flink doesn't use the consumer's committed offsets for
> recovery?*
>
> OffsetCommitModes class:
>
> {code:java}
> public class OffsetCommitModes {
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
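
For illustration only, the fallback asked about in the description could be sketched roughly as below. This is not Flink's implementation and not a proposed patch: the class name `OffsetCommitFallbackSketch` and its nested enum are hypothetical stand-ins so the snippet compiles on its own. Only the three boolean flags and the mode names are taken from the quoted `OffsetCommitModes` code.

```java
public class OffsetCommitFallbackSketch {

    /** Hypothetical local stand-in for Flink's OffsetCommitMode, so the sketch is self-contained. */
    public enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /**
     * Variant of the quoted fromConfiguration(): committing on checkpoints still wins
     * when it is enabled, but otherwise the decision falls back to enable.auto.commit
     * regardless of whether checkpointing is enabled.
     */
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing && enableCommitOnCheckpoint) {
            // unchanged from the quoted code: offsets are committed when checkpoints complete
            return OffsetCommitMode.ON_CHECKPOINTS;
        }
        // assumed fallback: let the Kafka client auto-commit periodically if configured to
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        // checkpointing on, commit-on-checkpoint off, auto-commit on: the case from the ticket
        System.out.println(fromConfiguration(true, false, true));  // prints KAFKA_PERIODIC
        // checkpointing on, commit-on-checkpoint on: same result as the quoted code
        System.out.println(fromConfiguration(true, true, true));   // prints ON_CHECKPOINTS
        // nothing enabled that could commit: same result as the quoted code
        System.out.println(fromConfiguration(false, false, true)); // prints DISABLED
    }
}
```

The only behavioural difference from the quoted code is the first case in `main`: with checkpointing enabled and commit-on-checkpoint disabled, the quoted code returns DISABLED, while this sketch returns KAFKA_PERIODIC when `enable.auto.commit` is true.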