This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1edf5b31b61 [Regression-test](wal) Add fault injection case for wal mem back pressure (#29298) 1edf5b31b61 is described below commit 1edf5b31b61a903957e5d81eab021ba062c958ca Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Wed Jan 3 00:06:52 2024 +0800 [Regression-test](wal) Add fault injection case for wal mem back pressure (#29298) --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- ...st_wal_mem_back_pressure_fault_injection.csv.gz | Bin 0 -> 372017 bytes ...st_wal_mem_back_pressure_fault_injection.groovy | 142 +++++++++++++++++++++ 4 files changed, 144 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9b30ae145c4..ddb235130bc 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10"); DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000"); DEFINE_Bool(wait_internal_group_commit_finish, "false"); // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M. -DEFINE_Int32(group_commit_queue_mem_limit, "67108864"); +DEFINE_mInt32(group_commit_queue_mem_limit, "67108864"); // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DEFINE_String(group_commit_wal_max_disk_limit, "10%"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c340abe05f3..cfb8ed02f32 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads); DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio); DECLARE_Bool(wait_internal_group_commit_finish); // Max size(bytes) of group commit queues, used for mem back pressure. -DECLARE_Int32(group_commit_queue_mem_limit); +DECLARE_mInt32(group_commit_queue_mem_limit); // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space. // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DECLARE_mString(group_commit_wal_max_disk_limit); diff --git a/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz b/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz new file mode 100644 index 00000000000..539fb01b418 Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz differ diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy new file mode 100644 index 00000000000..473abb6bb2a --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") { + + + def tableName = "wal_test" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS `wal_baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + sql """ + CREATE TABLE IF NOT EXISTS `wal_test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + streamLoad { + table "wal_baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + def enable_back_pressure = { + try { + def fes = sql_return_maparray "show frontends" + def bes = sql_return_maparray "show backends" + logger.info("frontends: ${fes}") + def fe = fes[0] + def be = bes[0] + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" + logger.info("observer url: " + url) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + } finally { + } + } + + def disable_back_pressure = { + try { + def fes = sql_return_maparray "show frontends" + def bes = sql_return_maparray "show backends" + logger.info("frontends: ${fes}") + def fe = fes[0] + def be = bes[0] + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" + logger.info("observer url: " + url) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + } finally { + } + } + + boolean finish = false + enable_back_pressure() + def thread1 = new Thread({ + sql """ set group_commit = async_mode; """ + try { + sql """insert into ${tableName} select * from wal_baseall where k1 <= 3""" + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains('Communications link failure')) + } + disable_back_pressure() + finish = true + }) + thread1.start() + + for(int i = 0;i<10;i++){ + def processList = sql "show processlist" + logger.info(processList.toString()) + processList.each { item -> + logger.info(item[1].toString()) + logger.info(item[11].toString()) + if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()){ + def res = sql "kill ${item[1]}" + logger.info(res.toString()) + } + } + } + + thread1.join() + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org