This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new bd24fa77cb [Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)
bd24fa77cb is described below
commit bd24fa77cbf1b20edbf29dd03371888ce56f0ead
Author: xiaochen <[email protected]>
AuthorDate: Mon Aug 4 19:34:48 2025 +0800
    [Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)
---
docs/en/connector-v2/source/Kafka.md | 19 +++
docs/zh/connector-v2/source/Kafka.md | 19 +++
.../seatunnel/kafka/config/KafkaSourceOptions.java | 9 ++
.../seatunnel/kafka/source/KafkaSourceConfig.java | 8 ++
.../seatunnel/kafka/source/KafkaSourceFactory.java | 7 +-
.../kafka/source/KafkaSourceSplitEnumerator.java | 22 ++-
.../admin/KafkaSourceSplitEnumeratorTest.java | 155 +++++++++++++++++----
7 files changed, 208 insertions(+), 31 deletions(-)
diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index 15b25d60d4..fcd0d21f3e 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -54,6 +54,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| start_mode.timestamp                | Long    | No | -     | The time required for consumption mode to be "timestamp". [...]
| start_mode.end_timestamp            | Long    | No | -     | The end time required for consumption mode to be "timestamp" in batch mode
| partition-discovery.interval-millis | Long    | No | -1    | The interval for dynamically discovering topics and partitions. [...]
+| ignore_no_leader_partition         | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. [...]
| common-options                      |         | No | -     | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...]
| protobuf_message_name               | String  | No | -     | Effective when the format is set to protobuf, specifies the Message name [...]
| protobuf_schema                     | String  | No | -     | Effective when the format is set to protobuf, specifies the Schema definition [...]
@@ -375,6 +376,24 @@ source {
}
```
+### Ignore No Leader Partition
+
+When dealing with Kafka clusters that may have temporary leadership issues, you can configure the connector to ignore partitions without a leader:
+
+```hocon
+source {
+ Kafka {
+ topic = "test_topic"
+ bootstrap.servers = "localhost:9092"
+ consumer.group = "test_group"
+ ignore_no_leader_partition = true
+ start_mode = "earliest"
+ }
+}
+```
+
+With `ignore_no_leader_partition = true`, the connector will skip any partitions that don't have a leader during partition discovery, allowing the job to continue processing other healthy partitions.
+
### format
If you need to retain Kafka's native information, you can refer to the following configuration.
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 6243898bc1..643b584cc3 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -54,6 +54,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
| start_mode.timestamp                | Long    | No | -     | The time used for the "timestamp" consume mode. |
| start_mode.end_timestamp            | Long    | No | -     | The end time for the "timestamp" consume mode; only supported in batch mode. |
| partition-discovery.interval-millis | Long    | No | -1    | The interval for dynamically discovering topics and partitions. |
+| ignore_no_leader_partition         | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leader issues. |
| common-options                      |         | No | -     | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| protobuf_message_name               | String  | No | -     | Effective when the format is set to protobuf, specifies the Message name. |
| protobuf_schema                     | String  | No | -     | Effective when the format is set to protobuf, specifies the Schema definition. |
@@ -368,6 +369,24 @@ source {
}
```
+### Ignore No Leader Partition
+
+When dealing with Kafka clusters that may have temporary leader issues, you can configure the connector to ignore partitions without a leader:
+
+```hocon
+source {
+ Kafka {
+ topic = "test_topic"
+ bootstrap.servers = "localhost:9092"
+ consumer.group = "test_group"
+ ignore_no_leader_partition = true
+ start_mode = "earliest"
+ }
+}
+```
+
+With `ignore_no_leader_partition = true`, the connector will skip any partitions without a leader during partition discovery, allowing the job to continue processing other healthy partitions.
+
### format
If you need to retain Kafka's native information, you can refer to the following configuration.
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 51b80e81ac..ec1441320a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -101,6 +101,15 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
.defaultValue(10000L)
.withDescription("The interval for poll message");
+    public static final Option<Boolean> IGNORE_NO_LEADER_PARTITION =
+            Options.key("ignore_no_leader_partition")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to ignore partitions that have no leader. "
+                                    + "If set to true, partitions without a leader will be skipped during partition discovery. "
+                                    + "If set to false (default), the connector will include all partitions regardless of leader status.");
+
    public static final Option<MessageFormatErrorHandleWay> MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
            Options.key("format_error_handle_way")
                    .enumType(MessageFormatErrorHandleWay.class)
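The option above declares a boolean key with a `false` default. As a minimal standalone sketch of that default-value behavior (a hypothetical stand-in for SeaTunnel's `Options`/`ReadonlyConfig` API, not the real classes):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a boolean config option with a default:
// when the key is absent, the declared default applies; otherwise the
// raw string value is parsed as a boolean.
public class BooleanOptionSketch {
    public static boolean get(Map<String, String> config, String key, boolean defaultValue) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Key absent: the declared default (false) applies.
        System.out.println(get(config, "ignore_no_leader_partition", false)); // false
        config.put("ignore_no_leader_partition", "true");
        System.out.println(get(config, "ignore_no_leader_partition", false)); // true
    }
}
```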
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index f601b9bcf7..d574b6d23f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -84,6 +84,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSource
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KAFKA_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.KEY_POLL_TIMEOUT;
@@ -111,6 +112,7 @@ public class KafkaSourceConfig implements Serializable {
@Getter private final String consumerGroup;
@Getter private final long pollTimeout;
@Getter private final int readerCacheQueueSize;
+ @Getter private final boolean ignoreNoLeaderPartition;
public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -123,6 +125,12 @@ public class KafkaSourceConfig implements Serializable {
this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
        this.readerCacheQueueSize = readonlyConfig.get(READER_CACHE_QUEUE_SIZE);
+        this.ignoreNoLeaderPartition = readonlyConfig.get(IGNORE_NO_LEADER_PARTITION);
+        if (this.ignoreNoLeaderPartition && this.discoveryIntervalMillis <= 0) {
+            throw new IllegalArgumentException(
+                    "partition-discovery.interval-millis must be configured when ignore_no_leader_partition is set to true. "
+                            + "Please provide a positive value for partition-discovery.interval-millis.");
+        }
    }
private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
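The constructor guard above ties the new flag to dynamic partition discovery, presumably because a skipped leaderless partition can only be picked up again by a later discovery pass, so a positive interval is required. A self-contained sketch of just that check (class and method names here are illustrative, not the connector's own):

```java
// Illustrative extraction of the validation added in KafkaSourceConfig:
// ignore_no_leader_partition = true only makes sense together with a
// positive partition-discovery.interval-millis.
public class LeaderFilterGuard {
    public static void validate(boolean ignoreNoLeaderPartition, long discoveryIntervalMillis) {
        if (ignoreNoLeaderPartition && discoveryIntervalMillis <= 0) {
            throw new IllegalArgumentException(
                    "partition-discovery.interval-millis must be configured "
                            + "when ignore_no_leader_partition is set to true.");
        }
    }

    public static void main(String[] args) {
        validate(false, -1); // fine: feature disabled, interval may stay at its -1 default
        validate(true, 5000); // fine: feature enabled with a positive discovery interval
        try {
            validate(true, -1); // rejected: skipped partitions would never be rediscovered
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```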
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 14bfbba415..568622ce37 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -58,11 +58,16 @@ public class KafkaSourceFactory implements TableSourceFactory {
KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
- KafkaSourceOptions.READER_CACHE_QUEUE_SIZE)
+ KafkaSourceOptions.READER_CACHE_QUEUE_SIZE,
+ KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION)
.conditional(
KafkaSourceOptions.START_MODE,
StartMode.TIMESTAMP,
KafkaSourceOptions.START_MODE_TIMESTAMP)
+                .conditional(
+                        KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION,
+                        Boolean.TRUE,
+                        KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(
KafkaSourceOptions.START_MODE,
StartMode.SPECIFIC_OFFSETS,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index e297cad40c..024f6416a8 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -92,13 +92,14 @@ public class KafkaSourceSplitEnumerator
@VisibleForTesting
public KafkaSourceSplitEnumerator(
AdminClient adminClient,
+ KafkaSourceConfig kafkaSourceConfig,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
this.tablePathMetadataMap = new HashMap<>();
this.context = null;
this.discoveryIntervalMillis = -1;
this.adminClient = adminClient;
- this.kafkaSourceConfig = null;
+ this.kafkaSourceConfig = kafkaSourceConfig;
this.pendingSplit = pendingSplit;
this.assignedSplit = assignedSplit;
}
@@ -109,7 +110,7 @@ public class KafkaSourceSplitEnumerator
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit,
boolean isStreamingMode) {
- this(adminClient, pendingSplit, assignedSplit);
+ this(adminClient, null, pendingSplit, assignedSplit);
this.isStreamingMode = isStreamingMode;
}
@@ -333,10 +334,25 @@ public class KafkaSourceSplitEnumerator
}
log.info("Discovered topics: {}", topics);
        Collection<TopicPartition> partitions =
-                adminClient.describeTopics(topics).all().get().values().stream()
+                adminClient.describeTopics(topics).allTopicNames().get().values().stream()
                        .flatMap(
                                t ->
                                        t.partitions().stream()
+                                                .filter(
+                                                        partitionInfo -> {
+                                                            if (kafkaSourceConfig != null
+                                                                    && kafkaSourceConfig.isIgnoreNoLeaderPartition()
+                                                                    && partitionInfo.leader() == null) {
+                                                                log.warn(
+                                                                        "Partition {} of topic {} has no leader, skipping due to ignore_no_leader_partition=true.",
+                                                                        partitionInfo.partition(),
+                                                                        t.name());
+                                                                return false;
+                                                            }
+                                                            return true;
+                                                        })
                                                .map(
                                                        p ->
                                                                new TopicPartition(
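Stripped of the enumerator plumbing, the filter added above keeps a partition unless the flag is enabled and the partition currently reports no leader. A simplified, self-contained model of that predicate (`PartitionInfo` is a hypothetical stand-in for Kafka's `TopicPartitionInfo`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the no-leader partition filter: a partition is
// dropped only when the flag is on AND its leader is currently null.
public class NoLeaderPartitionFilter {
    public static class PartitionInfo {
        public final int partition;
        public final String leader; // null models "no leader"

        public PartitionInfo(int partition, String leader) {
            this.partition = partition;
            this.leader = leader;
        }
    }

    public static List<Integer> discover(List<PartitionInfo> infos, boolean ignoreNoLeader) {
        return infos.stream()
                .filter(p -> !(ignoreNoLeader && p.leader == null))
                .map(p -> p.partition)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PartitionInfo> infos =
                Arrays.asList(new PartitionInfo(0, "broker-1"), new PartitionInfo(2, null));
        System.out.println(discover(infos, false)); // [0, 2]
        System.out.println(discover(infos, true));  // [0]
    }
}
```

This mirrors why the patch fixes the blocking: in stream mode, a split for a leaderless partition would otherwise be assigned and the consumer would stall on it, while healthy partitions keep flowing once it is filtered out.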
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
index 00e059ecfe..a9f38f613d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -17,10 +17,15 @@
package org.apache.kafka.clients.admin;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
@@ -40,8 +45,10 @@ import java.util.concurrent.ExecutionException;
class KafkaSourceSplitEnumeratorTest {
AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+    KafkaSourceConfig kafkaSourceConfig = Mockito.mock(KafkaSourceConfig.class);
    // prepare
-    TopicPartition partition = new TopicPartition("test", 0);
+    TopicPartition partition0 = new TopicPartition("test", 0);
+    TopicPartition partition2 = new TopicPartition("test", 2);
@BeforeEach
void init() {
@@ -54,31 +61,42 @@ class KafkaSourceSplitEnumeratorTest {
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
                                    {
                                        put(
-                                                partition,
+                                                partition0,
+                                                KafkaFuture.completedFuture(
+                                                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                                                0, 0, Optional.of(0))));
+                                        put(
+                                                partition2,
                                                KafkaFuture.completedFuture(
                                                        new ListOffsetsResult.ListOffsetsResultInfo(
                                                                0, 0, Optional.of(0))));
                                    }
                                }));
+
+        List<TopicPartitionInfo> mockTopicPartition = Lists.newArrayList();
+        TopicPartitionInfo topicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        0,
+                        new Node(1, "127.0.0.1", 9092),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        TopicPartitionInfo topicPartitionInfoNoLeader =
+                new TopicPartitionInfo(2, null, Collections.emptyList(), Collections.emptyList());
+        mockTopicPartition.add(topicPartitionWithLeader);
+        mockTopicPartition.add(topicPartitionInfoNoLeader);
+
        Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
                .thenReturn(
                        DescribeTopicsResult.ofTopicNames(
                                new HashMap<String, KafkaFuture<TopicDescription>>() {
                                    {
                                        put(
-                                                partition.topic(),
+                                                partition0.topic(),
                                                KafkaFuture.completedFuture(
                                                        new TopicDescription(
-                                                                partition.topic(),
+                                                                partition0.topic(),
                                                                false,
-                                                                Collections.singletonList(
-                                                                        new TopicPartitionInfo(
-                                                                                0,
-                                                                                null,
-                                                                                Collections.emptyList(),
-                                                                                Collections.emptyList())))));
+                                                                mockTopicPartition)));
                                    }
                                }));
}
@@ -89,17 +107,17 @@ class KafkaSourceSplitEnumeratorTest {
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
-            put(partition, new KafkaSourceSplit(null, partition));
+            put(partition0, new KafkaSourceSplit(null, partition0));
        }
    };
        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
-        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
+        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition0));
        KafkaSourceSplitEnumerator enumerator =
-                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
+                new KafkaSourceSplitEnumerator(adminClient, null, pendingSplit, assignedSplit);
        enumerator.addSplitsBack(splits, 1);
        Assertions.assertTrue(pendingSplit.size() == splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+        Assertions.assertNull(assignedSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == 0);
}
@Test
@@ -108,18 +126,18 @@ class KafkaSourceSplitEnumeratorTest {
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
-            put(partition, new KafkaSourceSplit(null, partition));
+            put(partition0, new KafkaSourceSplit(null, partition0));
        }
    };
        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
        List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Collections.singletonList(new KafkaSourceSplit(null, partition0));
        KafkaSourceSplitEnumerator enumerator =
                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
        enumerator.addSplitsBack(splits, 1);
        Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+        Assertions.assertNull(assignedSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == Long.MAX_VALUE);
}
@Test
@@ -128,14 +146,17 @@ class KafkaSourceSplitEnumeratorTest {
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+
List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Arrays.asList(
+                        new KafkaSourceSplit(null, partition0),
+                        new KafkaSourceSplit(null, partition2));
        KafkaSourceSplitEnumerator enumerator =
                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
        enumerator.fetchPendingPartitionSplit();
        Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNotNull(pendingSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == Long.MAX_VALUE);
}
@Test
@@ -145,12 +166,92 @@ class KafkaSourceSplitEnumeratorTest {
new HashMap<TopicPartition, KafkaSourceSplit>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition));
+                Arrays.asList(
+                        new KafkaSourceSplit(null, partition0),
+                        new KafkaSourceSplit(null, partition2));
+
        KafkaSourceSplitEnumerator enumerator =
                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
        enumerator.fetchPendingPartitionSplit();
        Assertions.assertEquals(pendingSplit.size(), splits.size());
-        Assertions.assertNotNull(pendingSplit.get(partition));
-        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == 0);
+    }
+
+    @Test
+    void testIgnoreNoLeaderPartition() throws ExecutionException, InterruptedException {
+
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("group.id", "test");
+        configMap.put("topic", "test");
+        configMap.put("ignore_no_leader_partition", "false");
+        KafkaSourceConfig sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(
+                        adminClient, sourceConfig, pendingSplit, assignedSplit);
+        enumerator.fetchPendingPartitionSplit();
+
+        Assertions.assertEquals(2, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNotNull(pendingSplit.get(partition2));
+
+        pendingSplit.clear();
+        assignedSplit.clear();
+
+        configMap.put("ignore_no_leader_partition", "true");
+        configMap.put("partition-discovery.interval-millis", 5000L);
+        sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+        enumerator =
+                new KafkaSourceSplitEnumerator(
+                        adminClient, sourceConfig, pendingSplit, assignedSplit);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(1, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNull(pendingSplit.get(partition2));
+
+        // Test partition restoration: simulate partition2 getting a leader
+        // Create new mock topic partition list with partition2 now having a leader
+        List<TopicPartitionInfo> restoredMockTopicPartition = Lists.newArrayList();
+        TopicPartitionInfo topicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        0,
+                        new Node(1, "127.0.0.1", 9092),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        TopicPartitionInfo restoredTopicPartitionWithLeader =
+                new TopicPartitionInfo(
+                        2,
+                        new Node(2, "127.0.0.1", 9093), // partition2 now has a leader
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        restoredMockTopicPartition.add(topicPartitionWithLeader);
+        restoredMockTopicPartition.add(restoredTopicPartitionWithLeader);
+
+        // Update the mock to return the restored partition information
+        Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
+                .thenReturn(
+                        DescribeTopicsResult.ofTopicNames(
+                                new HashMap<String, KafkaFuture<TopicDescription>>() {
+                                    {
+                                        put(
+                                                partition0.topic(),
+                                                KafkaFuture.completedFuture(
+                                                        new TopicDescription(
+                                                                partition0.topic(),
+                                                                false,
+                                                                restoredMockTopicPartition)));
+                                    }
+                                }));
+
+        // Test that dynamic partition discovery detects the restored partition
+        enumerator.fetchPendingPartitionSplit();
+
+        // After partition restoration, both partitions should be available
+        Assertions.assertEquals(2, pendingSplit.size());
+        Assertions.assertNotNull(pendingSplit.get(partition0));
+        Assertions.assertNotNull(pendingSplit.get(partition2));
}
}