This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d9f502693e7 [feature](AuditLog) add scanRows scanBytes in auditlog # 25435 (#26268)
d9f502693e7 is described below

commit d9f502693e7c0caa4c888088cfa14927893ec119
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Thu Nov 2 14:19:10 2023 +0800

    [feature](AuditLog) add scanRows scanBytes in auditlog # 25435 (#26268)
---
 be/src/exec/exec_node.cpp                       |  8 +++++++
 be/src/exec/exec_node.h                         |  2 ++
 be/src/pipeline/exec/exchange_sink_buffer.cpp   |  8 +++++++
 be/src/pipeline/exec/exchange_sink_buffer.h     |  5 ++++
 be/src/pipeline/exec/exchange_sink_operator.cpp |  1 +
 be/src/pipeline/exec/operator.h                 | 21 ++++++++++++++++
 be/src/pipeline/pipeline.h                      | 11 +++++++++
 be/src/pipeline/pipeline_fragment_context.cpp   |  2 ++
 be/src/pipeline/pipeline_task.cpp               | 27 +++++++++++++++++++++
 be/src/pipeline/pipeline_task.h                 |  4 ++++
 be/src/runtime/query_statistics.cpp             | 32 +++++++++++++------------
 be/src/runtime/query_statistics.h               | 20 ++++++++++++----
 be/src/vec/exec/scan/new_olap_scan_node.cpp     |  9 +++++--
 be/src/vec/exec/scan/new_olap_scan_node.h       |  1 +
 be/src/vec/exec/scan/new_olap_scanner.cpp       |  3 ++-
 be/src/vec/exec/scan/vscan_node.cpp             |  3 ++-
 be/src/vec/exec/scan/vscan_node.h               |  1 +
 be/src/vec/exec/scan/vscanner.cpp               |  9 +++++--
 be/src/vec/exec/scan/vscanner.h                 |  2 ++
 be/src/vec/exec/vexchange_node.cpp              |  6 ++++-
 be/src/vec/exec/vexchange_node.h                |  1 +
 be/src/vec/runtime/vdata_stream_recvr.cpp       |  6 +++++
 be/src/vec/runtime/vdata_stream_recvr.h         |  3 +++
 be/src/vec/sink/vdata_stream_sender.cpp         |  6 +++--
 be/src/vec/sink/vdata_stream_sender.h           |  3 +++
 25 files changed, 166 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 56902e38239..0dc8df911b2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -180,6 +180,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics* 
statistics) {
     return Status::OK();
 }
 
+Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
+    DCHECK(statistics != nullptr);
+    for (auto child_node : _children) {
+        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, 
sender_id));
+    }
+    return Status::OK();
+}
+
 void ExecNode::release_resource(doris::RuntimeState* state) {
     if (!_is_resource_released) {
         if (_rows_returned_counter != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ae7b40dc20e..d92f884204b 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -162,6 +162,8 @@ public:
     // error.
     [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics);
 
+    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* 
statistics,
+                                                          int sender_id);
     // close() will get called for every exec node, regardless of what else is 
called and
     // the status of these calls (i.e. prepare() may never have been called, or
     // prepare()/open()/get_next() returned with an error).
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 696e0643880..68d899e0c48 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -190,6 +190,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        if (_statistics && _statistics->collected()) {
+            auto statistic = brpc_request->mutable_query_statistics();
+            _statistics->to_pb(statistic);
+        }
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
@@ -244,6 +248,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         if (request.block_holder->get_block()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
+        if (_statistics && _statistics->collected()) {
+            auto statistic = brpc_request->mutable_query_statistics();
+            _statistics->to_pb(statistic);
+        }
         auto* closure = request.channel->get_closure(id, request.eos, 
request.block_holder);
 
         ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c4636563108..7e30620cae8 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -33,6 +33,7 @@
 
 #include "common/global_types.h"
 #include "common/status.h"
+#include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "service/backend_options.h"
 
@@ -173,6 +174,8 @@ public:
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
+    void set_query_statistics(QueryStatistics* statistics) { _statistics = 
statistics; }
+
 private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
@@ -211,6 +214,8 @@ private:
     inline bool _is_receiver_eof(InstanceLoId id);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
+
+    QueryStatistics* _statistics = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dd0a35ce959..79517e8b041 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -67,6 +67,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
     _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, 
_sink->_sender_id,
                                                         _state->be_number(), 
_context);
 
+    _sink_buffer->set_query_statistics(_sink->query_statistics());
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->registe_channels(_sink_buffer.get());
     return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d98b4f16bd4..4a68a95e366 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,6 +163,14 @@ public:
 
     bool is_source() const;
 
+    virtual Status collect_query_statistics(QueryStatistics* statistics) { 
return Status::OK(); };
+
+    virtual Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) {
+        return Status::OK();
+    };
+
+    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {};
+
     virtual Status init(const TDataSink& tsink) { return Status::OK(); }
 
     // Prepare for running. (e.g. resource allocation, etc.)
@@ -299,6 +307,9 @@ public:
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override {
+        _sink->set_query_statistics(statistics);
+    }
 
 protected:
     NodeType* _sink;
@@ -369,6 +380,16 @@ public:
         return _node->runtime_profile();
     }
 
