This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1784c01a35 [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
1784c01a35 is described below

commit 1784c01a35e6fa23cad9a9f514a6b840835ea51a
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Tue Jul 9 21:50:39 2024 +0800

    [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
---
 .github/workflows/backend.yml                      |  6 +-
 docs/en/connector-v2/formats/avro.md               |  2 +-
 docs/zh/connector-v2/formats/avro.md               |  2 +-
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 31 ++++++----
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 70 +++++++++++++++-------
 .../test/resources/avro/kafka_avro_to_assert.conf  |  2 +-
 .../jsonFormatIT/kafka_source_json_to_console.conf |  2 +-
 ..._source_to_assert_with_max_poll_records_1.conf} | 34 +++++------
 ...ce_format_error_handle_way_fail_to_console.conf |  1 -
 ...ce_format_error_handle_way_skip_to_console.conf |  1 -
 .../textFormatIT/kafka_source_text_to_console.conf |  2 +-
 ...ource_text_to_console_assert_catalog_table.conf |  2 +-
 12 files changed, 91 insertions(+), 64 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 9975d477da..b6094aff4d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1052,7 +1052,7 @@ jobs:
 
   kafka-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1068,7 +1068,6 @@ jobs:
           distribution: 'temurin'
           cache: 'maven'
       - name: run kafka connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
          ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
         env:
@@ -1076,7 +1075,7 @@ jobs:
 
   rocketmq-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1092,7 +1091,6 @@ jobs:
           distribution: 'temurin'
           cache: 'maven'
       - name: run rocket connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
          ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
         env:
diff --git a/docs/en/connector-v2/formats/avro.md b/docs/en/connector-v2/formats/avro.md
index 638657b345..8fef411fb5 100644
--- a/docs/en/connector-v2/formats/avro.md
+++ b/docs/en/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/docs/zh/connector-v2/formats/avro.md b/docs/zh/connector-v2/formats/avro.md
index 4e19ea4b98..7176f4e507 100644
--- a/docs/zh/connector-v2/formats/avro.md
+++ b/docs/zh/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index d136fabc40..02c2a9007e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -103,7 +104,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
             return;
         }
 
-        while (pendingPartitionsQueue.size() != 0) {
+        while (!pendingPartitionsQueue.isEmpty()) {
             sourceSplits.add(pendingPartitionsQueue.poll());
         }
         sourceSplits.forEach(
@@ -120,9 +121,10 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                     executorService.submit(thread);
                                     return thread;
                                 }));
+        List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
         sourceSplits.forEach(
                 sourceSplit -> {
-                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
                     TablePath tablePath = sourceSplit.getTablePath();
                     DeserializationSchema<SeaTunnelRow> deserializationSchema =
                             tablePathMetadataMap.get(tablePath).getDeserializationSchema();
@@ -148,9 +150,14 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                                 for (TopicPartition partition : partitions) {
                                                     List<ConsumerRecord<byte[], byte[]>>
                                                             recordList = records.records(partition);
+                                                    if (Boundedness.BOUNDED.equals(
+                                                                    context.getBoundedness())
+                                                            && recordList.isEmpty()) {
+                                                        completableFuture.complete(true);
+                                                        return;
+                                                    }
                                                     for (ConsumerRecord<byte[], byte[]> record :
                                                             recordList) {
-
                                                         try {
                                                             if (deserializationSchema
                                                                     instanceof
@@ -180,7 +187,8 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                                                 && record.offset()
                                                                         >= sourceSplit
                                                                                 .getEndOffset()) {
-                                                            break;
+                                                            completableFuture.complete(true);
+                                                            return;
                                                         }
                                                     }
                                                     long lastOffset = -1;
@@ -199,18 +207,21 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                             } catch (Exception e) {
                                                 completableFuture.completeExceptionally(e);
                                             }
-                                            completableFuture.complete(null);
+                                            completableFuture.complete(false);
                                         });
-                    } catch (InterruptedException e) {
+                        if (completableFuture.get()) {
+                            finishedSplits.add(sourceSplit);
+                        }
+                    } catch (Exception e) {
                         throw new KafkaConnectorException(
                                 KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
                     }
-                    completableFuture.join();
                 });
