This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1edf5b31b61 [Regression-test](wal) Add fault injection case for wal mem back pressure (#29298)
1edf5b31b61 is described below

commit 1edf5b31b61a903957e5d81eab021ba062c958ca
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Jan 3 00:06:52 2024 +0800

    [Regression-test](wal) Add fault injection case for wal mem back pressure (#29298)
---
 be/src/common/config.cpp                           |   2 +-
 be/src/common/config.h                             |   2 +-
 ...st_wal_mem_back_pressure_fault_injection.csv.gz | Bin 0 -> 372017 bytes
 ...st_wal_mem_back_pressure_fault_injection.groovy | 142 +++++++++++++++++++++
 4 files changed, 144 insertions(+), 2 deletions(-)
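
The core of the change is that group_commit_queue_mem_limit moves from the
static DEFINE_Int32/DECLARE_Int32 config macros to the mutable
DEFINE_mInt32/DECLARE_mInt32 variants, so the new regression suite can shrink
the limit at runtime, force memory back pressure on a group commit insert, and
then restore the default. As a reading aid, here is a condensed Groovy sketch
of the FE manager call the suite assembles with StringBuilder further down; the
endpoint, the Basic root auth header, and the payload keys come from the suite
itself, while the helper name setQueueMemLimit and the compact list-based curl
invocation are illustrative and assume the suite's context (logger, plus fe/be
rows taken from "show frontends"/"show backends"):

    // Illustrative sketch only, not part of the commit.
    def setQueueMemLimit = { fe, be, String value ->
        // "0" forces mem back pressure; "67108864" restores the 64M default.
        def payload = """{"group_commit_queue_mem_limit": {"node": ["${be.Host}:${be.HttpPort}"], "value": "${value}", "persist": "false"}}"""
        def cmd = ["curl", "-s", "-X", "POST",
                   "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/node/set_config/be".toString(),
                   "-H", "Content-Type: application/json",
                   "-H", "Authorization: Basic cm9vdDo=",
                   "-d", payload.toString()]
        logger.info(cmd.join(" "))
        cmd.execute().waitFor()
    }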

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9b30ae145c4..ddb235130bc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
 DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
 DEFINE_Bool(wait_internal_group_commit_finish, "false");
 // Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
-DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
+DEFINE_mInt32(group_commit_queue_mem_limit, "67108864");
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DEFINE_String(group_commit_wal_max_disk_limit, "10%");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c340abe05f3..cfb8ed02f32 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
 DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
 DECLARE_Bool(wait_internal_group_commit_finish);
 // Max size(bytes) of group commit queues, used for mem back pressure.
-DECLARE_Int32(group_commit_queue_mem_limit);
+DECLARE_mInt32(group_commit_queue_mem_limit);
 // Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
 DECLARE_mString(group_commit_wal_max_disk_limit);
diff --git a/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz b/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz
new file mode 100644
index 00000000000..539fb01b418
Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.csv.gz differ
diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
new file mode 100644
index 00000000000..473abb6bb2a
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
+
+
+    def tableName = "wal_test"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS `wal_baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `wal_test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    streamLoad {
+        table "wal_baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    // Shrink group_commit_queue_mem_limit to 0 on the BE so the next group
+    // commit insert hits memory back pressure.
+    def enable_back_pressure = {
+        try {
+            def fes = sql_return_maparray "show frontends"
+            def bes = sql_return_maparray "show backends"
+            logger.info("frontends: ${fes}")
+            def fe = fes[0]
+            def be = bes[0]
+            def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
+            logger.info("observer url: " + url)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+            sb.append("/rest/v2/manager/node/set_config/be")
+            sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+            sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+        } finally {
+        }
+    }
+
+    // Restore group_commit_queue_mem_limit to its 64M default on the BE.
+    def disable_back_pressure = {
+        try {
+            def fes = sql_return_maparray "show frontends"
+            def bes = sql_return_maparray "show backends"
+            logger.info("frontends: ${fes}")
+            def fe = fes[0]
+            def be = bes[0]
+            def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
+            logger.info("observer url: " + url)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+            sb.append("/rest/v2/manager/node/set_config/be")
+            sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+            sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+        } finally {
+        }
+    }
+
+    boolean finish = false
+    enable_back_pressure()
+    // Run the group commit insert in a background thread; with the queue limit
+    // at 0 it blocks on mem back pressure until the connection is killed below.
+    def thread1 = new Thread({
+        sql """ set group_commit = async_mode; """
+        try {
+            sql """insert into ${tableName} select * from wal_baseall where k1 <= 3"""
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Communications link failure'))
+        }
+        disable_back_pressure()
+        finish = true
+    })
+    thread1.start()
+
+    // Kill the blocked insert once it shows up in the processlist so the
+    // worker thread can finish.
+    for (int i = 0; i < 10; i++) {
+        def processList = sql "show processlist"
+        logger.info(processList.toString())
+        processList.each { item ->
+            logger.info(item[1].toString())
+            logger.info(item[11].toString())
+            if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()) {
+                def res = sql "kill ${item[1]}"
+                logger.info(res.toString())
+            }
+        }
+    }
+
+    thread1.join()
+
+}
\ No newline at end of file
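
A note on the cleanup at the end of the suite: the final loop scans
"show processlist" ten times and kills every session whose statement text
matches the injected insert, which is what makes the blocked worker thread fail
with "Communications link failure" and fall through to disable_back_pressure().
A hedged variant of the same idea, still using only "show processlist" and
"kill", would poll until the statement has actually been killed and bound the
wait with a deadline; the 60-second deadline, the startsWith match, and the
overall shape are illustrative, not part of the commit, and assume the suite's
context (sql, tableName, thread1):

    // Illustrative sketch only: bounded polling instead of a fixed 10-pass loop.
    def killed = false
    def deadline = System.currentTimeMillis() + 60_000
    while (!killed && System.currentTimeMillis() < deadline) {
        def processList = sql "show processlist"
        processList.each { item ->
            // item[1] is the connection id, item[11] the statement text, as in the suite.
            if (!killed && item[11].toString().startsWith("insert into ${tableName}")) {
                sql "kill ${item[1]}"
                killed = true
            }
        }
        sleep(500)
    }
    thread1.join()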

