This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5706b6bfe7c [test](show_data) test the correctness of data statistics in cloud mode (#44947)
5706b6bfe7c is described below

commit 5706b6bfe7c2f6115294167fb4cac0185799d061
Author: chunping <cece_m...@163.com>
AuthorDate: Thu Dec 12 10:39:26 2024 +0800

    [test](show_data) test the correctness of data statistics in cloud mode (#44947)
---
 aazcp.tar.gz                                       | Bin 0 -> 4218 bytes
 regression-test/conf/regression-conf.groovy       |   7 +
 regression-test/framework/pom.xml                  |   5 +
 regression-test/plugins/aliyunOssSdk.groovy        | 169 +++++++++++++++
 .../suites/show_data/ddl/lineitem_delete.sql       |   2 +
 .../suites/show_data/ddl/lineitem_dup.sql          |  25 +++
 .../suites/show_data/ddl/lineitem_mow.sql          |  25 +++
 .../suites/show_data/test_show_mow_data.groovy     | 240 +++++++++++++++++++++
 8 files changed, 473 insertions(+)

diff --git a/aazcp.tar.gz b/aazcp.tar.gz
new file mode 100644
index 00000000000..681acf72cde
Binary files /dev/null and b/aazcp.tar.gz differ
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index ab9bb0beb91..bc001126bce 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -259,3 +259,10 @@ lakesoulMinioEndpoint="*******"
 metaServiceToken = "greedisgood9999"
 instanceId = "default_instance_id"
 multiClusterInstance = "default_instance_id"
+
+storageProvider = "oss"
+cbsS3Ak = "*******"
+cbsS3Sk = "*******"
+cbsS3Endpoint = "oss-cn-beijing.aliyuncs.com"
+cbsS3Bucket = "test-bucket"
+cbsS3Prefix = "test-cluster-prefix"
diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml
index 813659989ae..cad25df8ced 100644
--- a/regression-test/framework/pom.xml
+++ b/regression-test/framework/pom.xml
@@ -409,6 +409,11 @@ under the License.
             <!--Regression tests need to include this jar-->
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>3.18.1</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/regression-test/plugins/aliyunOssSdk.groovy b/regression-test/plugins/aliyunOssSdk.groovy
new file mode 100644
index 00000000000..cbc132a088d
--- /dev/null
+++ b/regression-test/plugins/aliyunOssSdk.groovy
@@ -0,0 +1,169 @@
+
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.Suite;
+import org.apache.doris.regression.Config;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.DeleteObjectsResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import groovy.util.logging.Slf4j
+
+Suite.metaClass.initOssClient = { String accessKeyId, String accessKeySecret, String endpoint ->
+    return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret)
+}
+
+Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, String prefix="" ->
+    try {
+        ObjectListing objectListing = null;
+        String nextMarker = null;
+        final int maxKeys = 500;
+        List<OSSObjectSummary> sums = null;
+
+        if (!client.doesBucketExist(bucketName)) {
+            logger.info("no bucket named ${bucketName}")
+            return
+        }
+
+        // List all objects with the given prefix by paging; each page returns up to maxKeys (500) entries.
+        logger.info("List all objects with prefix:");
+        nextMarker = null;
+        do {
+            objectListing = client.listObjects(new ListObjectsRequest(bucketName).
+                    withPrefix(prefix).withMarker(nextMarker).withMaxKeys(maxKeys));
+
+            sums = objectListing.getObjectSummaries();
+            for (OSSObjectSummary s : sums) {
+                logger.info("\t" + s.getKey());
+            }
+
+            nextMarker = objectListing.getNextMarker();
+
+        } while (objectListing.isTruncated());
+    } catch (OSSException oe) {
+        logger.error("Caught an OSSException, which means your request made it to OSS, "
+                + "but was rejected with an error response for some reason.");
+        logger.error("Error Message: " + oe.getErrorMessage());
+        logger.error("Error Code: " + oe.getErrorCode());
+        logger.error("Request ID: " + oe.getRequestId());
+        logger.error("Host ID: " + oe.getHostId());
+    } catch (ClientException ce) {
+        logger.error("Caught a ClientException, which means the client encountered "
+                + "a serious internal problem while trying to communicate with OSS, "
+                + "such as not being able to access the network.");
+        logger.error("Error Message: " + ce.getMessage());
+    } finally {
+        /*
+         * Do not forget to shut down the client finally to release all allocated resources.
+ */ + //client.shutdown(); + logger.info("Done!") + } + +} + +// get file size in a specific directory +Suite.metaClass.calculateFolderLength = { OSS client, String bucketName, String folder -> + long size = 0L; + ObjectListing objectListing = null; + do { + // The default value for MaxKey is 100, and the maximum value is 1000 + ListObjectsRequest request = new ListObjectsRequest(bucketName).withPrefix(folder).withMaxKeys(1000); + if (objectListing != null) { + request.setMarker(objectListing.getNextMarker()); + } + objectListing = client.listObjects(request); + List<OSSObjectSummary> sums = objectListing.getObjectSummaries(); + for (OSSObjectSummary s : sums) { + size += s.getSize(); + } + } while (objectListing.isTruncated()); + return size; +} + +Suite.metaClass.shutDownOssClient = { OSS client -> + client.shutdown(); +} + + + +Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, String prefix="" -> + try { + if (!client.doesBucketExist(bucketName)) { + logger.info("no bucket named ${bucketName} in ${endpoint}") + return + } + + // Gets all object with specified marker by paging. Each page will have up to 100 entries. + logger.info("List all objects with prefix:"); + ObjectListing objectListing = null; + do { + // By default, list 100 files or directories at a time + ListObjectsRequest request = new ListObjectsRequest(bucketName).withDelimiter("/").withPrefix(prefix); + if (objectListing != null) { + request.setMarker(objectListing.getNextMarker()); + } + objectListing = client.listObjects(request); + List<String> folders = objectListing.getCommonPrefixes(); + for (String folder : folders) { + logger.info(folder + " : " + (calculateFolderLength(client, bucketName, folder) / (1024 * 1024 * 1024)) + "GB"); + } + List<OSSObjectSummary> sums = objectListing.getObjectSummaries(); + for (OSSObjectSummary s : sums) { + logger.info(s.getKey() + " : " + (s.getSize() / (1024 * 1024 * 1024)) + "GB"); + } + } while (objectListing.isTruncated()); + + } catch (OSSException oe) { + logger.error("Caught an OSSException, which means your request made it to OSS, " + + "but was rejected with an error response for some reason."); + logger.error("Error Message: " + oe.getErrorMessage()); + logger.error("Error Code: " + oe.getErrorCode()); + logger.error("Request ID: " + oe.getRequestId()); + logger.error("Host ID: " + oe.getHostId()); + } catch (ClientException ce) { + logger.error("Caught an ClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with OSS, " + + "such as not being able to access the network."); + logger.error("Error Message: " + ce.getMessage()); + } finally { + /* + * Do not forget to shut down the client finally to release all allocated resources. 
+         */
+        //client.shutdown();
+        logger.info("Done!")
+    }
+}
+
+
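For reference, a regression suite drives these new helpers roughly as follows. This is an illustrative sketch only (not part of the committed files): it assumes the cbsS3* keys added to regression-conf.groovy above, and the tablet id is a placeholder; the "<prefix>/data/<tabletId>" layout matches what the test suite below relies on.

    def ak       = context.config.otherConfigs.get("cbsS3Ak")
    def sk       = context.config.otherConfigs.get("cbsS3Sk")
    def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
    def bucket   = context.config.otherConfigs.get("cbsS3Bucket")
    def prefix   = context.config.otherConfigs.get("cbsS3Prefix")

    def client = initOssClient(ak, sk, endpoint)
    def tabletId = "10001"   // placeholder tablet id
    // size, in bytes, of everything stored under <prefix>/data/<tabletId>
    long bytes = calculateFolderLength(client, bucket, prefix + "/data/" + tabletId)
    logger.info("tablet ${tabletId} occupies ${bytes / (1024 * 1024)} MB in OSS")
    shutDownOssClient(client)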
diff --git a/regression-test/suites/show_data/ddl/lineitem_delete.sql b/regression-test/suites/show_data/ddl/lineitem_delete.sql
new file mode 100644
index 00000000000..df8b0405649
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_delete.sql
@@ -0,0 +1,2 @@
+DELETE from ${table} where L_ORDERKEY >= 0;
+
diff --git a/regression-test/suites/show_data/ddl/lineitem_dup.sql b/regression-test/suites/show_data/ddl/lineitem_dup.sql
new file mode 100644
index 00000000000..29ed215d236
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_dup.sql
@@ -0,0 +1,25 @@
+CREATE TABLE IF NOT EXISTS lineitem_dup (
+    L_ORDERKEY      INTEGER NOT NULL,
+    L_PARTKEY       INTEGER NOT NULL,
+    L_SUPPKEY       INTEGER NOT NULL,
+    L_LINENUMBER    INTEGER NOT NULL,
+    L_QUANTITY      DECIMAL(15,2) NOT NULL,
+    L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+    L_DISCOUNT      DECIMAL(15,2) NOT NULL,
+    L_TAX           DECIMAL(15,2) NOT NULL,
+    L_RETURNFLAG    CHAR(1) NOT NULL,
+    L_LINESTATUS    CHAR(1) NOT NULL,
+    L_SHIPDATE      DATE NOT NULL,
+    L_COMMITDATE    DATE NOT NULL,
+    L_RECEIPTDATE   DATE NOT NULL,
+    L_SHIPINSTRUCT  CHAR(25) NOT NULL,
+    L_SHIPMODE      CHAR(10) NOT NULL,
+    L_COMMENT       VARCHAR(44) NOT NULL
+)
+DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+PROPERTIES (
+  "replication_num" = "1"
+)
+
+
diff --git a/regression-test/suites/show_data/ddl/lineitem_mow.sql b/regression-test/suites/show_data/ddl/lineitem_mow.sql
new file mode 100644
index 00000000000..1d29f44ac8f
--- /dev/null
+++ b/regression-test/suites/show_data/ddl/lineitem_mow.sql
@@ -0,0 +1,25 @@
+CREATE TABLE IF NOT EXISTS lineitem_mow (
+    L_ORDERKEY      INTEGER NOT NULL,
+    L_PARTKEY       INTEGER NOT NULL,
+    L_SUPPKEY       INTEGER NOT NULL,
+    L_LINENUMBER    INTEGER NOT NULL,
+    L_QUANTITY      DECIMAL(15,2) NOT NULL,
+    L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+    L_DISCOUNT      DECIMAL(15,2) NOT NULL,
+    L_TAX           DECIMAL(15,2) NOT NULL,
+    L_RETURNFLAG    CHAR(1) NOT NULL,
+    L_LINESTATUS    CHAR(1) NOT NULL,
+    L_SHIPDATE      DATE NOT NULL,
+    L_COMMITDATE    DATE NOT NULL,
+    L_RECEIPTDATE   DATE NOT NULL,
+    L_SHIPINSTRUCT  CHAR(25) NOT NULL,
+    L_SHIPMODE      CHAR(10) NOT NULL,
+    L_COMMENT       VARCHAR(44) NOT NULL
+)
+UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+PROPERTIES (
+  "replication_num" = "1"
+)
+
+
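These DDL files are not run on their own; the suite below reads them and fills in the ${table} placeholder in lineitem_delete.sql itself. The pattern, taken from the test's main closure, looks like this:

    def tableName = "lineitem_mow"
    sql "DROP TABLE IF EXISTS ${tableName};"
    // create the table from ddl/lineitem_mow.sql
    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
    // lineitem_delete.sql contains "DELETE from ${table} ...", so substitute the table name first
    sql new File("""${context.file.parent}/ddl/lineitem_delete.sql""").text.replaceAll("\\$\\{table\\}", tableName)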
diff --git a/regression-test/suites/show_data/test_show_mow_data.groovy b/regression-test/suites/show_data/test_show_mow_data.groovy
new file mode 100644
index 00000000000..c94e5786d7e
--- /dev/null
+++ b/regression-test/suites/show_data/test_show_mow_data.groovy
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
+// and modified by Doris.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// load the same data 10 times and expect the reported data size not to grow
+suite("test_mow_show_data_in_cloud","p2") {
+    //cloud-mode
+    if (!isCloudMode()) {
+        logger.info("not cloud mode, not run")
+        return
+    }
+
+    def repeate_stream_load_same_data = { String tableName, int loadTimes ->
+        for (int i = 0; i < loadTimes; i++) {
+            streamLoad {
+                table tableName
+                set 'column_separator', '|'
+                set 'compress_type', 'GZ'
+                file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                    assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+                }
+            }
+        }
+    }
+
+    def get_tablets_from_table = { String table ->
+        def res = sql_return_maparray """show tablets from ${table}"""
+        return res
+    }
+
+    def show_tablet_compaction = { HashMap tablet ->
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET ")
+        sb.append(tablet["CompactionStatus"])
+        String command = sb.toString()
+        logger.info(command)
+        process = command.execute()
+        code = process.waitFor()
+        err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        out = process.getText()
+        logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+    def trigger_tablet_compaction = { HashMap tablet, String compact_type ->
+        // supports triggering base/cumulative/full compaction
+        def tabletStatusBeforeCompaction = show_tablet_compaction(tablet)
+
+        String tabletInBe = tablet
+        String showCompactionStatus = tablet["CompactionStatus"]
+        String triggerCompactionUrl = showCompactionStatus.split("show")[0] + "run?tablet_id=" + tablet["TabletId"] + "&compact_type=" + compact_type
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X POST ")
+        sb.append(triggerCompactionUrl)
+        String command = sb.toString()
+        logger.info(command)
+        process = command.execute()
+        code = process.waitFor()
+        err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        out = process.getText()
+        def outJson = parseJson(out)
+        logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+        // code == 0 and status == "success" mean the compaction request was accepted and has to be checked;
+        // any other result may simply mean there was no suitable compaction to run
+        if ( code == 0 && outJson.status.toLowerCase() == "success" ){
+            def compactionStatus = "RUNNING"
+            def tabletStatusAfterCompaction = null
+            long startTime = System.currentTimeMillis()
+            long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
+            do {
+                tabletStatusAfterCompaction = show_tablet_compaction(tablet)
+                logger.info("tablet status after compaction: " + tabletStatusAfterCompaction.toString())
+                if (tabletStatusAfterCompaction.rowsets.size() < tabletStatusBeforeCompaction.rowsets.size()){
+                    compactionStatus = 'FINISHED'
+                }
+                Thread.sleep(60 * 1000)
+            } while (timeoutTimestamp > System.currentTimeMillis() && (compactionStatus != 'FINISHED'))
+
+            if (compactionStatus != "FINISHED") {
+                logger.info("compaction did not finish in time or failed")
+                return false
+            }
+        }
+    }
+
+    def trigger_compaction = { List<List<Object>> tablets ->
+        for(def tablet: tablets) {
+            trigger_tablet_compaction(tablet, "cumulative")
+            trigger_tablet_compaction(tablet, "base")
+            trigger_tablet_compaction(tablet, "full")
+        }
+    }
+
+    def caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
+        storageType = context.config.otherConfigs.get("storageProvider")
+        Double storageSize = 0
+
+        List<String> tabletIds = []
+        for(def tablet: tablets) {
+            tabletIds.add(tablet["TabletId"])
+        }
+
+        if (storageType.toLowerCase() == "oss") {
+            //cbs means cluster backend storage
+            ak = context.config.otherConfigs.get("cbsS3Ak")
+            sk = context.config.otherConfigs.get("cbsS3Sk")
+            endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
+            bucketName = context.config.otherConfigs.get("cbsS3Bucket")
+            storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")
+
+            client = initOssClient(ak, sk, endpoint)
+            for(String tabletId: tabletIds) {
+                storageSize += calculateFolderLength(client, bucketName, storagePrefix + "/data/" + tabletId)
+            }
+            shutDownOssClient(client)
+        }
+
+        if (storageType.toLowerCase() == "hdfs") {
+            fsName = context.config.otherConfigs.get("cbsFsName")
+            isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
+            fsUser = context.config.otherConfigs.get("cbsFsUser")
+            storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
+        }
+
+        // calculateFolderLength returns bytes; convert to MB so the result is
+        // comparable with the MB values produced by the other size helpers
+        return storageSize / 1024 / 1024
+    }
+
+    def translate_different_unit_to_MB = { String size, String unitField ->
+        Double sizeMb = 0.0
+        if (unitField == "KB") {
+            sizeMb = Double.parseDouble(size) / 1024
+        } else if (unitField == "MB") {
+            sizeMb = Double.parseDouble(size)
+        } else if (unitField == "GB") {
+            sizeMb = Double.parseDouble(size) * 1024
+        } else if (unitField == "TB") {
+            sizeMb = Double.parseDouble(size) * 1024 * 1024
+        }
+        return sizeMb
+    }
+
+    def show_table_data_size_through_mysql = { String table ->
+        def mysqlShowDataSize = 0L
+        def res = sql_return_maparray "show data from ${table}"
+        def tableSizeInfo = res[0]
+        def fields = tableSizeInfo["Size"].split(" ")
+        if (fields.length == 2 ){
+            def sizeField = fields[0]
+            def unitField = fields[1]
+            mysqlShowDataSize = translate_different_unit_to_MB(sizeField, unitField)
+        }
+        return mysqlShowDataSize
+    }
+
+    def caculate_table_data_size_through_api = { List<List<Object>> tablets ->
+        Double apiCaculateSize = 0
+        for (HashMap tablet in tablets) {
+            def tabletStatus = show_tablet_compaction(tablet)
+
+            for(String rowset: tabletStatus.rowsets){
+                def fields = rowset.split(" ")
+                if (fields.length == 7) {
+                    def sizeField = fields[-2] // the second-to-last field (size)
+                    def unitField = fields[-1] // the last field (unit)
+                    // convert to MB
+                    apiCaculateSize += translate_different_unit_to_MB(sizeField, unitField)
+                }
+            }
+        }
+
+        return apiCaculateSize
+    }
+
+    def main = {
+        tableName="lineitem_mow"
+        sql "DROP TABLE IF EXISTS ${tableName};"
+        sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+        sql new File("""${context.file.parent}/ddl/lineitem_delete.sql""").text.replaceAll("\\$\\{table\\}", tableName)
+        def tablets = get_tablets_from_table(tableName)
+        def loadTimes = [1, 10]
+        Map<String, List> sizeRecords = ["apiSize":[], "mysqlSize":[], "cbsSize":[]]
+        for (int i in loadTimes){
+            // stream load the same data i times, then record each size
+            repeate_stream_load_same_data(tableName, i)
+            def rows = sql_return_maparray "select count(*) as count from ${tableName};"
+            logger.info("table ${tableName} has ${rows[0]["count"]} rows")
+            // trigger compaction on every tablet so the rowsets get compacted
+            trigger_compaction(tablets)
+
+            // then sleep 5 min so the FE finishes reporting the tablet statistics
+            sleep(300 * 1000)
+
+            sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
+            sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
+            sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
+            sleep(300 * 1000)
+            logger.info("after ${i} times stream load, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")
+
+        }
+
+        // expect mysqlSize == apiSize == storageSize
+        assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["apiSize"][0])
+        assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["cbsSize"][0])
+        // expect the size after loading once to equal the size after loading 10 times
+        assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["mysqlSize"][1])
+        assertEquals(sizeRecords["apiSize"][0], sizeRecords["apiSize"][1])
+        assertEquals(sizeRecords["cbsSize"][0], sizeRecords["cbsSize"][1])
+    }
+
+    main()
+}
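As a quick illustration of the unit handling above, translate_different_unit_to_MB normalizes every size string to MB, so mysqlSize, apiSize and cbsSize can be compared directly regardless of the unit each source reports:

    assert translate_different_unit_to_MB("512", "KB") == 0.5      // 512 KB -> 0.5 MB
    assert translate_different_unit_to_MB("300", "MB") == 300.0    // already in MB
    assert translate_different_unit_to_MB("2", "GB") == 2048.0     // 2 GB   -> 2048 MB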
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org