This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2e9b9d8a35 [improve](insert-into) record rows info in log for check (#29581)
e2e9b9d8a35 is described below
commit e2e9b9d8a3577eadfb664f8bd91ad33cb2b9d3c5
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Jan 5 17:28:07 2024 +0800
[improve](insert-into) record rows info in log for check (#29581)
---
be/src/runtime/fragment_mgr.cpp | 43 ++++++++++++++++++--------------------
be/src/runtime/load_channel.cpp | 10 ++++++++-
be/src/runtime/load_channel.h | 2 ++
be/src/runtime/tablets_channel.cpp | 2 ++
be/src/runtime/tablets_channel.h | 8 +++++++
5 files changed, 41 insertions(+), 24 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0d32c16186a..8cf11047b62 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -294,29 +294,22 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__isset.delta_urls = true;
}
}
+
+ // load rows
+ static std::string s_dpp_normal_all = "dpp.norm.ALL";
+ static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+ static std::string s_unselected_rows = "unselected.rows";
+ int64_t num_rows_load_success = 0;
+ int64_t num_rows_load_filtered = 0;
+ int64_t num_rows_load_unselected = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0) {
params.__isset.load_counters = true;
- static std::string s_dpp_normal_all = "dpp.norm.ALL";
- static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
- static std::string s_unselected_rows = "unselected.rows";
-
- params.load_counters.emplace(
- s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success()));
- params.load_counters.emplace(
- s_dpp_abnormal_all, std::to_string(req.runtime_state->num_rows_load_filtered()));
- params.load_counters.emplace(
- s_unselected_rows, std::to_string(req.runtime_state->num_rows_load_unselected()));
+ num_rows_load_success = req.runtime_state->num_rows_load_success();
+ num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
+ num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
} else if (!req.runtime_states.empty()) {
- static std::string s_dpp_normal_all = "dpp.norm.ALL";
- static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
- static std::string s_unselected_rows = "unselected.rows";
- int64_t num_rows_load_success = 0;
- int64_t num_rows_load_filtered = 0;
- int64_t num_rows_load_unselected = 0;
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0) {
params.__isset.load_counters = true;
@@ -325,12 +318,16 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
num_rows_load_unselected += rs->num_rows_load_unselected();
}
}
- params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
- params.load_counters.emplace(s_dpp_abnormal_all,
- std::to_string(num_rows_load_filtered));
- params.load_counters.emplace(s_unselected_rows,
- std::to_string(num_rows_load_unselected));
}
+ params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
+ params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
+ params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
+ LOG(INFO) << "execute coordinator callback, query id: " << print_id(req.query_id)
+ << ", instance id: " << print_id(req.fragment_instance_id)
+ << ", num_rows_load_success: " << num_rows_load_success
+ << ", num_rows_load_filtered: " << num_rows_load_filtered
+ << ", num_rows_load_unselected: " << num_rows_load_unselected;
+
if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 0dc0ac344b3..5969461f35e 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -47,9 +47,14 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
LoadChannel::~LoadChannel() {
g_loadchannel_cnt << -1;
+ std::stringstream rows_str;
+ for (const auto& entry : _tablets_channels_rows) {
+ rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
+ << ", num_rows_filtered: " << entry.second.second;
+ }
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" << _is_high_priority
- << ", sender_ip=" << _sender_ip;
+ << ", sender_ip=" << _sender_ip << rows_str.str();
}
void LoadChannel::_init_profile() {
@@ -165,6 +170,9 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::lock_guard<std::mutex> l(_lock);
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
+ _tablets_channels_rows.insert(std::make_pair(
+ index_id,
+ std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
_finished_channel_ids.emplace(index_id);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index bdeedbd9eae..fc19e942153 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -107,6 +107,8 @@ private:
std::mutex _lock;
// index id -> tablets channel
std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels;
+ // index id -> (received rows, filtered rows)
+ std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows;
SpinLock _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
std::unordered_set<int64_t> _finished_channel_ids;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 2cafe308271..7ead68d916f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -361,6 +361,8 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBloc
tablet_info->set_schema_hash(0);
tablet_info->set_received_rows(writer->total_received_rows());
tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
+ _total_received_rows += writer->total_received_rows();
+ _num_rows_filtered += writer->num_rows_filtered();
} else {
_add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
}
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 75a0b7679ef..15f68ba8e38 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -109,6 +109,10 @@ public:
void refresh_profile();
+ size_t total_received_rows() const { return _total_received_rows; }
+
+ size_t num_rows_filtered() const { return _num_rows_filtered; }
+
protected:
Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);
@@ -186,6 +190,10 @@ protected:
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _write_block_timer = nullptr;
RuntimeProfile::Counter* _incremental_open_timer = nullptr;
+
+ // record rows received and filtered
+ size_t _total_received_rows = 0;
+ size_t _num_rows_filtered = 0;
};
class DeltaWriter;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]