This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1fd701493 [fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947)
b1fd701493 is described below

commit b1fd701493919e89936662f84f151b3071cc4578
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Aug 22 08:56:05 2022 +0800

    [fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947)
---
 be/src/exec/blocking_join_node.cpp            | 1 +
 be/src/exec/hash_join_node.cpp                | 1 +
 be/src/exec/olap_scan_node.cpp                | 2 ++
 be/src/exec/tablet_sink.cpp                   | 4 +++-
 be/src/runtime/memory/mem_tracker_limiter.cpp | 8 ++++----
 be/src/service/internal_service.cpp           | 3 +++
 be/src/vec/exec/join/vhash_join_node.cpp      | 1 +
 be/src/vec/exec/vblocking_join_node.cpp       | 1 +
 be/src/vec/exec/volap_scan_node.cpp           | 2 ++
 be/src/vec/sink/vtablet_sink.cpp              | 1 +
 10 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index 10419a3499..5e5aee3714 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -93,6 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) {
 
 void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     status->set_value(construct_build_side(state));
 }
 
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index cbba3c607c..f5a7fec132 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -179,6 +179,7 @@ Status HashJoinNode::close(RuntimeState* state) {
 
 void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     status->set_value(construct_hash_table(state));
 }
 
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 048a922bae..99caa8afb9 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1480,6 +1480,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
 void OlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     Status status = Status::OK();
     for (auto scanner : _olap_scanners) {
         status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
@@ -1663,6 +1664,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
     SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     Thread::set_self_name("olap_scanner");
     if (UNLIKELY(_transfer_done)) {
         _scanner_done = true;
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index cdb503551b..a1213d348d 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -468,8 +468,9 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
 }
 
 void NodeChannel::try_send_batch(RuntimeState* state) {
-    SCOPED_ATTACH_TASK(state);
     SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
+    SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     AddBatchReq send_batch;
     {
         debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
@@ -1321,6 +1322,7 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
 void OlapTableSink::_send_batch_process(RuntimeState* state) {
     SCOPED_TIMER(_non_blocking_send_timer);
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 50eb011028..84209888ef 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -226,11 +226,11 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
 Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
     DCHECK(_limit != -1);
     std::string detail = fmt::format(
-            "{}, backend={} free memory left={}. If is query, can change the limit by `set "
-            "exec_mem_limit=xxx`, details mem usage see be.INFO.",
+            "{}, backend={} memory used={}, free memory left={}. If is query, can change the limit "
+            "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
             msg, BackendOptions::get_localhost(),
-            PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
-                                 TUnit::BYTES));
+            PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
+            PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES));
     Status status = Status::MemoryLimitExceeded(detail);
 
     // only print the tracker log_usage in be log.
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 02a52ce0e5..87f89f48e0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -263,6 +263,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
+            SCOPED_SWITCH_BTHREAD_TLS();
 
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
@@ -320,6 +321,8 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
+            SCOPED_SWITCH_BTHREAD_TLS();
+
             // TODO(zxy) delete in 1.2 version
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index d27c0aea06..228e0020e8 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1114,6 +1114,7 @@ Status HashJoinNode::open(RuntimeState* state) {
 void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     status->set_value(_hash_table_build(state));
 }
 
diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp
index f98005ca9c..c6d548de91 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -75,6 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) {
 
 void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     status->set_value(construct_build_side(state));
     // Release the thread token as soon as possible (before the main thread joins
     // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index ab2f9eda38..5542eefeb0 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -274,6 +274,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread");
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     Status status = Status::OK();
 
     if (_vconjunct_ctx_ptr) {
@@ -386,6 +387,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
 
 void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     Thread::set_self_name("volap_scanner");
     int64_t wait_time = scanner->update_wait_worker_timer();
     // Do not use ScopedTimer. There is no guarantee that, the counter
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 7a0faee0d6..3cbc73348c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -235,6 +235,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
 
 void VNodeChannel::try_send_block(RuntimeState* state) {
     SCOPED_ATTACH_TASK(state);
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
     AddBlockReq send_block;
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to