This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 3c6f216135 [Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768) 3c6f216135 is described below commit 3c6f216135dfd48c2539a400b596ef82ce24fcb3 Author: xiaochen <598457...@qq.com> AuthorDate: Fri Feb 21 11:39:24 2025 +0800 [Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768) --- .../client/source/StarRocksBeReadClient.java | 5 +++- .../e2e/connector/starrocks/StarRocksIT.java | 9 +++++- .../starrocks-thrift-to-starrocks-streamload.conf | 1 + ...ks-streamload.conf => starrocks-to-assert.conf} | 32 ++++++++++------------ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java index c0be0106bb..3fa50f1cc0 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java @@ -92,7 +92,6 @@ public class StarRocksBeReadClient implements Serializable { } public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; Set<Long> tabletIds = partition.getTabletIds(); TScanOpenParams params = new TScanOpenParams(); params.setTablet_ids(new ArrayList<>(tabletIds)); @@ -135,6 +134,10 @@ public class StarRocksBeReadClient implements Serializable { contextId, tabletIds.size(), tabletIds); + this.eos.set(false); + this.rowBatch = null; + this.readerOffset = 0; + this.seaTunnelRowType = seaTunnelRowType; } public boolean hasNext() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java index c49b1bfa41..2d2eda1ae2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java @@ -105,7 +105,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { + " DATE_COL DATE\n" + ")ENGINE=OLAP\n" + "DUPLICATE KEY(`BIGINT_COL`)\n" - + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 3\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\",\n" + "\"in_memory\" = \"false\"," @@ -419,4 +419,11 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink)); starRocksCatalog.close(); } + + @TestTemplate + public void testStarRocksReadRowCount(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/starrocks-to-assert.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf index ca47a8eb08..8af2b36107 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf @@ -28,6 +28,7 @@ source { database = "test" table = "e2e_table_source" max_retries = 3 + request_tablet_size = 5 schema { fields { BIGINT_COL = BIGINT diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf similarity index 68% copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf index ca47a8eb08..416d1ec853 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf @@ -28,6 +28,7 @@ source { database = "test" table = "e2e_table_source" max_retries = 3 + request_tablet_size = 1 schema { fields { BIGINT_COL = BIGINT @@ -54,22 +55,19 @@ transform { } sink { - StarRocks { - nodeUrls = ["starrocks_e2e:8030"] - username = root - password = "" - database = "test" - table = "e2e_table_sink" - batch_max_rows = 100 - max_retries = 3 - base-url="jdbc:mysql://starrocks_e2e:9030/test" - starrocks.config = { - format = "JSON" - strip_outer_array = true + Assert { + rules = + { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 100 + }, + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } } - "schema_save_mode"="RECREATE_SCHEMA" - "data_save_mode"="APPEND_DATA" - save_mode_create_template = "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n \"storage_format\" = \"DEFAULT\" \n )" - - } } \ No newline at end of file