This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7dc3fa8a13 [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233)
7dc3fa8a13 is described below

commit 7dc3fa8a13e77914798b133332a72831426cc935
Author: litiliu <38579068+liti...@users.noreply.github.com>
AuthorDate: Mon Jul 22 11:41:33 2024 +0800

    [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233)
---
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 20 +++++-
 .../source/KafkaSourceSplitEnumeratorTest.java     | 74 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index a7471ae086..f868eaed20 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
 
+import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -82,6 +83,20 @@ public class KafkaSourceSplitEnumerator
         this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
     }
 
+    @VisibleForTesting
+    protected KafkaSourceSplitEnumerator(
+            AdminClient adminClient,
+            Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+            Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
+        this.tablePathMetadataMap = new HashMap<>();
+        this.context = null;
+        this.discoveryIntervalMillis = -1;
+        this.adminClient = adminClient;
+        this.kafkaSourceConfig = null;
+        this.pendingSplit = pendingSplit;
+        this.assignedSplit = assignedSplit;
+    }
+
     @Override
     public void open() {
         if (discoveryIntervalMillis > 0) {
@@ -180,7 +195,10 @@ public class KafkaSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.putAll(convertToNextSplit(splits));
+            Map<TopicPartition, ? extends KafkaSourceSplit> nextSplit = convertToNextSplit(splits);
+            // remove them from the assignedSplit, so we can reassign them
+            nextSplit.keySet().forEach(assignedSplit::remove);
+            pendingSplit.putAll(nextSplit);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..6a8de812d3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+class KafkaSourceSplitEnumeratorTest {
+
+    @Test
+    void addSplitsBack() {
+        // prepare
+        TopicPartition partition = new TopicPartition("test", 0);
+
+        AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+        Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
+                .thenReturn(
+                        new ListOffsetsResult(
+                                new HashMap<
+                                        TopicPartition,
+                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
+                                    {
+                                        put(
+                                                partition,
+                                                KafkaFuture.completedFuture(
+                                                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                                                0, 0, Optional.of(0))));
+                                    }
+                                }));
+
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>() {
+                    {
+                        put(partition, new KafkaSourceSplit(null, partition));
+                    }
+                };
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
+        enumerator.addSplitsBack(splits, 1);
+        Assertions.assertTrue(pendingSplit.size() == splits.size());
+        Assertions.assertNull(assignedSplit.get(partition));
+    }
+}

Reply via email to