This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new d9f502693e7 [feature](AuditLog) add scanRows scanBytes in auditlog # 25435 (#26268) d9f502693e7 is described below commit d9f502693e7c0caa4c888088cfa14927893ec119 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Thu Nov 2 14:19:10 2023 +0800 [feature](AuditLog) add scanRows scanBytes in auditlog # 25435 (#26268) --- be/src/exec/exec_node.cpp | 8 +++++++ be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 +++++++ be/src/pipeline/exec/exchange_sink_buffer.h | 5 ++++ be/src/pipeline/exec/exchange_sink_operator.cpp | 1 + be/src/pipeline/exec/operator.h | 21 ++++++++++++++++ be/src/pipeline/pipeline.h | 11 +++++++++ be/src/pipeline/pipeline_fragment_context.cpp | 2 ++ be/src/pipeline/pipeline_task.cpp | 27 +++++++++++++++++++++ be/src/pipeline/pipeline_task.h | 4 ++++ be/src/runtime/query_statistics.cpp | 32 +++++++++++++------------ be/src/runtime/query_statistics.h | 20 ++++++++++++---- be/src/vec/exec/scan/new_olap_scan_node.cpp | 9 +++++-- be/src/vec/exec/scan/new_olap_scan_node.h | 1 + be/src/vec/exec/scan/new_olap_scanner.cpp | 3 ++- be/src/vec/exec/scan/vscan_node.cpp | 3 ++- be/src/vec/exec/scan/vscan_node.h | 1 + be/src/vec/exec/scan/vscanner.cpp | 9 +++++-- be/src/vec/exec/scan/vscanner.h | 2 ++ be/src/vec/exec/vexchange_node.cpp | 6 ++++- be/src/vec/exec/vexchange_node.h | 1 + be/src/vec/runtime/vdata_stream_recvr.cpp | 6 +++++ be/src/vec/runtime/vdata_stream_recvr.h | 3 +++ be/src/vec/sink/vdata_stream_sender.cpp | 6 +++-- be/src/vec/sink/vdata_stream_sender.h | 3 +++ 25 files changed, 166 insertions(+), 28 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 56902e38239..0dc8df911b2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -180,6 +180,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); } +Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { + DCHECK(statistics != nullptr); + for (auto child_node : _children) { + RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, sender_id)); + } + return Status::OK(); +} + void ExecNode::release_resource(doris::RuntimeState* state) { if (!_is_resource_released) { if (_rows_returned_counter != nullptr) { diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ae7b40dc20e..d92f884204b 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -162,6 +162,8 @@ public: // error. [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics); + [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics, + int sender_id); // close() will get called for every exec node, regardless of what else is called and // the status of these calls (i.e. prepare() may never have been called, or // prepare()/open()/get_next() returned with an error). diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 696e0643880..68d899e0c48 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -190,6 +190,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + if (_statistics && _statistics->collected()) { + auto statistic = brpc_request->mutable_query_statistics(); + _statistics->to_pb(statistic); + } if (request.block) { brpc_request->set_allocated_block(request.block.get()); } @@ -244,6 +248,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (request.block_holder->get_block()) { brpc_request->set_allocated_block(request.block_holder->get_block()); } + if (_statistics && _statistics->collected()) { + auto statistic = brpc_request->mutable_query_statistics(); + _statistics->to_pb(statistic); + } auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); ExchangeRpcContext rpc_ctx; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c4636563108..7e30620cae8 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -33,6 +33,7 @@ #include "common/global_types.h" #include "common/status.h" +#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "service/backend_options.h" @@ -173,6 +174,8 @@ public: void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); + void set_query_statistics(QueryStatistics* statistics) { _statistics = statistics; } + private: phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>> _instance_to_package_queue_mutex; @@ -211,6 +214,8 @@ private: inline bool _is_receiver_eof(InstanceLoId id); void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); + + QueryStatistics* _statistics = nullptr; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index dd0a35ce959..79517e8b041 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -67,6 +67,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, _sink->_sender_id, _state->be_number(), _context); + _sink_buffer->set_query_statistics(_sink->query_statistics()); RETURN_IF_ERROR(DataSinkOperator::prepare(state)); _sink->registe_channels(_sink_buffer.get()); return Status::OK(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d98b4f16bd4..4a68a95e366 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -163,6 +163,14 @@ public: bool is_source() const; + virtual Status collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); }; + + virtual Status collect_query_statistics(QueryStatistics* statistics, int sender_id) { + return Status::OK(); + }; + + virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {}; + virtual Status init(const TDataSink& tsink) { return Status::OK(); } // Prepare for running. (e.g. resource allocation, etc.) @@ -299,6 +307,9 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } + void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override { + _sink->set_query_statistics(statistics); + } protected: NodeType* _sink; @@ -369,6 +380,16 @@ public: return _node->runtime_profile(); } + Status collect_query_statistics(QueryStatistics* statistics) override { + RETURN_IF_ERROR(_node->collect_query_statistics(statistics)); + return Status::OK(); + } + + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override { + RETURN_IF_ERROR(_node->collect_query_statistics(statistics, sender_id)); + return Status::OK(); + } + protected: NodeType* _node; bool _use_projection; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 759e4fce2c9..056c331dd0c 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -102,6 +102,15 @@ public: RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } + void set_is_root_pipeline() { _is_root_pipeline = true; } + bool is_root_pipeline() const { return _is_root_pipeline; } + void set_collect_query_statistics_with_every_batch() { + _collect_query_statistics_with_every_batch = true; + } + [[nodiscard]] bool collect_query_statistics_with_every_batch() const { + return _collect_query_statistics_with_every_batch; + } + private: void _init_profile(); @@ -145,6 +154,8 @@ private: */ bool _always_can_read = false; bool _always_can_write = false; + bool _is_root_pipeline = false; + bool _collect_query_statistics_with_every_batch = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 87578fbc665..0ce125f1891 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } _root_pipeline = fragment_context->add_pipeline(); + _root_pipeline->set_is_root_pipeline(); + _root_pipeline->set_collect_query_statistics_with_every_batch(); RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); if (_sink) { RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id, diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 2b428ac5f14..8adc2fd0783 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -29,6 +29,7 @@ #include "pipeline_fragment_context.h" #include "runtime/descriptors.h" #include "runtime/query_context.h" +#include "runtime/query_statistics.h" #include "runtime/thread_context.h" #include "task_queue.h" #include "util/defer_op.h" @@ -60,6 +61,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* _fragment_context(fragment_context), _parent_profile(parent_profile) { _pipeline_task_watcher.start(); + _query_statistics.reset(new QueryStatistics()); + _sink->set_query_statistics(_query_statistics); + _collect_query_statistics_with_every_batch = + _pipeline->collect_query_statistics_with_every_batch(); } void PipelineTask::_fresh_profile_counter() { @@ -256,6 +261,10 @@ Status PipelineTask::execute(bool* eos) { *eos = _data_state == SourceState::FINISHED; if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); + if (_data_state == SourceState::FINISHED || + _collect_query_statistics_with_every_batch) { + RETURN_IF_ERROR(_collect_query_statistics()); + } auto status = _sink->sink(_state, block, _data_state); if (!status.is<ErrorCode::END_OF_FILE>()) { RETURN_IF_ERROR(status); @@ -282,6 +291,24 @@ Status PipelineTask::finalize() { return _sink->finalize(_state); } +Status PipelineTask::_collect_query_statistics() { + // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline. + if (_pipeline->is_root_pipeline()) { + // If the current fragment has only one instance, we can collect all of them; + // otherwise, we need to collect them based on the sender_id. + if (_state->num_per_fragment_instances() == 1) { + _query_statistics->clear(); + RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get())); + } else { + _query_statistics->clear(); + RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(), + _state->per_fragment_instance_idx())); + } + } + return Status::OK(); +} + + Status PipelineTask::try_close() { if (_try_close_flag) { return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 696b335f0e1..9fdf3c82e05 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -341,5 +341,9 @@ private: int64_t _close_pipeline_time = 0; RuntimeProfile::Counter* _pip_task_total_timer; + + std::shared_ptr<QueryStatistics> _query_statistics; + Status _collect_query_statistics(); + bool _collect_query_statistics_with_every_batch = false; }; } // namespace doris::pipeline diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index a6754215ace..22c18faa1e6 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -20,6 +20,8 @@ #include <gen_cpp/data.pb.h> #include <glog/logging.h> +#include <memory> + namespace doris { void NodeStatistics::merge(const NodeStatistics& other) { @@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { recvr->merge(this); } +void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) { + auto it = recvr->_query_statistics.find(sender_id); + if (it != recvr->_query_statistics.end()) { + merge(*it->second); + } +} + void QueryStatistics::clearNodeStatistics() { for (auto& pair : _nodes_statistics_map) { delete pair.second; @@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() { void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { std::lock_guard<SpinLock> l(_lock); - QueryStatistics* query_statistics = nullptr; - auto iter = _query_statistics.find(sender_id); - if (iter == _query_statistics.end()) { - query_statistics = new QueryStatistics; - _query_statistics[sender_id] = query_statistics; - } else { - query_statistics = iter->second; + if (!_query_statistics.contains(sender_id)) { + _query_statistics[sender_id] = std::make_shared<QueryStatistics>(); } - query_statistics->from_pb(statistics); + _query_statistics[sender_id]->from_pb(statistics); } -QueryStatisticsRecvr::~QueryStatisticsRecvr() { - // It is unnecessary to lock here, because the destructor will be - // called alter DataStreamRecvr's close in ExchangeNode. - for (auto& pair : _query_statistics) { - delete pair.second; - } - _query_statistics.clear(); +void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) { + if (!statistics->collected()) return; + if (_query_statistics.contains(sender_id)) return; + std::lock_guard<SpinLock> l(_lock); + _query_statistics[sender_id] = statistics; } } // namespace doris diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index c4ace8f23e2..42c1457472f 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -20,6 +20,7 @@ #include <stdint.h> #include <map> +#include <memory> #include <mutex> #include <unordered_map> #include <utility> @@ -88,6 +89,7 @@ public: void merge(QueryStatisticsRecvr* recvr); + void merge(QueryStatisticsRecvr* recvr, int sender_id); // Get the maximum value from the peak memory collected by all node statistics int64_t calculate_max_peak_memory_bytes(); @@ -100,13 +102,18 @@ public: returned_rows = 0; max_peak_memory_bytes = 0; clearNodeStatistics(); + //clear() is used before collection, so calling "clear" is equivalent to being collected. + set_collected(); } void to_pb(PQueryStatistics* statistics); void from_pb(const PQueryStatistics& statistics); + bool collected() const { return _collected; } + void set_collected() { _collected = true; } private: + friend class QueryStatisticsRecvr; int64_t scan_rows; int64_t scan_bytes; int64_t cpu_ms; @@ -117,17 +124,22 @@ private: // only set once by result sink when closing. int64_t max_peak_memory_bytes; // The statistics of the query on each backend. - typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap; + using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>; NodeStatisticsMap _nodes_statistics_map; + bool _collected = false; }; - +using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>; // It is used for collecting sub plan query statistics in DataStreamRecvr. class QueryStatisticsRecvr { public: - ~QueryStatisticsRecvr(); + ~QueryStatisticsRecvr() = default; + // Transmitted via RPC, incurring serialization overhead. void insert(const PQueryStatistics& statistics, int sender_id); + // using local_exchange for transmission, only need to hold a shared pointer. + void insert(QueryStatisticsPtr statistics, int sender_id); + private: friend class QueryStatistics; @@ -138,7 +150,7 @@ private: } } - std::map<int, QueryStatistics*> _query_statistics; + std::map<int, QueryStatisticsPtr> _query_statistics; SpinLock _lock; }; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 5aa179d3006..28035ce291d 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) { RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); if (!_is_pipeline_scan || _should_create_scanner) { - statistics->add_scan_bytes(_read_compressed_counter->value()); - statistics->add_scan_rows(_raw_rows_counter->value()); + statistics->add_scan_bytes(_byte_read_counter->value()); + statistics->add_scan_rows(_rows_read_counter->value()); statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS); } return Status::OK(); } +Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, int) { + RETURN_IF_ERROR(collect_query_statistics(statistics)); + return Status::OK(); +} + Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); // if you want to add some profile in scan node, even it have not new VScanner object diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 0725c37cf5e..cbbf3072872 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -64,6 +64,7 @@ public: Status prepare(RuntimeState* state) override; Status collect_query_statistics(QueryStatistics* statistics) override; + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index ea4187e556a..03c14ddafe0 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -501,7 +501,8 @@ void NewOlapScanner::_update_realtime_counters() { } void NewOlapScanner::_update_counters_before_close() { - if (!_state->enable_profile() || _has_updated_counter) { + // Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside. + if (_has_updated_counter) { return; } _has_updated_counter = true; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index e0c954042d1..c79faf6d6fa 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -274,7 +274,8 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* Status VScanNode::_init_profile() { // 1. counters for scan node - _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT); + _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead", TUnit::UNIT); + _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead", TUnit::BYTES); _total_throughput_counter = runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 7600189a251..27c0667e8c6 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -317,6 +317,7 @@ protected: // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _byte_read_counter; // Wall based aggregate read throughput [rows/sec] RuntimeProfile::Counter* _total_throughput_counter; RuntimeProfile::Counter* _num_scanners; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 19d5302286a..52cf88f260a 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -20,6 +20,7 @@ #include <glog/logging.h> #include "common/config.h" +#include "common/logging.h" #include "runtime/descriptors.h" #include "util/runtime_profile.h" #include "vec/core/column_with_type_and_name.h" @@ -79,6 +80,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { break; } _num_rows_read += block->rows(); + _num_byte_read += block->allocated_bytes(); } // 2. Filter the output block finally. @@ -159,9 +161,12 @@ Status VScanner::close(RuntimeState* state) { } void VScanner::_update_counters_before_close() { - COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer); + if (_parent) { + COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer); + COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); + COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read); + } if (!_state->enable_profile() && !_is_load) return; - COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); // Update stats for load _state->update_num_rows_load_filtered(_counter.num_rows_filtered); _state->update_num_rows_load_unselected(_counter.num_rows_unselected); diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 06b35465491..2ee39979563 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -187,6 +187,8 @@ protected: // num of rows read from scanner int64_t _num_rows_read = 0; + int64_t _num_byte_read = 0; + // num of rows return from scanner, after filter block int64_t _num_rows_return = 0; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 3d9a50ded23..797ff590ca9 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -147,7 +147,11 @@ Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) { statistics->merge(_sub_plan_query_statistics_recvr.get()); return Status::OK(); } - +Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { + RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); + statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id); + return Status::OK(); +} Status VExchangeNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index c4f083dda48..94302e84d9b 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -56,6 +56,7 @@ public: Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override; void release_resource(RuntimeState* state) override; Status collect_query_statistics(QueryStatistics* statistics) override; + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; Status close(RuntimeState* state) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 9cbc76693b9..cb2b9184430 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -412,6 +412,12 @@ void VDataStreamRecvr::remove_sender(int sender_id, int be_number) { _sender_queues[use_sender_id]->decrement_senders(be_number); } +void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics) { + int use_sender_id = _is_merging ? sender_id : 0; + _sender_queues[use_sender_id]->decrement_senders(be_number); + _sub_plan_query_statistics_recvr->insert(statistics, sender_id); +} + void VDataStreamRecvr::cancel_stream() { for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->cancel(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 5e88aa8eb43..37fd282e117 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -96,8 +96,11 @@ public: // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. + void remove_sender(int sender_id, int be_number); + void remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics); + void cancel_stream(); void close(); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index ad473fcd775..42ceb04aea7 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -129,7 +129,8 @@ Status Channel::send_local_block(bool eos) { COUNTER_UPDATE(_parent->_blocks_sent_counter, 1); _local_recvr->add_block(&block, _parent->_sender_id, true); if (eos) { - _local_recvr->remove_sender(_parent->_sender_id, _be_number); + _local_recvr->remove_sender(_parent->_sender_id, _be_number, + _parent->query_statisticsPtr()); } return Status::OK(); } else { @@ -271,7 +272,8 @@ Status Channel::close_internal() { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (is_local()) { if (_recvr_is_valid()) { - _local_recvr->remove_sender(_parent->_sender_id, _be_number); + _local_recvr->remove_sender(_parent->_sender_id, _be_number, + _parent->query_statisticsPtr()); } } else { status = send_block((PBlock*)nullptr, true); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 07f95c48e38..1b207d824c3 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -106,6 +106,9 @@ public: const RowDescriptor& row_desc() { return _row_desc; } + QueryStatistics* query_statistics() { return _query_statistics.get(); } + QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; } + protected: friend class Channel; friend class PipChannel; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org