Unsubscribe

> 2024年12月3日 21:11,ar...@apache.org 写道:
> 
> This is an automated email from the ASF dual-hosted git repository.
> 
> arvid pushed a commit to branch main
> in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
> 
> 
> The following commit(s) were added to refs/heads/main by this push:
>     new f6a077a9 [FLINK-36780] Kafka source disable partition discovery unexpectedly (#136)
> f6a077a9 is described below
> 
> commit f6a077a9dd8d1d5e43fc545cc9baab227d8438a0
> Author: Mingliang Liu <lium...@apache.org>
> AuthorDate: Tue Dec 3 05:11:18 2024 -0800
> 
>    [FLINK-36780] Kafka source disable partition discovery unexpectedly (#136)
> ---
> .../connector/kafka/source/KafkaSourceBuilder.java |  7 +++----
> .../kafka/source/KafkaSourceBuilderTest.java       | 23 ++++++++++++++++++++++
> 2 files changed, 26 insertions(+), 4 deletions(-)
> 
> diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
> index 78a4b0b6..0709afe0 100644
> --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
> +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
> @@ -474,10 +474,9 @@ public class KafkaSourceBuilder<OUT> {
>                 true);
> 
>         // If the source is bounded, do not run periodic partition discovery.
> -        maybeOverride(
> -                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
> -                "-1",
> -                boundedness == Boundedness.BOUNDED);
> +        if (boundedness == Boundedness.BOUNDED) {
> +            maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", true);
> +        }
> 
>         // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
>         // or generate a random string if consumer group id is not specified.
> diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
> index 2829f01e..ca777bc7 100644
> --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
> +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
> @@ -217,6 +217,29 @@ public class KafkaSourceBuilderTest {
>                 .hasMessageContaining(expectedError);
>     }
> 
> +    @Test
> +    public void testDefaultPartitionDiscovery() {
> +        final KafkaSource<String> kafkaSource = getBasicBuilder().build();
> +        // Commit on checkpoint and auto commit should be disabled because group.id is not specified
> +        assertThat(
> +                        kafkaSource
> +                                .getConfiguration()
> +                                .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS))
> +                .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue());
> +    }
> +
> +    @Test
> +    public void testPeriodPartitionDiscovery() {
> +        final KafkaSource<String> kafkaSource =
> +                getBasicBuilder().setBounded(OffsetsInitializer.latest()).build();
> +        // Commit on checkpoint and auto commit should be disabled because group.id is not specified
> +        assertThat(
> +                        kafkaSource
> +                                .getConfiguration()
> +                                .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS))
> +                .isEqualTo(-1L);
> +    }
> +
>     private KafkaSourceBuilder<String> getBasicBuilder() {
>         return new KafkaSourceBuilder<String>()
>                 .setBootstrapServers("testServer")
> 

Reply via email to