+    Status collect_query_statistics(QueryStatistics* statistics) override {
+        RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
+        return Status::OK();
+    }
+
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override {
+        RETURN_IF_ERROR(_node->collect_query_statistics(statistics, 
sender_id));
+        return Status::OK();
+    }
+
 protected:
     NodeType* _node;
     bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 759e4fce2c9..056c331dd0c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -102,6 +102,15 @@ public:
 
     RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
 
+    void set_is_root_pipeline() { _is_root_pipeline = true; }
+    bool is_root_pipeline() const { return _is_root_pipeline; }
+    void set_collect_query_statistics_with_every_batch() {
+        _collect_query_statistics_with_every_batch = true;
+    }
+    [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
+        return _collect_query_statistics_with_every_batch;
+    }
+
 private:
     void _init_profile();
 
@@ -145,6 +154,8 @@ private:
      */
     bool _always_can_read = false;
     bool _always_can_write = false;
+    bool _is_root_pipeline = false;
+    bool _collect_query_statistics_with_every_batch = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 87578fbc665..0ce125f1891 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     }
 
     _root_pipeline = fragment_context->add_pipeline();
+    _root_pipeline->set_is_root_pipeline();
+    _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
         RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 2b428ac5f14..8adc2fd0783 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -29,6 +29,7 @@
 #include "pipeline_fragment_context.h"
 #include "runtime/descriptors.h"
 #include "runtime/query_context.h"
+#include "runtime/query_statistics.h"
 #include "runtime/thread_context.h"
 #include "task_queue.h"
 #include "util/defer_op.h"
@@ -60,6 +61,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
index, RuntimeState*
           _fragment_context(fragment_context),
           _parent_profile(parent_profile) {
     _pipeline_task_watcher.start();
+    _query_statistics.reset(new QueryStatistics());
+    _sink->set_query_statistics(_query_statistics);
+    _collect_query_statistics_with_every_batch =
+            _pipeline->collect_query_statistics_with_every_batch();
 }
 
 void PipelineTask::_fresh_profile_counter() {
@@ -256,6 +261,10 @@ Status PipelineTask::execute(bool* eos) {
         *eos = _data_state == SourceState::FINISHED;
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+            if (_data_state == SourceState::FINISHED ||
+                _collect_query_statistics_with_every_batch) {
+                RETURN_IF_ERROR(_collect_query_statistics());
+            }
             auto status = _sink->sink(_state, block, _data_state);
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
@@ -282,6 +291,24 @@ Status PipelineTask::finalize() {
     return _sink->finalize(_state);
 }
 
+Status PipelineTask::_collect_query_statistics() {
+    // The execnode tree of a fragment will be split into multiple pipelines, 
we only need to collect the root pipeline.
+    if (_pipeline->is_root_pipeline()) {
+        // If the current fragment has only one instance, we can collect all 
of them;
+        // otherwise, we need to collect them based on the sender_id.
+        if (_state->num_per_fragment_instances() == 1) {
+            _query_statistics->clear();
+            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
+        } else {
+            _query_statistics->clear();
+            
RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
+                                                            
_state->per_fragment_instance_idx()));
+        }
+    }
+    return Status::OK();
+}
+
+
 Status PipelineTask::try_close() {
     if (_try_close_flag) {
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 696b335f0e1..9fdf3c82e05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -341,5 +341,9 @@ private:
     int64_t _close_pipeline_time = 0;
 
     RuntimeProfile::Counter* _pip_task_total_timer;
+
+    std::shared_ptr<QueryStatistics> _query_statistics;
+    Status _collect_query_statistics();
+    bool _collect_query_statistics_with_every_batch = false;
 };
 } // namespace doris::pipeline
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
index a6754215ace..22c18faa1e6 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -20,6 +20,8 @@
 #include <gen_cpp/data.pb.h>
 #include <glog/logging.h>
 
