slankka created FLINK-36674: ------------------------------- Summary: Flink will not commit if checkpointing is not enabled Key: FLINK-36674 URL: https://issues.apache.org/jira/browse/FLINK-36674 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: slankka
background: recently I found that _Flink-sql-connector-kafka-1.13.5_ kafka source DDL style, checkpoint is *NOT* enabled, I still can see consumer group defined by _properties.group.id_ values in Kafka. And when I change to flink-sql-connector-kafka-1.17.2, checkpoint must enabled otherwise consumer group will never seem. Developers offen look into consumer groups to see whether program works properly. The expected result will be Flink document, saying: {noformat} If checkpointing is not enabled, Kafka source relies on Kafka consumer’s internal automatic periodic offset committing logic, configured by enable.auto.commit and auto.commit.interval.ms in the properties of Kafka consumer.{noformat} The simplist kafka source DDL: {code:java} CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) {code} Precondition: _group.id_ always provided. Here're my questions: # why the _enable.auto.commit_ behavior turned off by default (The Kafka officially says defaults to true if there group.id provided) which may cause user use must define expilicity true to the kafka source DDL. (naturely peolple won't set it) # It seems that this ticket [FLINK-20114] Fix a few KafkaSource-related bugs - ASF JIRA takes effects earlier than 1.13.5, what causes the commiting to consumer group behaivor diffierence between Flink 1.13.5 and Flink 1.17.2. (no checkpoint enabled)? logic related: [Github PR-15161|https://github.com/apache/flink/pull/15161] {code:java} org.apache.flink.connector.kafka.source.KafkaSourceBuilder: maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); {code} FlinkKafkaConsumer.java respect the default value (which is true) , {code:java} @Override protected boolean getIsAutoCommitEnabled() { return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) && PropertiesUtil.getLong( properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0; } {code} The OffsetCommitModes : {code:java} public class OffsetCommitModes { ... public static OffsetCommitMode fromConfiguration( boolean enableAutoCommit, boolean enableCommitOnCheckpoint, boolean enableCheckpointing) { if (enableCheckpointing) { // if checkpointing is enabled, the mode depends only on whether committing on // checkpoints is enabled return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED; } else { // else, the mode depends only on whether auto committing is enabled in the provided // Kafka properties return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED; } } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)