This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3c6f216135 [Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768)
3c6f216135 is described below

commit 3c6f216135dfd48c2539a400b596ef82ce24fcb3
Author: xiaochen <598457...@qq.com>
AuthorDate: Fri Feb 21 11:39:24 2025 +0800

    [Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768)
---
 .../client/source/StarRocksBeReadClient.java       |  5 +++-
 .../e2e/connector/starrocks/StarRocksIT.java       |  9 +++++-
 .../starrocks-thrift-to-starrocks-streamload.conf  |  1 +
 ...ks-streamload.conf => starrocks-to-assert.conf} | 32 ++++++++++------------
 4 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
index c0be0106bb..3fa50f1cc0 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
@@ -92,7 +92,6 @@ public class StarRocksBeReadClient implements Serializable {
     }
 
     public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
         Set<Long> tabletIds = partition.getTabletIds();
         TScanOpenParams params = new TScanOpenParams();
         params.setTablet_ids(new ArrayList<>(tabletIds));
@@ -135,6 +134,10 @@ public class StarRocksBeReadClient implements Serializable {
                 contextId,
                 tabletIds.size(),
                 tabletIds);
+        this.eos.set(false);
+        this.rowBatch = null;
+        this.readerOffset = 0;
+        this.seaTunnelRowType = seaTunnelRowType;
     }
 
     public boolean hasNext() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index c49b1bfa41..2d2eda1ae2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -105,7 +105,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
                     + "  DATE_COL       DATE\n"
                     + ")ENGINE=OLAP\n"
                     + "DUPLICATE KEY(`BIGINT_COL`)\n"
-                    + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n"
+                    + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 3\n"
                     + "PROPERTIES (\n"
                     + "\"replication_num\" = \"1\",\n"
                     + "\"in_memory\" = \"false\","
@@ -419,4 +419,11 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
         Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink));
         starRocksCatalog.close();
     }
+
+    @TestTemplate
+    public void testStarRocksReadRowCount(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/starrocks-to-assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index ca47a8eb08..8af2b36107 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -28,6 +28,7 @@ source {
     database = "test"
     table = "e2e_table_source"
     max_retries = 3
+    request_tablet_size = 5
     schema {
       fields {
         BIGINT_COL = BIGINT
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
similarity index 68%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
index ca47a8eb08..416d1ec853 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
@@ -28,6 +28,7 @@ source {
     database = "test"
     table = "e2e_table_source"
     max_retries = 3
+    request_tablet_size = 1
     schema {
       fields {
         BIGINT_COL = BIGINT
@@ -54,22 +55,19 @@ transform {
 }
 
 sink {
-  StarRocks {
-    nodeUrls = ["starrocks_e2e:8030"]
-    username = root
-    password = ""
-    database = "test"
-    table = "e2e_table_sink"
-    batch_max_rows = 100
-    max_retries = 3
-    base-url="jdbc:mysql://starrocks_e2e:9030/test"
-    starrocks.config = {
-      format = "JSON"
-      strip_outer_array = true
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 100
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ]
+      }
     }
-    "schema_save_mode"="RECREATE_SCHEMA"
-    "data_save_mode"="APPEND_DATA"
-    save_mode_create_template = "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n  DUPLICATE KEY(`BIGINT_COL`) \n  DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n   \"replication_num\" = \"1\", \n  \"in_memory\" = \"false\" , \n  \"storage_format\" = \"DEFAULT\"  \n )"
-
-  }
 }
\ No newline at end of file

Reply via email to