yiguolei commented on code in PR #39852:
URL: https://github.com/apache/doris/pull/39852#discussion_r1729634715
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -240,6 +243,9 @@ Status
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
template <typename Parent>
Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
+ if (_parent) {
+ SCOPED_TIMER(_parent->brpc_send_timer());
Review Comment:
这么写没意义, scoped 就是在这个 {} 有用
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -154,7 +155,8 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
}
template <typename Parent>
-void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id)
{
+void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id,
+ vectorized::PipChannel<Parent>*
channel) {
Review Comment:
增加这个channel 变量的目的是?
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -538,6 +555,36 @@ void
ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1),
_rpc_count.load()));
+
+ if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+ auto max_count = _state->rpc_verbose_profile_max_instance_count();
+ if (_state->enable_verbose_profile() && max_count > 0) {
+ pdqsort(_instance_to_rpc_stats_vec.begin(),
_instance_to_rpc_stats_vec.end(),
+ [](const auto& a, const auto& b) { return a->max_time >
b->max_time; });
+ auto count = std::min((size_t)max_count,
_instance_to_rpc_stats_vec.size());
+ int i = 0;
+ auto* detail_profile = profile->create_child("RpcInstanceDetails",
true, true);
+ for (const auto& stats : _instance_to_rpc_stats_vec) {
+ if (0 == stats->rpc_count) {
+ continue;
+ }
+ std::stringstream out;
+ out << "Instance " << std::hex << stats->inst_lo_id;
+ auto stats_str = fmt::format(
+ "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {},
SumTime: {}",
+ stats->rpc_count,
PrettyPrinter::print(stats->max_time, TUnit::TIME_NS),
Review Comment:
CloseTime: avg 11.310us, max 11.310us, min 11.310us
- ExecTime: avg 150.158us, max
150.158us, min 150.158us
- InitTime: avg 72.193us, max 72.193us,
min 72.193us
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -298,7 +304,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
- set_rpc_time(id, start_rpc_time, result.receive_time());
Review Comment:
继续保留这个,我们不动以前的逻辑,这次我们只是增加 channel 级别的detail的信息
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -538,6 +555,36 @@ void
ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1),
_rpc_count.load()));
+
+ if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+ auto max_count = _state->rpc_verbose_profile_max_instance_count();
+ if (_state->enable_verbose_profile() && max_count > 0) {
+ pdqsort(_instance_to_rpc_stats_vec.begin(),
_instance_to_rpc_stats_vec.end(),
+ [](const auto& a, const auto& b) { return a->max_time >
b->max_time; });
+ auto count = std::min((size_t)max_count,
_instance_to_rpc_stats_vec.size());
+ int i = 0;
+ auto* detail_profile = profile->create_child("RpcInstanceDetails",
true, true);
Review Comment:
这个profile 对象得检查下是否为nullptr
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]