This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7bcf173178d branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230)
7bcf173178d is described below
commit 7bcf173178d938bbfef86323837dafb901150a30
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 20 10:53:03 2025 +0800
branch-3.0: [fix](load) fix dead lock when write memtable failed #49170 (#49230)
Cherry-picked from #49170
Co-authored-by: Xin Liao <[email protected]>
---
be/src/olap/memtable_memory_limiter.cpp | 18 ++++-
be/src/olap/memtable_writer.cpp | 6 +-
.../test_memtable_write_failed.groovy | 90 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 1cb6c0c8e2d..1f59f8fc341 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -103,6 +103,12 @@ bool MemTableMemoryLimiter::_load_usage_low() {
}
int64_t MemTableMemoryLimiter::_need_flush() {
+ DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG(INFO) << "debug memtable need flush return 1";
+ return 1;
+ }
+ });
int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
@@ -113,9 +119,15 @@ int64_t MemTableMemoryLimiter::_need_flush() {
void MemTableMemoryLimiter::handle_memtable_flush() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
- if (!_soft_limit_reached() || _load_usage_low()) {
- return;
- }
+ do {
+
DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", {
+ LOG(INFO) << "debug memtable limit reached";
+ break;
+ });
+ if (!_soft_limit_reached() || _load_usage_low()) {
+ return;
+ }
+ } while (false);
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 3dc88ae668f..56312073c57 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -124,8 +124,12 @@ Status MemTableWriter::write(const vectorized::Block*
block,
// 2. However, memory pressure might trigger a flush operation on this
failed memtable
// 3. By resetting here, we ensure the failed memtable won't be included
in any subsequent flush,
// thus preventing potential crashes
+ DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.3))) {
+ st = Status::InternalError<false>("write memtable random failed
for debug");
+ }
+ });
if (!st.ok()) [[unlikely]] {
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
_reset_mem_table();
return st;
}
diff --git a/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy
new file mode 100644
index 00000000000..5664c022616
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_memtable_write_failed.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_memtable_write_failed", "nonConcurrent") { // concurrent inserts + stream loads with random memtable write failures and forced flushes must not hang
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def testTable = "test_memtable_write_failed"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `${testTable}` (
+            `id` BIGINT NOT NULL AUTO_INCREMENT,
+            `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+    def content = (1..4096).collect { "${it},${it}\n" }.join("") * 2 // 8192 stream-load rows ("1,1".."4096,4096" twice); constant, so built once
+    def run_test = {thread_num, rows, iters ->
+        def threads = []
+        def errors = Collections.synchronizedList([]) // failures thrown inside Thread.start never reach join(); collect and rethrow them
+        (1..thread_num).each { id1 ->
+            threads.add(Thread.start {
+                try {
+                    (1..iters).each { id2 ->
+                        try {
+                            sql """insert into ${testTable}(value) select number from numbers("number" = "${rows}");"""
+                            streamLoad {
+                                table "${testTable}"
+                                set 'column_separator', ','
+                                inputStream new ByteArrayInputStream(content.getBytes())
+                                time 30000 // limit inflight 30s
+
+                                check { result, exception, startTime, endTime ->
+                                    if (exception != null) {
+                                        throw exception
+                                    }
+                                    def json = parseJson(result)
+                                    if (json.Status.equalsIgnoreCase("success")) {
+                                        assertEquals(8192, json.NumberTotalRows)
+                                        assertEquals(0, json.NumberFilteredRows)
+                                    } else {
+                                        assertTrue(json.Message.contains("write memtable random failed for debug"))
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            logger.info(e.getMessage())
+                            assertTrue(e.getMessage().contains("write memtable random failed for debug"))
+                        }
+                    }
+                } catch (Throwable t) { errors.add(t) } // includes AssertionError, which the inner catch (Exception) lets through
+            })
+        }
+        threads.each { thread -> thread.join() }
+        if (!errors.isEmpty()) { throw errors[0] } // surface the first worker failure so the suite actually fails
+    }
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableWriter.write.random_insert_error")
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached")
+        GetDebugPoint().enableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush")
+        run_test(5, 10000, 10)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertTrue(e.getMessage().contains("write memtable random failed for debug"))
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableWriter.write.random_insert_error")
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._handle_memtable_flush.limit_reached")
+        GetDebugPoint().disableDebugPointForAllBEs("MemTableMemoryLimiter._need_flush.random_flush")
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]