Unsubscribe > 2024年12月3日 21:11,ar...@apache.org 写道: > > This is an automated email from the ASF dual-hosted git repository. > > arvid pushed a commit to branch main > in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git > > > The following commit(s) were added to refs/heads/main by this push: > new f6a077a9 [FLINK-36780] Kafka source disable partition discovery > unexpectedly (#136) > f6a077a9 is described below > > commit f6a077a9dd8d1d5e43fc545cc9baab227d8438a0 > Author: Mingliang Liu <lium...@apache.org> > AuthorDate: Tue Dec 3 05:11:18 2024 -0800 > > [FLINK-36780] Kafka source disable partition discovery unexpectedly (#136) > --- > .../connector/kafka/source/KafkaSourceBuilder.java | 7 +++---- > .../kafka/source/KafkaSourceBuilderTest.java | 23 ++++++++++++++++++++++ > 2 files changed, 26 insertions(+), 4 deletions(-) > > diff --git > a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java > > b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java > index 78a4b0b6..0709afe0 100644 > --- > a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java > +++ > b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java > @@ -474,10 +474,9 @@ public class KafkaSourceBuilder<OUT> { > true); > > // If the source is bounded, do not run periodic partition discovery. > - maybeOverride( > - KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), > - "-1", > - boundedness == Boundedness.BOUNDED); > + if (boundedness == Boundedness.BOUNDED) { > + > maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", > true); > + } > > // If the client id prefix is not set, reuse the consumer group id as > the client id prefix, > // or generate a random string if consumer group id is not specified. > diff --git > a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java > > b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java > index 2829f01e..ca777bc7 100644 > --- > a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java > +++ > b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java > @@ -217,6 +217,29 @@ public class KafkaSourceBuilderTest { > .hasMessageContaining(expectedError); > } > > + @Test > + public void testDefaultPartitionDiscovery() { > + final KafkaSource<String> kafkaSource = getBasicBuilder().build(); > + // Commit on checkpoint and auto commit should be disabled because > group.id is not specified > + assertThat( > + kafkaSource > + .getConfiguration() > + > .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) > + > .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue()); > + } > + > + @Test > + public void testPeriodPartitionDiscovery() { > + final KafkaSource<String> kafkaSource = > + > getBasicBuilder().setBounded(OffsetsInitializer.latest()).build(); > + // Commit on checkpoint and auto commit should be disabled because > group.id is not specified > + assertThat( > + kafkaSource > + .getConfiguration() > + > .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) > + .isEqualTo(-1L); > + } > + > private KafkaSourceBuilder<String> getBasicBuilder() { > return new KafkaSourceBuilder<String>() > .setBootstrapServers("testServer") >