-
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            context.signalNoMoreElement();
+            finishedSplits.forEach(sourceSplits::remove);
+            if (sourceSplits.isEmpty()) {
+                context.signalNoMoreElement();
+            }
         }
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 2f1c92048e..d4629851e7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -212,6 +212,27 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test_topic_text_max_poll_records_1",
+                                null,
+                                serializer.serialize(row)),
+                0,
+                100);
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
             throws IOException, InterruptedException {
@@ -538,29 +559,34 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
     }
 
     private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row =
-                    new SeaTunnelRow(
-                            new Object[] {
-                                Long.valueOf(i),
-                                Collections.singletonMap("key", Short.parseShort("1")),
-                                new Byte[] {Byte.parseByte("1")},
-                                "string",
-                                Boolean.FALSE,
-                                Byte.parseByte("1"),
-                                Short.parseShort("1"),
-                                Integer.parseInt("1"),
-                                Long.parseLong("1"),
-                                Float.parseFloat("1.1"),
-                                Double.parseDouble("1.1"),
-                                BigDecimal.valueOf(11, 1),
-                                "test".getBytes(),
-                                LocalDate.of(2024, 1, 1),
-                                LocalDateTime.of(2024, 1, 1, 12, 59, 23)
-                            });
-            ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
-            producer.send(producerRecord);
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Long.valueOf(i),
+                                    Collections.singletonMap("key", Short.parseShort("1")),
+                                    new Byte[] {Byte.parseByte("1")},
+                                    "string",
+                                    Boolean.FALSE,
+                                    Byte.parseByte("1"),
+                                    Short.parseShort("1"),
+                                    Integer.parseInt("1"),
+                                    Long.parseLong("1"),
+                                    Float.parseFloat("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    BigDecimal.valueOf(11, 1),
+                                    "test".getBytes(),
+                                    LocalDate.of(2024, 1, 1),
+                                    LocalDateTime.of(2024, 1, 1, 12, 59, 23)
+                                });
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
+        producer.flush();
     }
 
     private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index 31fe77a3e2..755a9a2b8d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index f9a41e7987..3657390602 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_json"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = skip
     schema = {
       fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
similarity index 85%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
index d7f875272b..787858e229 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
@@ -30,10 +30,13 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_text"
+    topic = "test_topic_text_max_poll_records_1"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = fail
+    kafka.config = {
+      max.poll.records = 1
+    }
     schema = {
       columns = [
         {
@@ -120,6 +123,9 @@ source {
 }
 
 sink {
+  console {
+    source_table_name = "kafka_table"
+  }
   Assert {
     source_table_name = "kafka_table"
     rules =
@@ -143,24 +149,12 @@ sink {
             ]
           }
         ]
-      catalog_table_rule = {
-        primary_key_rule = {
-            primary_key_name = "primary key"
-            primary_key_columns = ["id"]
-        }
-        constraint_key_rule = [
-            {
-            constraint_key_name = "unique_c_string"
-            constraint_key_type = UNIQUE_KEY
-            constraint_key_columns = [
-                {
-                    constraint_key_column_name = "c_string"
-                    constraint_key_sort_type = ASC
-                }
-            ]
-            }
-        ]
-      }
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
     }
   }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index b6db50989a..d2a0f05354 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -37,7 +37,6 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = fail
-    #     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 45b29d1915..88b6098b5e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -37,7 +37,6 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = skip
-    #     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index 36f01c0337..3ce077bd58 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_text"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = fail
     schema = {
       fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
index d7f875272b..132829e324 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_text"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = fail
     schema = {
       columns = [
