This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e9b9d8a35 [improve](insert-into) record rows info in log for check 
(#29581)
e2e9b9d8a35 is described below

commit e2e9b9d8a3577eadfb664f8bd91ad33cb2b9d3c5
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Jan 5 17:28:07 2024 +0800

    [improve](insert-into) record rows info in log for check (#29581)
---
 be/src/runtime/fragment_mgr.cpp    | 43 ++++++++++++++++++--------------------
 be/src/runtime/load_channel.cpp    | 10 ++++++++-
 be/src/runtime/load_channel.h      |  2 ++
 be/src/runtime/tablets_channel.cpp |  2 ++
 be/src/runtime/tablets_channel.h   |  8 +++++++
 5 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0d32c16186a..8cf11047b62 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -294,29 +294,22 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
                 params.__isset.delta_urls = true;
             }
         }
+
+        // load rows
+        static std::string s_dpp_normal_all = "dpp.norm.ALL";
+        static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+        static std::string s_unselected_rows = "unselected.rows";
+        int64_t num_rows_load_success = 0;
+        int64_t num_rows_load_filtered = 0;
+        int64_t num_rows_load_unselected = 0;
         if (req.runtime_state->num_rows_load_total() > 0 ||
             req.runtime_state->num_rows_load_filtered() > 0) {
             params.__isset.load_counters = true;
 
-            static std::string s_dpp_normal_all = "dpp.norm.ALL";
-            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-            static std::string s_unselected_rows = "unselected.rows";
-
-            params.load_counters.emplace(
-                    s_dpp_normal_all, 
std::to_string(req.runtime_state->num_rows_load_success()));
-            params.load_counters.emplace(
-                    s_dpp_abnormal_all,
-                    
std::to_string(req.runtime_state->num_rows_load_filtered()));
-            params.load_counters.emplace(
-                    s_unselected_rows,
-                    
std::to_string(req.runtime_state->num_rows_load_unselected()));
+            num_rows_load_success = req.runtime_state->num_rows_load_success();
+            num_rows_load_filtered = 
req.runtime_state->num_rows_load_filtered();
+            num_rows_load_unselected = 
req.runtime_state->num_rows_load_unselected();
         } else if (!req.runtime_states.empty()) {
-            static std::string s_dpp_normal_all = "dpp.norm.ALL";
-            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-            static std::string s_unselected_rows = "unselected.rows";
-            int64_t num_rows_load_success = 0;
-            int64_t num_rows_load_filtered = 0;
-            int64_t num_rows_load_unselected = 0;
             for (auto* rs : req.runtime_states) {
                 if (rs->num_rows_load_total() > 0 || 
rs->num_rows_load_filtered() > 0) {
                     params.__isset.load_counters = true;
@@ -325,12 +318,16 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
                     num_rows_load_unselected += rs->num_rows_load_unselected();
                 }
             }
-            params.load_counters.emplace(s_dpp_normal_all, 
std::to_string(num_rows_load_success));
-            params.load_counters.emplace(s_dpp_abnormal_all,
-                                         
std::to_string(num_rows_load_filtered));
-            params.load_counters.emplace(s_unselected_rows,
-                                         
std::to_string(num_rows_load_unselected));
         }
+        params.load_counters.emplace(s_dpp_normal_all, 
std::to_string(num_rows_load_success));
+        params.load_counters.emplace(s_dpp_abnormal_all, 
std::to_string(num_rows_load_filtered));
+        params.load_counters.emplace(s_unselected_rows, 
std::to_string(num_rows_load_unselected));
+        LOG(INFO) << "execute coordinator callback, query id: " << 
print_id(req.query_id)
+                  << ", instance id: " << print_id(req.fragment_instance_id)
+                  << ", num_rows_load_success: " << num_rows_load_success
+                  << ", num_rows_load_filtered: " << num_rows_load_filtered
+                  << ", num_rows_load_unselected: " << 
num_rows_load_unselected;
+
         if (!req.runtime_state->get_error_log_file_path().empty()) {
             params.__set_tracking_url(
                     
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 0dc0ac344b3..5969461f35e 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -47,9 +47,14 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t 
timeout_s, bool is_hig
 
 LoadChannel::~LoadChannel() {
     g_loadchannel_cnt << -1;
+    std::stringstream rows_str;
+    for (const auto& entry : _tablets_channels_rows) {
+        rows_str << ", index id: " << entry.first << ", total_received_rows: " 
<< entry.second.first
+                 << ", num_rows_filtered: " << entry.second.second;
+    }
     LOG(INFO) << "load channel removed"
               << " load_id=" << _load_id << ", is high priority=" << 
_is_high_priority
-              << ", sender_ip=" << _sender_ip;
+              << ", sender_ip=" << _sender_ip << rows_str.str();
 }
 
 void LoadChannel::_init_profile() {
@@ -165,6 +170,9 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
         std::lock_guard<std::mutex> l(_lock);
         {
             std::lock_guard<SpinLock> l(_tablets_channels_lock);
+            _tablets_channels_rows.insert(std::make_pair(
+                    index_id,
+                    std::make_pair(channel->total_received_rows(), 
channel->num_rows_filtered())));
             _tablets_channels.erase(index_id);
         }
         _finished_channel_ids.emplace(index_id);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index bdeedbd9eae..fc19e942153 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -107,6 +107,8 @@ private:
     std::mutex _lock;
     // index id -> tablets channel
     std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> 
_tablets_channels;
+    // index id -> (received rows, filtered rows)
+    std::unordered_map<int64_t, std::pair<size_t, size_t>> 
_tablets_channels_rows;
     SpinLock _tablets_channels_lock;
     // This is to save finished channels id, to handle the retry request.
     std::unordered_set<int64_t> _finished_channel_ids;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 2cafe308271..7ead68d916f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -361,6 +361,8 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, const 
PTabletWriterAddBloc
         tablet_info->set_schema_hash(0);
         tablet_info->set_received_rows(writer->total_received_rows());
         tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
+        _total_received_rows += writer->total_received_rows();
+        _num_rows_filtered += writer->num_rows_filtered();
     } else {
         _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), 
st);
     }
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 75a0b7679ef..15f68ba8e38 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -109,6 +109,10 @@ public:
 
     void refresh_profile();
 
+    size_t total_received_rows() const { return _total_received_rows; }
+
+    size_t num_rows_filtered() const { return _num_rows_filtered; }
+
 protected:
     Status _get_current_seq(int64_t& cur_seq, const 
PTabletWriterAddBlockRequest& request);
 
@@ -186,6 +190,10 @@ protected:
     RuntimeProfile::Counter* _add_batch_timer = nullptr;
     RuntimeProfile::Counter* _write_block_timer = nullptr;
     RuntimeProfile::Counter* _incremental_open_timer = nullptr;
+
+    // record rows received and filtered
+    size_t _total_received_rows = 0;
+    size_t _num_rows_filtered = 0;
 };
 
 class DeltaWriter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to