This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bd24fa77cb [Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)
bd24fa77cb is described below

commit bd24fa77cbf1b20edbf29dd03371888ce56f0ead
Author: xiaochen <[email protected]>
AuthorDate: Mon Aug 4 19:34:48 2025 +0800

    [Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)
---
 docs/en/connector-v2/source/Kafka.md               |  19 +++
 docs/zh/connector-v2/source/Kafka.md               |  19 +++
 .../seatunnel/kafka/config/KafkaSourceOptions.java |   9 ++
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |   8 ++
 .../seatunnel/kafka/source/KafkaSourceFactory.java |   7 +-
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  22 ++-
 .../admin/KafkaSourceSplitEnumeratorTest.java      | 155 +++++++++++++++++----
 7 files changed, 208 insertions(+), 31 deletions(-)

diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index 15b25d60d4..fcd0d21f3e 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -54,6 +54,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | start_mode.timestamp                | Long    | No | -     | The time required for consumption mode to be "timestamp". [...]
 | start_mode.end_timestamp            | Long    | No | -     | The end time required for consumption mode to be "timestamp" in batch mode
 | partition-discovery.interval-millis | Long    | No | -1    | The interval for dynamically discovering topics and partitions. [...]
+| ignore_no_leader_partition          | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. [...]
 | common-options                      |         | No | -     | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...]
 | protobuf_message_name               | String  | No | -     | Effective when the format is set to protobuf, specifies the Message name [...]
 | protobuf_schema                     | String  | No | -     | Effective when the format is set to protobuf, specifies the Schema definition [...]
@@ -375,6 +376,24 @@ source {
 }
 ```
 
+### Ignore No Leader Partition
+
+When dealing with Kafka clusters that may have temporary leadership issues, you can configure the connector to ignore partitions without a leader:
+
+```hocon
+source {
+  Kafka {
+    topic = "test_topic"
+    bootstrap.servers = "localhost:9092"
+    consumer.group = "test_group"
+    ignore_no_leader_partition = true
+    start_mode = "earliest"
+  }
+}
+```
+
+With `ignore_no_leader_partition = true`, the connector will skip any partitions that don't have a leader during partition discovery, allowing the job to continue processing other healthy partitions.
+
 ### format
 If you need to retain Kafka's native information, you can refer to the following configuration.
 
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 6243898bc1..643b584cc3 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -54,6 +54,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
 | start_mode.timestamp                | Long    | No | -     | The time used for the "timestamp" consumption mode. |
 | start_mode.end_timestamp            | Long    | No | -     | The end time for the "timestamp" consumption mode; only supported in batch mode. |
 | partition-discovery.interval-millis | Long    | No | -1    | The interval for dynamically discovering topics and partitions. |
+| ignore_no_leader_partition          | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. |
 | common-options                      |         | No | -     | Source plugin common parameters; see [Source Common Options](../source-common-options.md) for details. |
 | protobuf_message_name               | String  | No | -     | Effective when the format is set to protobuf; specifies the Message name. |
 | protobuf_schema                     | String  | No | -     | Effective when the format is set to protobuf; specifies the Schema definition. |
@@ -368,6 +369,24 @@ source {
 }
 ```
 
+### Ignore No Leader Partition
+
+When dealing with Kafka clusters that may have temporary leadership issues, you can configure the connector to ignore partitions without a leader:
+
+```hocon
+source {
+  Kafka {
+    topic = "test_topic"
+    bootstrap.servers = "localhost:9092"
+    consumer.group = "test_group"
+    ignore_no_leader_partition = true
+    start_mode = "earliest"
+  }
+}
+```
+
+With `ignore_no_leader_partition = true`, the connector will skip any partitions without a leader during partition discovery, allowing the job to continue processing the other healthy partitions.
+
 ### format
 If you need to retain Kafka's native information, you can refer to the following configuration.
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 51b80e81ac..ec1441320a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -101,6 +101,15 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
                     .defaultValue(10000L)
                     .withDescription("The interval for poll message");
 
+    public static final Option<Boolean> IGNORE_NO_LEADER_PARTITION =
+            Options.key("ignore_no_leader_partition")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to ignore partitions that have no leader. 
"
+                                    + "If set to true, partitions without a 
leader will be skipped during partition discovery. "
+                                    + "If set to false (default), the 
connector will include all partitions regardless of leader status.");
+
     public static final Option<MessageFormatErrorHandleWay> MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
             Options.key("format_error_handle_way")
                     .enumType(MessageFormatErrorHandleWay.class)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index f601b9bcf7..d574b6d23f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -84,6 +84,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSource
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KAFKA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_POLL_TIMEOUT;
@@ -111,6 +112,7 @@ public class KafkaSourceConfig implements Serializable {
     @Getter private final String consumerGroup;
     @Getter private final long pollTimeout;
     @Getter private final int readerCacheQueueSize;
+    @Getter private final boolean ignoreNoLeaderPartition;
 
     public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -123,6 +125,12 @@ public class KafkaSourceConfig implements Serializable {
         this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
         this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
         this.readerCacheQueueSize = readonlyConfig.get(READER_CACHE_QUEUE_SIZE);
+        this.ignoreNoLeaderPartition = readonlyConfig.get(IGNORE_NO_LEADER_PARTITION);
+        if (this.ignoreNoLeaderPartition && this.discoveryIntervalMillis <= 0) {
+            throw new IllegalArgumentException(
+                    "partition-discovery.interval-millis must be configured when ignore_no_leader_partition is set to true. "
+                            + "Please provide a positive value for partition-discovery.interval-millis.");
+        }
     }
 
     private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 14bfbba415..568622ce37 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -58,11 +58,16 @@ public class KafkaSourceFactory implements TableSourceFactory {
                         KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
                         KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
                         KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
-                        KafkaSourceOptions.READER_CACHE_QUEUE_SIZE)
+                        KafkaSourceOptions.READER_CACHE_QUEUE_SIZE,
+                        KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION)
                 .conditional(
                         KafkaSourceOptions.START_MODE,
                         StartMode.TIMESTAMP,
                         KafkaSourceOptions.START_MODE_TIMESTAMP)
+                .conditional(
+                        KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION,
+                        Boolean.TRUE,
+                        KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
                 .conditional(
                         KafkaSourceOptions.START_MODE,
                         StartMode.SPECIFIC_OFFSETS,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index e297cad40c..024f6416a8 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -92,13 +92,14 @@ public class KafkaSourceSplitEnumerator
     @VisibleForTesting
     public KafkaSourceSplitEnumerator(
             AdminClient adminClient,
+            KafkaSourceConfig kafkaSourceConfig,
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
         this.tablePathMetadataMap = new HashMap<>();
         this.context = null;
         this.discoveryIntervalMillis = -1;
         this.adminClient = adminClient;
-        this.kafkaSourceConfig = null;
+        this.kafkaSourceConfig = kafkaSourceConfig;
         this.pendingSplit = pendingSplit;
         this.assignedSplit = assignedSplit;
     }
@@ -109,7 +110,7 @@ public class KafkaSourceSplitEnumerator
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit,
             boolean isStreamingMode) {
-        this(adminClient, pendingSplit, assignedSplit);
+        this(adminClient, null, pendingSplit, assignedSplit);
         this.isStreamingMode = isStreamingMode;
     }
 
@@ -333,10 +334,25 @@ public class KafkaSourceSplitEnumerator
         }
         log.info("Discovered topics: {}", topics);
         Collection<TopicPartition> partitions =
-                adminClient.describeTopics(topics).all().get().values().stream()
+                adminClient.describeTopics(topics).allTopicNames().get().values().stream()
                         .flatMap(
                                 t ->
                                         t.partitions().stream()
+                                                .filter(
+                                                        partitionInfo -> {
+                                                            if (kafkaSourceConfig != null
+                                                                    && kafkaSourceConfig
+                                                                            .isIgnoreNoLeaderPartition()
+                                                                    && partitionInfo.leader()
+                                                                            == null) {
+                                                                log.warn(
+                                                                        "Partition {} of topic {} has no leader, skipping due to ignore_no_leader_partition=true.",
+                                                                        partitionInfo.partition(),
+                                                                        t.name());
+                                                                return false;
+                                                            }
+                                                            return true;
+                                                        })
                                                 .map(
                                                         p ->
                                                                 new TopicPartition(
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
index 00e059ecfe..a9f38f613d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -17,10 +17,15 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
 import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 
@@ -40,8 +45,10 @@ import java.util.concurrent.ExecutionException;
 class KafkaSourceSplitEnumeratorTest {
 
     AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+    KafkaSourceConfig kafkaSourceConfig = Mockito.mock(KafkaSourceConfig.class);
     // prepare
-    TopicPartition partition = new TopicPartition("test", 0);
+    TopicPartition partition0 = new TopicPartition("test", 0);
+    TopicPartition partition2 = new TopicPartition("test", 2);
 
     @BeforeEach
     void init() {
@@ -54,31 +61,42 @@ class KafkaSourceSplitEnumeratorTest {
                                         KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
                                     {
                                         put(
-                                                partition,
+                                                partition0,
+                                                KafkaFuture.completedFuture(
+                                                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                                                0, 0, Optional.of(0))));
+                                        put(
+                                                partition2,
                                                 KafkaFuture.completedFuture(
                                                         new ListOffsetsResult.ListOffsetsResultInfo(
                                                                 0, 0, Optional.of(0))));
                                     }
                                 }));
+
+        List<TopicPartitionInfo> mockTopicPartition = Lists.newArrayList();
+        TopicPartitionInfo topicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        0,
+                        new Node(1, "127.0.0.1", 9092),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        TopicPartitionInfo topicPartitionInfoNoLeader =
+                new TopicPartitionInfo(2, null, Collections.emptyList(), Collections.emptyList());
+        mockTopicPartition.add(topicPartitionWithLeader);
+        mockTopicPartition.add(topicPartitionInfoNoLeader);
+
         Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
                 .thenReturn(
                         DescribeTopicsResult.ofTopicNames(
                                 new HashMap<String, KafkaFuture<TopicDescription>>() {
                                     {
                                         put(
-                                                partition.topic(),
+                                                partition0.topic(),
                                                 KafkaFuture.completedFuture(
                                                         new TopicDescription(
-                                                                partition.topic(),
+                                                                partition0.topic(),
                                                                 false,
-                                                                Collections.singletonList(
-                                                                        new TopicPartitionInfo(
-                                                                                0,
-                                                                                null,
-                                                                                Collections
-                                                                                        .emptyList(),
-                                                                                Collections
-                                                                                        .emptyList())))));
+                                                                mockTopicPartition)));
                                     }
                                 }));
     }
@@ -89,17 +107,17 @@ class KafkaSourceSplitEnumeratorTest {
         Map<TopicPartition, KafkaSourceSplit> assignedSplit =
                 new HashMap<TopicPartition, KafkaSourceSplit>() {
                     {
-                        put(partition, new KafkaSourceSplit(null, partition));
+                        put(partition0, new KafkaSourceSplit(null, partition0));
                     }
                 };
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
-        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
+        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition0));
         KafkaSourceSplitEnumerator enumerator =
-                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
+                new KafkaSourceSplitEnumerator(adminClient, null, pendingSplit, assignedSplit);
         enumerator.addSplitsBack(splits, 1);
         Assertions.assertTrue(pendingSplit.size() == splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+        Assertions.assertNull(assignedSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == 0);
     }
 
     @Test
@@ -108,18 +126,18 @@ class KafkaSourceSplitEnumeratorTest {
         Map<TopicPartition, KafkaSourceSplit> assignedSplit =
                 new HashMap<TopicPartition, KafkaSourceSplit>() {
                     {
-                        put(partition, new KafkaSourceSplit(null, partition));
+                        put(partition0, new KafkaSourceSplit(null, partition0));
                     }
                 };
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
         List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Collections.singletonList(new KafkaSourceSplit(null, partition0));
         KafkaSourceSplitEnumerator enumerator =
                 new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
         enumerator.addSplitsBack(splits, 1);
         Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+        Assertions.assertNull(assignedSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == Long.MAX_VALUE);
     }
 
     @Test
@@ -128,14 +146,17 @@ class KafkaSourceSplitEnumeratorTest {
         Map<TopicPartition, KafkaSourceSplit> assignedSplit =
                 new HashMap<TopicPartition, KafkaSourceSplit>();
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+
         List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Arrays.asList(
+                        new KafkaSourceSplit(null, partition0),
+                        new KafkaSourceSplit(null, partition2));
         KafkaSourceSplitEnumerator enumerator =
                 new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
         enumerator.fetchPendingPartitionSplit();
         Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNotNull(pendingSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == Long.MAX_VALUE);
     }
 
     @Test
@@ -145,12 +166,92 @@ class KafkaSourceSplitEnumeratorTest {
                 new HashMap<TopicPartition, KafkaSourceSplit>();
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
         List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Arrays.asList(
+                        new KafkaSourceSplit(null, partition0),
+                        new KafkaSourceSplit(null, partition2));
+
         KafkaSourceSplitEnumerator enumerator =
                 new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
         enumerator.fetchPendingPartitionSplit();
         Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNotNull(pendingSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == 0);
+    }
+
+    @Test
+    void testIgnoreNoLeaderPartition() throws ExecutionException, InterruptedException {
+
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("group.id", "test");
+        configMap.put("topic", "test");
+        configMap.put("ignore_no_leader_partition", "false");
+        KafkaSourceConfig sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(
+                        adminClient, sourceConfig, pendingSplit, assignedSplit);
+        enumerator.fetchPendingPartitionSplit();
+
+        Assertions.assertEquals(2, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNotNull(pendingSplit.get(partition2));
+
+        pendingSplit.clear();
+        assignedSplit.clear();
+
+        configMap.put("ignore_no_leader_partition", "true");
+        configMap.put("partition-discovery.interval-millis", 5000L);
+        sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+        enumerator =
+                new KafkaSourceSplitEnumerator(
+                        adminClient, sourceConfig, pendingSplit, assignedSplit);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(1, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNull(pendingSplit.get(partition2));
+
+        // Test partition restoration: simulate partition2 getting a leader
+        // Create new mock topic partition list with partition2 now having a leader
+        List<TopicPartitionInfo> restoredMockTopicPartition = Lists.newArrayList();
+        TopicPartitionInfo topicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        0,
+                        new Node(1, "127.0.0.1", 9092),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        TopicPartitionInfo restoredTopicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        2,
+                        new Node(2, "127.0.0.1", 9093), // partition2 now has a leader
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        restoredMockTopicPartition.add(topicPartitionWithLeader);
+        restoredMockTopicPartition.add(restoredTopicPartitionWithLeader);
+
+        // Update the mock to return the restored partition information
+        Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
+                .thenReturn(
+                        DescribeTopicsResult.ofTopicNames(
+                                new HashMap<String, KafkaFuture<TopicDescription>>() {
+                                    {
+                                        put(
+                                                partition0.topic(),
+                                                KafkaFuture.completedFuture(
+                                                        new TopicDescription(
+                                                                partition0.topic(),
+                                                                false,
+                                                                restoredMockTopicPartition)));
+                                    }
+                                }));
+
+        // Test that dynamic partition discovery detects the restored partition
+        enumerator.fetchPendingPartitionSplit();
+
+        // After partition restoration, both partitions should be available
+        Assertions.assertEquals(2, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNotNull(pendingSplit.get(partition2));
     }
 }
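The filtering rule this patch adds to `KafkaSourceSplitEnumerator` can be sketched in isolation. The stand-in `PartitionInfo` class below is hypothetical (the real connector uses Kafka's `TopicPartitionInfo`, whose `leader()` returns null for a leaderless partition); only the predicate mirrors the patch: leaderless partitions are dropped only when the `ignore_no_leader_partition` flag is on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LeaderFilterSketch {

    // Hypothetical stand-in for Kafka's TopicPartitionInfo: a partition id plus
    // a nullable leader id (null models "no leader").
    static final class PartitionInfo {
        final int partition;
        final Integer leaderId;

        PartitionInfo(int partition, Integer leaderId) {
            this.partition = partition;
            this.leaderId = leaderId;
        }
    }

    // Mirrors the predicate in the patch: keep every partition unless the flag
    // is set AND the partition currently has no leader.
    static List<Integer> discoverPartitions(
            List<PartitionInfo> infos, boolean ignoreNoLeaderPartition) {
        return infos.stream()
                .filter(info -> !(ignoreNoLeaderPartition && info.leaderId == null))
                .map(info -> info.partition)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PartitionInfo> topic = Arrays.asList(
                new PartitionInfo(0, 1),      // healthy partition
                new PartitionInfo(2, null));  // partition currently without a leader
        System.out.println(discoverPartitions(topic, false)); // both partitions kept
        System.out.println(discoverPartitions(topic, true));  // leaderless partition skipped
    }
}
```

Note the companion guard in `KafkaSourceConfig`: because a skipped partition can only be picked up again by a later discovery pass, the patch rejects `ignore_no_leader_partition = true` unless `partition-discovery.interval-millis` is set to a positive value.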
