This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a0eeeb9b62 [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871)
a0eeeb9b62 is described below

commit a0eeeb9b6234ce842f25395e6f5524eef53fb1f5
Author: Carl-Zhou-CN <1058249...@qq.com>
AuthorDate: Sat Nov 16 11:17:37 2024 +0800

    [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871)
---
 docs/en/connector-v2/source/kafka.md               |   1 +
 .../seatunnel/kafka/source/KafkaSource.java        |  11 +-
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  34 ++++-
 .../admin/KafkaSourceSplitEnumeratorTest.java      | 156 +++++++++++++++++++++
 .../source/KafkaSourceSplitEnumeratorTest.java     |  74 ----------
 5 files changed, 194 insertions(+), 82 deletions(-)
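
In short: in streaming (unbounded) mode the enumerator previously capped each
split's end offset at the latest offset observed when the split was created,
so readers stopped at that snapshot and never picked up records produced
afterwards. The change threads the source's boundedness into the enumerator
and leaves streaming splits open-ended. A minimal sketch of the core decision
(not verbatim from the patch; the names follow the hunks below):

    // Batch mode stops at the snapshot offset; streaming mode must never cap
    // the split, otherwise later records are silently skipped.
    long endOffset =
            isStreamingMode
                    ? Long.MAX_VALUE                                // read indefinitely
                    : latestOffsets.get(split.getTopicPartition()); // stop at snapshot
    split.setEndOffset(endOffset);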

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 90c183c2c1..c0ed66186b 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 ### Simple
 
 > This example reads the data of kafka's topic_1, topic_2, topic_3 and prints
 > it to the client. If you have not yet installed and deployed SeaTunnel,
 > you need to follow the instructions in [Install
 > SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy
 > SeaTunnel. And then follow the instructions in [Quick Start With SeaTunn
 > [...]
+> In batch mode, the enumerator fetches the latest offset for each partition during split discovery and uses it as the stopping point.
 
 ```hocon
 # Defining the runtime environment
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 5688fde5b6..0ff99807f2 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -104,7 +104,11 @@ public class KafkaSource
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
             SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
-        return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
+        return new KafkaSourceSplitEnumerator(
+                kafkaSourceConfig,
+                enumeratorContext,
+                null,
+                getBoundedness() == Boundedness.UNBOUNDED);
     }
 
     @Override
@@ -112,7 +116,10 @@ public class KafkaSource
             SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
             KafkaSourceState checkpointState) {
         return new KafkaSourceSplitEnumerator(
-                kafkaSourceConfig, enumeratorContext, checkpointState);
+                kafkaSourceConfig,
+                enumeratorContext,
+                checkpointState,
+                getBoundedness() == Boundedness.UNBOUNDED);
     }
 
     @Override
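
Both createEnumerator and restoreEnumerator now pass
getBoundedness() == Boundedness.UNBOUNDED, so the enumerator knows whether it
is serving a streaming job. For orientation, a hedged sketch of how a
SeaTunnel source typically derives boundedness from the job mode; the
jobContext field and accessor below are assumptions for illustration, not
code from this patch:

    // Hypothetical sketch: boundedness usually follows the job mode the
    // engine hands the source via setJobContext(...).
    @Override
    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals(jobContext.getJobMode())
                ? Boundedness.BOUNDED
                : Boundedness.UNBOUNDED;
    }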
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 06ce4565c3..6d6c1ca96f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -70,10 +70,13 @@ public class KafkaSourceSplitEnumerator
 
     private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
 
+    private boolean isStreamingMode;
+
     KafkaSourceSplitEnumerator(
             KafkaSourceConfig kafkaSourceConfig,
             Context<KafkaSourceSplit> context,
-            KafkaSourceState sourceState) {
+            KafkaSourceState sourceState,
+            boolean isStreamingMode) {
         this.kafkaSourceConfig = kafkaSourceConfig;
         this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
         this.context = context;
@@ -81,10 +84,11 @@ public class KafkaSourceSplitEnumerator
         this.pendingSplit = new HashMap<>();
         this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
         this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
+        this.isStreamingMode = isStreamingMode;
     }
 
     @VisibleForTesting
-    protected KafkaSourceSplitEnumerator(
+    public KafkaSourceSplitEnumerator(
             AdminClient adminClient,
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -97,6 +101,16 @@ public class KafkaSourceSplitEnumerator
         this.assignedSplit = assignedSplit;
     }
 
+    @VisibleForTesting
+    public KafkaSourceSplitEnumerator(
+            AdminClient adminClient,
+            Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+            Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+            boolean isStreamingMode) {
+        this(adminClient, pendingSplit, assignedSplit);
+        this.isStreamingMode = isStreamingMode;
+    }
+
     @Override
     public void open() {
         if (discoveryIntervalMillis > 0) {
@@ -204,7 +218,7 @@ public class KafkaSourceSplitEnumerator
     private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(
             List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, Long> listOffsets =
+            Map<TopicPartition, Long> latestOffsets =
                     listOffsets(
                             splits.stream()
                                     .map(KafkaSourceSplit::getTopicPartition)
@@ -214,7 +228,10 @@ public class KafkaSourceSplitEnumerator
             splits.forEach(
                     split -> {
                         split.setStartOffset(split.getEndOffset() + 1);
-                        split.setEndOffset(listOffsets.get(split.getTopicPartition()));
+                        split.setEndOffset(
+                                isStreamingMode
+                                        ? Long.MAX_VALUE
+                                        : latestOffsets.get(split.getTopicPartition()));
                     });
             return splits.stream()
                     .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
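
Net effect of this hunk: when a finished split is converted to its successor,
the successor now covers [previousEndOffset + 1, Long.MAX_VALUE) in streaming
mode instead of being re-capped at the broker's current latest offset. For
example, a split returned with end offset 41 is reissued as:

    // Streaming: open-ended successor split; batch: capped at the freshly
    // fetched latest offset for the partition (names as in the hunk above).
    split.setStartOffset(42);           // previous end offset + 1
    split.setEndOffset(Long.MAX_VALUE); // streaming mode: never stop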
@@ -305,7 +322,10 @@ public class KafkaSourceSplitEnumerator
                             // Obtain the corresponding topic TablePath from kafka topic
                             TablePath tablePath = topicMappingTablePathMap.get(partition.topic());
                             KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition);
-                            split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
+                            split.setEndOffset(
+                                    isStreamingMode
+                                            ? Long.MAX_VALUE
+                                            : latestOffsets.get(partition));
                             return split;
                         })
                 .collect(Collectors.toSet());
@@ -344,6 +364,7 @@ public class KafkaSourceSplitEnumerator
     private Map<TopicPartition, Long> listOffsets(
             Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
             throws ExecutionException, InterruptedException {
+
         Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
                 partitions.stream()
                         .collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
@@ -391,7 +412,8 @@ public class KafkaSourceSplitEnumerator
         assignSplit();
     }
 
-    private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
         getTopicInfo()
                 .forEach(
                         split -> {
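
Widening fetchPendingPartitionSplit() from private to public (guarded by
@VisibleForTesting) lets the new unit test below drive partition discovery
synchronously against a mocked AdminClient instead of waiting on the
discovery-interval thread:

    // From the new test: discover splits on demand, then assert on the end
    // offsets the enumerator assigned in streaming mode.
    KafkaSourceSplitEnumerator enumerator =
            new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
    enumerator.fetchPendingPartitionSplit();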
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..00e059ecfe
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+class KafkaSourceSplitEnumeratorTest {
+
+    AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+    // prepare
+    TopicPartition partition = new TopicPartition("test", 0);
+
+    @BeforeEach
+    void init() {
+
+        Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
+                .thenReturn(
+                        new ListOffsetsResult(
+                                new HashMap<
+                                        TopicPartition,
+                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
+                                    {
+                                        put(
+                                                partition,
+                                                KafkaFuture.completedFuture(
+                                                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                                                0, 0, Optional.of(0))));
+                                    }
+                                }));
+        Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
+                .thenReturn(
+                        DescribeTopicsResult.ofTopicNames(
+                                new HashMap<String, KafkaFuture<TopicDescription>>() {
+                                    {
+                                        put(
+                                                partition.topic(),
+                                                KafkaFuture.completedFuture(
+                                                        new TopicDescription(
+                                                                partition.topic(),
+                                                                false,
+                                                                Collections.singletonList(
+                                                                        new TopicPartitionInfo(
+                                                                                0,
+                                                                                null,
+                                                                                Collections.emptyList(),
+                                                                                Collections.emptyList())))));
+                                    }
+                                }));
+    }
+
+    @Test
+    void addSplitsBack() {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>() {
+                    {
+                        put(partition, new KafkaSourceSplit(null, partition));
+                    }
+                };
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
+        enumerator.addSplitsBack(splits, 1);
+        Assertions.assertTrue(pendingSplit.size() == splits.size());
+        Assertions.assertNull(assignedSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+    }
+
+    @Test
+    void addStreamingSplitsBack() {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>() {
+                    {
+                        put(partition, new KafkaSourceSplit(null, partition));
+                    }
+                };
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
+        enumerator.addSplitsBack(splits, 1);
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNull(assignedSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+    }
+
+    @Test
+    void addStreamingSplits() throws ExecutionException, InterruptedException {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNotNull(pendingSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+    }
+
+    @Test
+    void addSplits() throws ExecutionException, InterruptedException {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNotNull(pendingSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+    }
+}
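
A note on the fixture above: the mocked AdminClient reports offset 0 for the
single test partition, so the batch-mode tests assert an end offset of 0 (the
snapshot) while the streaming-mode tests assert Long.MAX_VALUE. That pair of
assertions captures the behavioral contract this commit introduces
(streamingSplit and batchSplit below are illustrative names, not test fields):

    Assertions.assertEquals(Long.MAX_VALUE, streamingSplit.getEndOffset()); // unbounded
    Assertions.assertEquals(0L, batchSplit.getEndOffset());                 // snapshot offset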
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
deleted file mode 100644
index 6a8de812d3..0000000000
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-class KafkaSourceSplitEnumeratorTest {
-
-    @Test
-    void addSplitsBack() {
-        // prepare
-        TopicPartition partition = new TopicPartition("test", 0);
-
-        AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
-        Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
-                .thenReturn(
-                        new ListOffsetsResult(
-                                new HashMap<
-                                        TopicPartition,
-                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
-                                    {
-                                        put(
-                                                partition,
-                                                KafkaFuture.completedFuture(
-                                                        new ListOffsetsResult.ListOffsetsResultInfo(
-                                                                0, 0, Optional.of(0))));
-                                    }
-                                }));
-
-        // test
-        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
-                new HashMap<TopicPartition, KafkaSourceSplit>() {
-                    {
-                        put(partition, new KafkaSourceSplit(null, partition));
-                    }
-                };
-        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
-        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
-        KafkaSourceSplitEnumerator enumerator =
-                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
-        enumerator.addSplitsBack(splits, 1);
-        Assertions.assertTrue(pendingSplit.size() == splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-    }
-}