+#include <memory>
+
 namespace doris {
 
 void NodeStatistics::merge(const NodeStatistics& other) {
@@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
     recvr->merge(this);
 }
 
+void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
+    auto it = recvr->_query_statistics.find(sender_id);
+    if (it != recvr->_query_statistics.end()) {
+        merge(*it->second);
+    }
+}
+
 void QueryStatistics::clearNodeStatistics() {
     for (auto& pair : _nodes_statistics_map) {
         delete pair.second;
@@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() {
 
 void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int 
sender_id) {
     std::lock_guard<SpinLock> l(_lock);
-    QueryStatistics* query_statistics = nullptr;
-    auto iter = _query_statistics.find(sender_id);
-    if (iter == _query_statistics.end()) {
-        query_statistics = new QueryStatistics;
-        _query_statistics[sender_id] = query_statistics;
-    } else {
-        query_statistics = iter->second;
+    if (!_query_statistics.contains(sender_id)) {
+        _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
     }
-    query_statistics->from_pb(statistics);
+    _query_statistics[sender_id]->from_pb(statistics);
 }
 
-QueryStatisticsRecvr::~QueryStatisticsRecvr() {
-    // It is unnecessary to lock here, because the destructor will be
-    // called alter DataStreamRecvr's close in ExchangeNode.
-    for (auto& pair : _query_statistics) {
-        delete pair.second;
-    }
-    _query_statistics.clear();
+void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int 
sender_id) {
+    if (!statistics->collected()) return;
+    if (_query_statistics.contains(sender_id)) return;
+    std::lock_guard<SpinLock> l(_lock);
+    _query_statistics[sender_id] = statistics;
 }
 
 } // namespace doris
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
index c4ace8f23e2..42c1457472f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include <map>
+#include <memory>
 #include <mutex>
 #include <unordered_map>
 #include <utility>
@@ -88,6 +89,7 @@ public:
 
     void merge(QueryStatisticsRecvr* recvr);
 
+    void merge(QueryStatisticsRecvr* recvr, int sender_id);
     // Get the maximum value from the peak memory collected by all node 
statistics
     int64_t calculate_max_peak_memory_bytes();
 
@@ -100,13 +102,18 @@ public:
         returned_rows = 0;
         max_peak_memory_bytes = 0;
         clearNodeStatistics();
+        //clear() is used before collection, so calling "clear" is equivalent 
to being collected.
+        set_collected();
     }
 
     void to_pb(PQueryStatistics* statistics);
 
     void from_pb(const PQueryStatistics& statistics);
+    bool collected() const { return _collected; }
+    void set_collected() { _collected = true; }
 
 private:
+    friend class QueryStatisticsRecvr;
     int64_t scan_rows;
     int64_t scan_bytes;
     int64_t cpu_ms;
@@ -117,17 +124,22 @@ private:
     // only set once by result sink when closing.
     int64_t max_peak_memory_bytes;
     // The statistics of the query on each backend.
-    typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+    using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
     NodeStatisticsMap _nodes_statistics_map;
+    bool _collected = false;
 };
-
+using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
 class QueryStatisticsRecvr {
 public:
-    ~QueryStatisticsRecvr();
+    ~QueryStatisticsRecvr() = default;
 
+    // Transmitted via RPC, incurring serialization overhead.
     void insert(const PQueryStatistics& statistics, int sender_id);
 
+    // using local_exchange for transmission, only need to hold a shared 
pointer.
+    void insert(QueryStatisticsPtr statistics, int sender_id);
+
 private:
     friend class QueryStatistics;
 
@@ -138,7 +150,7 @@ private:
         }
     }
 
-    std::map<int, QueryStatistics*> _query_statistics;
+    std::map<int, QueryStatisticsPtr> _query_statistics;
     SpinLock _lock;
 };
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 5aa179d3006..28035ce291d 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
 Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
     RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
     if (!_is_pipeline_scan || _should_create_scanner) {
-        statistics->add_scan_bytes(_read_compressed_counter->value());
-        statistics->add_scan_rows(_raw_rows_counter->value());
+        statistics->add_scan_bytes(_byte_read_counter->value());
+        statistics->add_scan_rows(_rows_read_counter->value());
         statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
     }
     return Status::OK();
 }
 
+Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, 
int) {
+    RETURN_IF_ERROR(collect_query_statistics(statistics));
+    return Status::OK();
+}
+
 Status NewOlapScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
     // if you want to add some profile in scan node, even it have not new 
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 0725c37cf5e..cbbf3072872 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -64,6 +64,7 @@ public:
 
     Status prepare(RuntimeState* state) override;
     Status collect_query_statistics(QueryStatistics* statistics) override;
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
 
     void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ea4187e556a..03c14ddafe0 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -501,7 +501,8 @@ void NewOlapScanner::_update_realtime_counters() {
 }
 
 void NewOlapScanner::_update_counters_before_close() {
-    if (!_state->enable_profile() || _has_updated_counter) {
+    //  Please don't directly enable the profile here, we need to set 
QueryStatistics using the counter inside.
+    if (_has_updated_counter) {
         return;
     }
     _has_updated_counter = true;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index e0c954042d1..c79faf6d6fa 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -274,7 +274,8 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
 
 Status VScanNode::_init_profile() {
     // 1. counters for scan node
-    _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", 
TUnit::UNIT);
+    _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead", 
TUnit::UNIT);
+    _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead", 
TUnit::BYTES);
     _total_throughput_counter =
             runtime_profile()->add_rate_counter("TotalReadThroughput", 
_rows_read_counter);
     _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 7600189a251..27c0667e8c6 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -317,6 +317,7 @@ protected:
 
     // rows read from the scanner (including those discarded by (pre)filters)
     RuntimeProfile::Counter* _rows_read_counter;
+    RuntimeProfile::Counter* _byte_read_counter;
     // Wall based aggregate read throughput [rows/sec]
     RuntimeProfile::Counter* _total_throughput_counter;
     RuntimeProfile::Counter* _num_scanners;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 19d5302286a..52cf88f260a 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 
 #include "common/config.h"
+#include "common/logging.h"
 #include "runtime/descriptors.h"
 #include "util/runtime_profile.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -79,6 +80,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
                     break;
                 }
                 _num_rows_read += block->rows();
+                _num_byte_read += block->allocated_bytes();
             }
 
             // 2. Filter the output block finally.
@@ -159,9 +161,12 @@ Status VScanner::close(RuntimeState* state) {
 }
 
 void VScanner::_update_counters_before_close() {
-    COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
+    if (_parent) {
+        COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
+        COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
+        COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
+    }
     if (!_state->enable_profile() && !_is_load) return;
-    COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
     // Update stats for load
     _state->update_num_rows_load_filtered(_counter.num_rows_filtered);
     _state->update_num_rows_load_unselected(_counter.num_rows_unselected);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 06b35465491..2ee39979563 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -187,6 +187,8 @@ protected:
     // num of rows read from scanner
     int64_t _num_rows_read = 0;
 
+    int64_t _num_byte_read = 0;
+
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;
 
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index 3d9a50ded23..797ff590ca9 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -147,7 +147,11 @@ Status 
VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
     statistics->merge(_sub_plan_query_statistics_recvr.get());
     return Status::OK();
 }
-
+Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, 
int sender_id) {
+    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
+    statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
+    return Status::OK();
+}
 Status VExchangeNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index c4f083dda48..94302e84d9b 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -56,6 +56,7 @@ public:
     Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
     void release_resource(RuntimeState* state) override;
     Status collect_query_statistics(QueryStatistics* statistics) override;
+    Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
     Status close(RuntimeState* state) override;
 
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9cbc76693b9..cb2b9184430 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -412,6 +412,12 @@ void VDataStreamRecvr::remove_sender(int sender_id, int 
be_number) {
     _sender_queues[use_sender_id]->decrement_senders(be_number);
 }
 
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, 
QueryStatisticsPtr statistics) {
+    int use_sender_id = _is_merging ? sender_id : 0;
+    _sender_queues[use_sender_id]->decrement_senders(be_number);
+    _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
+}
+
 void VDataStreamRecvr::cancel_stream() {
     for (int i = 0; i < _sender_queues.size(); ++i) {
         _sender_queues[i]->cancel();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 5e88aa8eb43..37fd282e117 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -96,8 +96,11 @@ public:
 
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
+
     void remove_sender(int sender_id, int be_number);
 
+    void remove_sender(int sender_id, int be_number, QueryStatisticsPtr 
statistics);
+
     void cancel_stream();
 
     void close();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index ad473fcd775..42ceb04aea7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -129,7 +129,8 @@ Status Channel::send_local_block(bool eos) {
         COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         _local_recvr->add_block(&block, _parent->_sender_id, true);
         if (eos) {
-            _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+            _local_recvr->remove_sender(_parent->_sender_id, _be_number,
+                                        _parent->query_statisticsPtr());
         }
         return Status::OK();
     } else {
@@ -271,7 +272,8 @@ Status Channel::close_internal() {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (is_local()) {
             if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+                _local_recvr->remove_sender(_parent->_sender_id, _be_number,
+                                            _parent->query_statisticsPtr());
             }
         } else {
             status = send_block((PBlock*)nullptr, true);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 07f95c48e38..1b207d824c3 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -106,6 +106,9 @@ public:
 
     const RowDescriptor& row_desc() { return _row_desc; }
 
+    QueryStatistics* query_statistics() { return _query_statistics.get(); }
+    QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
+
 protected:
     friend class Channel;
     friend class PipChannel;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to