This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d615e73b3b7 [opt](resource) step2: Remove `QueryStatistic`, replaced by `ResourceContext` (#47784)
d615e73b3b7 is described below

commit d615e73b3b7190ce1af9bd87d4be2d610bf19889
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Fri Feb 14 14:16:36 2025 +0800

    [opt](resource) step2: Remove `QueryStatistic`, replaced by `ResourceContext` (#47784)
    
    ### What problem does this PR solve?
    
    `ResourceContext` is a superset of `QueryStatistics`.
---
 be/src/pipeline/exec/exchange_sink_buffer.h        |   1 -
 be/src/pipeline/exec/operator.cpp                  |   8 +-
 be/src/pipeline/exec/operator.h                    |  11 --
 be/src/pipeline/exec/result_file_sink_operator.cpp |   4 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |   5 +-
 be/src/pipeline/exec/scan_operator.cpp             |   3 -
 be/src/pipeline/pipeline_task.cpp                  |  10 +-
 be/src/runtime/buffer_control_block.cpp            |  12 +-
 be/src/runtime/buffer_control_block.h              |  14 +-
 be/src/runtime/fragment_mgr.cpp                    |   3 +-
 be/src/runtime/load_channel.cpp                    |   2 +-
 be/src/runtime/load_stream.cpp                     |   2 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   5 -
 be/src/runtime/memory/mem_tracker_limiter.h        |  16 +--
 be/src/runtime/query_context.cpp                   |  64 ++-------
 be/src/runtime/query_context.h                     |  27 +---
 be/src/runtime/query_statistics.cpp                |  84 ------------
 be/src/runtime/query_statistics.h                  | 113 ----------------
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 144 ++++++++-------------
 be/src/runtime/runtime_query_statistics_mgr.h      |  44 ++-----
 be/src/runtime/thread_context.cpp                  |   4 +-
 .../{workload_group_context.h => cpu_context.cpp}  |  27 ++--
 be/src/runtime/workload_management/cpu_context.h   |  15 ++-
 be/src/runtime/workload_management/io_context.h    |  38 +++++-
 .../runtime/workload_management/memory_context.h   |  21 ++-
 .../workload_management/resource_context.cpp       |  54 ++++++++
 .../runtime/workload_management/resource_context.h |  39 ++++--
 .../runtime/workload_management/task_controller.h  |  33 ++++-
 .../workload_management/workload_group_context.h   |   6 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  10 +-
 be/src/vec/exec/scan/scanner_context.cpp           |   2 +-
 be/src/vec/exec/scan/vscanner.cpp                  |  15 ++-
 be/src/vec/exec/scan/vscanner.h                    |   6 -
 be/src/vec/runtime/vdata_stream_recvr.cpp          |   2 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |   6 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |   3 +-
 36 files changed, 320 insertions(+), 533 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index e6c4635aef3..d67a1dc6051 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -34,7 +34,6 @@
 
 #include "common/global_types.h"
 #include "common/status.h"
-#include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "service/backend_options.h"
 #include "util/ref_count_closure.h"
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 4049b285a1e..5fa6f91460f 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -436,14 +436,10 @@ Status 
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
 
 
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : _parent(parent), _state(state) {
-    _query_statistics = std::make_shared<QueryStatistics>();
-}
+        : _parent(parent), _state(state) {}
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, 
OperatorXBase* parent)
-        : _num_rows_returned(0), _rows_returned_counter(nullptr), 
_parent(parent), _state(state) {
-    _query_statistics = std::make_shared<QueryStatistics>();
-}
+        : _num_rows_returned(0), _rows_returned_counter(nullptr), 
_parent(parent), _state(state) {}
 
 template <typename SharedStateArg>
 Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, 
LocalStateInfo& info) {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 3120c195692..f8ea0070415 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -189,8 +189,6 @@ public:
     //  override in Scan  MultiCastSink
     virtual std::vector<Dependency*> filter_dependencies() { return {}; }
 
-    std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return 
_query_statistics; }
-
 protected:
     friend class OperatorXBase;
     template <typename LocalStateType>
@@ -201,8 +199,6 @@ protected:
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
-    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
-
     RuntimeProfile::Counter* _rows_returned_counter = nullptr;
     RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
@@ -356,8 +352,6 @@ public:
     // override in exchange sink , AsyncWriterSink
     virtual Dependency* finishdependency() { return nullptr; }
 
-    std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return 
_query_statistics; }
-
 protected:
     DataSinkOperatorXBase* _parent = nullptr;
     RuntimeState* _state = nullptr;
@@ -381,8 +375,6 @@ protected:
     RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _exec_timer = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
-
-    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 };
 
 template <typename SharedStateArg = FakeSharedState>
@@ -534,9 +526,6 @@ protected:
     int _nereids_id = -1;
     std::vector<int> _dests_id;
     std::string _name;
-
-    // Maybe this will be transferred to BufferControlBlock.
-    std::shared_ptr<QueryStatistics> _query_statistics;
 };
 
 template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index c785a709384..76d60bbdf1a 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -129,7 +129,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
     }
     // close sender, this is normal path end
     if (_sender) {
-        _sender->update_return_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
+        int64_t written_rows = _writer == nullptr ? 0 : 
_writer->get_written_rows();
+        _sender->update_return_rows(written_rows);
+        
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
     }
     state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 4698523f8be..256a90d8852 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -198,7 +198,10 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     // close sender, this is normal path end
     if (_sender) {
         if (_writer) {
-            _sender->update_return_rows(_writer->get_written_rows());
+            int64_t written_rows = _writer->get_written_rows();
+            _sender->update_return_rows(written_rows);
+            
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(
+                    written_rows);
         }
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
     }
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index bd7833691db..af52ef34c60 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -981,9 +981,6 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
         _eos = true;
         _scan_dependency->set_always_ready();
     } else {
-        for (auto& scanner : scanners) {
-            scanner->set_query_statistics(_query_statistics.get());
-        }
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(_scanners));
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 4c8fa2df44b..123c0d29567 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -95,8 +95,6 @@ Status PipelineTask::prepare(const 
std::vector<TScanRangeParams>& scan_range, co
 
     _scan_ranges = scan_range;
     auto* parent_profile = _state->get_sink_local_state()->profile();
-    query_ctx->register_query_statistics(
-            _state->get_sink_local_state()->get_query_statistics_ptr());
 
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
@@ -104,8 +102,6 @@ Status PipelineTask::prepare(const 
std::vector<TScanRangeParams>& scan_range, co
                              _le_state_map, _task_idx};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
-        query_ctx->register_query_statistics(
-                
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
     {
         std::vector<Dependency*> filter_dependencies;
@@ -296,11 +292,7 @@ Status PipelineTask::execute(bool* eos) {
         }
         int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
         _task_cpu_timer->update(delta_cpu_time);
-        auto cpu_qs = query_context()->get_cpu_statistics();
-        if (cpu_qs) {
-            cpu_qs->add_cpu_nanos(delta_cpu_time);
-        }
-        query_context()->update_cpu_time(delta_cpu_time);
+        
query_context()->resource_ctx()->cpu_context()->update_cpu_cost_ms(delta_cpu_time);
     }};
     if (_wait_to_start()) {
         if (config::enable_prefetch_tablet) {
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index be8363fb3f2..e14ee1f8809 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -49,12 +49,11 @@ void GetResultBatchCtx::on_failure(const Status& status) {
     delete this;
 }
 
-void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* 
statistics) {
+void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) {
     Status status;
     status.to_protobuf(result->mutable_status());
-    if (statistics != nullptr) {
-        statistics->to_pb(result->mutable_query_statistics());
-    }
+    PQueryStatistics* statistics = result->mutable_query_statistics();
+    statistics->set_returned_rows(returned_rows);
     result->set_packet_seq(packet_seq);
     result->set_eos(true);
     { done->Run(); }
@@ -160,7 +159,6 @@ BufferControlBlock::BufferControlBlock(TUniqueId id, int 
buffer_size, RuntimeSta
           _fragement_transmission_compression_type(
                   state->fragement_transmission_compression_type()),
           _profile("BufferControlBlock " + print_id(_fragment_id)) {
-    _query_statistics = std::make_unique<QueryStatistics>();
     _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
     _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", 
TUnit::BYTES);
     _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", 
TUnit::BYTES);
@@ -271,7 +269,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
         return;
     }
     if (_is_close) {
-        ctx->on_close(_packet_num, _query_statistics.get());
+        ctx->on_close(_packet_num, _returned_rows);
         return;
     }
     // no ready data, push ctx to waiting list
@@ -428,7 +426,7 @@ Status BufferControlBlock::close(const TUniqueId& id, 
Status exec_status) {
     if (!_waiting_rpc.empty()) {
         if (_status.ok()) {
             for (auto& ctx : _waiting_rpc) {
-                ctx->on_close(_packet_num, _query_statistics.get());
+                ctx->on_close(_packet_num, _returned_rows);
             }
         } else {
             for (auto& ctx : _waiting_rpc) {
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 9060007232e..4c519491315 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -32,7 +32,6 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 
 namespace google::protobuf {
@@ -70,7 +69,7 @@ struct GetResultBatchCtx {
             : cntl(cntl_), result(result_), done(done_) {}
 
     void on_failure(const Status& status);
-    void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
+    void on_close(int64_t packet_seq, int64_t returned_rows = 0);
     void on_data(const std::unique_ptr<TFetchDataResult>& t_result, int64_t 
packet_seq,
                  bool eos = false);
 };
@@ -125,14 +124,7 @@ public:
     [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
     [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return 
_mem_tracker; }
 
-    void update_return_rows(int64_t num_rows) {
-        // _query_statistics may be null when the result sink init failed
-        // or some other failure.
-        // and the number of written rows is only needed when all things go 
well.
-        if (_query_statistics != nullptr) {
-            _query_statistics->add_returned_rows(num_rows);
-        }
-    }
+    void update_return_rows(int64_t num_rows) { 
_returned_rows.fetch_add(num_rows); }
 
     void set_dependency(const TUniqueId& id,
                         std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
@@ -169,7 +161,7 @@ protected:
     std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
 
     // only used for FE using return rows to check limit
-    std::unique_ptr<QueryStatistics> _query_statistics;
+    std::atomic<int64_t> _returned_rows {0};
     // instance id to dependency
     std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>> 
_result_sink_dependencys;
     std::unordered_map<TUniqueId, size_t> _instance_rows;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f7f7f41e901..4dfa966b20b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -765,9 +765,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const 
TPipelineFragmentParams& para
                         if (workload_group_ptr != nullptr) {
                             
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
                             query_ctx->set_workload_group(workload_group_ptr);
-                            
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
-                                    print_id(query_id), 
workload_group_ptr->id());
                         }
+
                         // There is some logic in query ctx's dctor, we could 
not check if exists and delete the
                         // temp query ctx now. For example, the query id maybe 
removed from workload group's queryset.
                         map.insert({query_id, query_ctx});
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 331a67b05b7..99f41899215 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -48,7 +48,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t 
timeout_s, bool is_hig
             
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
 
     if (query_context != nullptr) {
-        _resource_ctx = query_context->resource_ctx;
+        _resource_ctx = query_context->resource_ctx();
     } else {
         _resource_ctx = ResourceContext::create_shared();
         _resource_ctx->task_controller()->set_task_id(_load_id.to_thrift());
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index bdf96eca415..3d643f09c94 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -426,7 +426,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* 
load_stream_mgr, bool e
     std::shared_ptr<QueryContext> query_context =
             ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
     if (query_context != nullptr) {
-        _resource_ctx = query_context->resource_ctx;
+        _resource_ctx = query_context->resource_ctx();
     } else {
         _resource_ctx = ResourceContext::create_shared();
         _resource_ctx->task_controller()->set_task_id(load_tid);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 516ad5e5ab2..2f19d220d4a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -74,11 +74,6 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const 
std::string& label, int64_
         _group_num =
                 mem_tracker_limiter_group_counter.fetch_add(1) % 
(MEM_TRACKER_GROUP_NUM - 3) + 3;
     }
-
-    // currently only select/load need runtime query statistics
-    if (_type == Type::LOAD || _type == Type::QUERY) {
-        _query_statistics = std::make_shared<QueryStatistics>();
-    }
     memory_memtrackerlimiter_cnt << 1;
 }
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 5f93cc7af6b..29536735924 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -35,7 +35,6 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "runtime/memory/mem_counter.h"
-#include "runtime/query_statistics.h"
 #include "util/string_util.h"
 #include "util/uid_util.h"
 
@@ -139,7 +138,6 @@ public:
 
     Type type() const { return _type; }
     const std::string& label() const { return _label; }
-    std::shared_ptr<QueryStatistics> get_query_statistics() { return 
_query_statistics; }
     int64_t group_num() const { return _group_num; }
     bool has_limit() const { return _limit >= 0; }
     int64_t limit() const { return _limit; }
@@ -166,13 +164,7 @@ public:
 
     // Use carefully! only memory that cannot be allocated using Doris 
Allocator needs to be consumed manually.
     // Ideally, all memory should use Doris Allocator.
-    void consume(int64_t bytes) {
-        _mem_counter.add(bytes);
-        if (_query_statistics) {
-            _query_statistics->set_max_peak_memory_bytes(peak_consumption());
-            _query_statistics->set_current_used_memory_bytes(consumption());
-        }
-    }
+    void consume(int64_t bytes) { _mem_counter.add(bytes); }
 
     void consume_no_update_peak(int64_t bytes) { 
_mem_counter.add_no_update_peak(bytes); }
 
@@ -188,10 +180,6 @@ public:
         } else {
             _mem_counter.add(bytes);
         }
-        if (rt && _query_statistics) {
-            _query_statistics->set_max_peak_memory_bytes(peak_consumption());
-            _query_statistics->set_current_used_memory_bytes(consumption());
-        }
         return rt;
     }
 
@@ -333,8 +321,6 @@ private:
     // Avoid frequent printing.
     bool _enable_print_log_usage = false;
 
-    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
-
     struct AddressSanitizer {
         size_t size;
         std::string stack_trace;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 1beea41dc8a..a6cccc22091 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -141,8 +141,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
         DCHECK_EQ(is_report_fe_addr_valid, true);
     }
     clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
-    register_memory_statistics();
-    register_cpu_statistics();
     DorisMetrics::instance()->query_ctx_cnt->increment(1);
 }
 
@@ -176,18 +174,24 @@ void QueryContext::_init_query_mem_tracker() {
     if (_query_options.__isset.is_report_success && 
_query_options.is_report_success) {
         query_mem_tracker->enable_print_log_usage();
     }
-    resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
+    _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
 }
 
 void QueryContext::_init_resource_context() {
-    resource_ctx = ResourceContext::create_shared();
-    
resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
+    _resource_ctx = ResourceContext::create_shared();
+    
_resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
     _init_query_mem_tracker();
+#ifndef BE_TEST
+    
_exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
+                                                                         
_resource_ctx);
+#endif
 }
 
 void QueryContext::init_query_task_controller() {
-    
resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
-    resource_ctx->task_controller()->set_task_id(_query_id);
+    
_resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
+    _resource_ctx->task_controller()->set_task_id(_query_id);
+    _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
+    
_resource_ctx->task_controller()->set_query_type(_query_options.query_type);
 }
 
 QueryContext::~QueryContext() {
@@ -210,9 +214,7 @@ QueryContext::~QueryContext() {
         group_id = workload_group()->id(); // before remove
     }
 
-#ifndef BE_TEST
-    
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
-#endif
+    _resource_ctx->task_controller()->finish();
 
     if (enable_profile()) {
         _report_query_profile();
@@ -331,44 +333,6 @@ void QueryContext::set_pipeline_context(
     _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
 }
 
-void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> 
qs) {
-#ifndef BE_TEST
-    _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-            print_id(_query_id), qs, current_connect_fe, 
_query_options.query_type);
-#endif
-}
-
-std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
-    return 
_exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
-            print_id(_query_id));
-}
-
-void QueryContext::register_memory_statistics() {
-    if (query_mem_tracker()) {
-        std::shared_ptr<QueryStatistics> qs = 
query_mem_tracker()->get_query_statistics();
-        std::string query_id = print_id(_query_id);
-        if (qs) {
-#ifndef BE_TEST
-            
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                    query_id, qs, current_connect_fe, 
_query_options.query_type);
-#endif
-        } else {
-            LOG(INFO) << " query " << query_id << " get memory query 
statistics failed ";
-        }
-    }
-}
-
-void QueryContext::register_cpu_statistics() {
-    if (!_cpu_statistics) {
-        _cpu_statistics = std::make_shared<QueryStatistics>();
-#ifndef BE_TEST
-        _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                print_id(_query_id), _cpu_statistics, current_connect_fe,
-                _query_options.query_type);
-#endif
-    }
-}
-
 doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     if (workload_group()) {
         if (_task_scheduler) {
@@ -386,8 +350,8 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
     }
 }
 
-void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
-    resource_ctx->workload_group_context()->set_workload_group(tg);
+void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
+    _resource_ctx->workload_group_context()->set_workload_group(wg);
     // Should add query first, then the workload group will not be deleted.
     // see task_group_manager::delete_workload_group_by_ids
     workload_group()->add_mem_tracker_limiter(query_mem_tracker());
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index e994df3886e..c02cf97bf1b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,7 +33,6 @@
 #include "common/object_pool.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/query_statistics.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/workload_management/resource_context.h"
@@ -200,7 +199,7 @@ public:
         }
     }
 
-    void set_workload_group(WorkloadGroupPtr& tg);
+    void set_workload_group(WorkloadGroupPtr& wg);
 
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? 
_query_options.execution_timeout
@@ -248,16 +247,6 @@ public:
 
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
 
-    void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
-
-    std::shared_ptr<QueryStatistics> get_query_statistics();
-
-    void register_memory_statistics();
-
-    void register_cpu_statistics();
-
-    std::shared_ptr<QueryStatistics> get_cpu_statistics() { return 
_cpu_statistics; }
-
     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
 
     ThreadPool* get_memtable_flush_pool();
@@ -272,10 +261,10 @@ public:
     bool is_nereids() const { return _is_nereids; }
 
     WorkloadGroupPtr workload_group() const {
-        return resource_ctx->workload_group_context()->workload_group();
+        return _resource_ctx->workload_group_context()->workload_group();
     }
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
-        return resource_ctx->memory_context()->mem_tracker();
+        return _resource_ctx->memory_context()->mem_tracker();
     }
 
     void inc_running_big_mem_op_num() {
@@ -300,7 +289,7 @@ public:
 
     ObjectPool obj_pool;
 
-    std::shared_ptr<ResourceContext> resource_ctx;
+    std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
 
     std::vector<TUniqueId> fragment_instance_ids;
 
@@ -308,12 +297,6 @@ public:
     // only for file scan node
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
-    void update_cpu_time(int64_t delta_cpu_time) const {
-        if (workload_group() != nullptr) {
-            workload_group()->update_cpu_time(delta_cpu_time);
-        }
-    }
-
     void add_using_brpc_stub(const TNetworkAddress& network_address,
                              std::shared_ptr<PBackendService_Stub> brpc_stub) {
         if (network_address.port == 0) {
@@ -341,6 +324,7 @@ private:
     int64_t _bytes_limit = 0;
     bool _is_nereids = false;
     std::atomic<int> _running_big_mem_op_num = 0;
+    std::shared_ptr<ResourceContext> _resource_ctx;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
     // This thread pool token is created from "_limited_scan_thread_pool" from 
exec env.
@@ -368,7 +352,6 @@ private:
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
-    std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
     // This shared ptr is never used. It is just a reference to hold the 
object.
     // There is a weak ptr in runtime filter manager to reference this object.
     std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
diff --git a/be/src/runtime/query_statistics.cpp 
b/be/src/runtime/query_statistics.cpp
deleted file mode 100644
index 110efef5ab9..00000000000
--- a/be/src/runtime/query_statistics.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/query_statistics.h"
-
-#include <gen_cpp/data.pb.h>
-#include <glog/logging.h>
-
-#include <memory>
-
-#include "util/time.h"
-
-namespace doris {
-
-void QueryStatistics::merge(const QueryStatistics& other) {
-    scan_rows += other.scan_rows;
-    scan_bytes += other.scan_bytes;
-    cpu_nanos += other.cpu_nanos;
-    shuffle_send_bytes += other.shuffle_send_bytes;
-    shuffle_send_rows += other.shuffle_send_rows;
-    _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
-    _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
-
-    int64_t other_peak_mem = other.max_peak_memory_bytes;
-    if (other_peak_mem > this->max_peak_memory_bytes) {
-        this->max_peak_memory_bytes = other_peak_mem;
-    }
-
-    int64_t other_memory_used = other.current_used_memory_bytes;
-    if (other_memory_used > 0) {
-        this->current_used_memory_bytes = other_memory_used;
-    }
-}
-
-void QueryStatistics::to_pb(PQueryStatistics* statistics) {
-    DCHECK(statistics != nullptr);
-    statistics->set_scan_rows(scan_rows);
-    statistics->set_scan_bytes(scan_bytes);
-    statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
-    statistics->set_returned_rows(returned_rows);
-    statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
-    
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
-    
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
-}
-
-void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
-    DCHECK(statistics != nullptr);
-    statistics->__set_scan_bytes(scan_bytes);
-    statistics->__set_scan_rows(scan_rows);
-    statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
-    statistics->__set_returned_rows(returned_rows);
-    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
-    statistics->__set_current_used_memory_bytes(current_used_memory_bytes);
-    statistics->__set_shuffle_send_bytes(shuffle_send_bytes);
-    statistics->__set_shuffle_send_rows(shuffle_send_rows);
-    
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
-    
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
-}
-
-void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
-    scan_rows = statistics.scan_rows();
-    scan_bytes = statistics.scan_bytes();
-    cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
-    _scan_bytes_from_local_storage = 
statistics.scan_bytes_from_local_storage();
-    _scan_bytes_from_remote_storage = 
statistics.scan_bytes_from_remote_storage();
-}
-
-QueryStatistics::~QueryStatistics() {}
-
-} // namespace doris
diff --git a/be/src/runtime/query_statistics.h 
b/be/src/runtime/query_statistics.h
deleted file mode 100644
index 0a19dfd46f0..00000000000
--- a/be/src/runtime/query_statistics.h
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/FrontendService_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
-#include <stdint.h>
-
-#include <map>
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-#include <utility>
-
-#include "util/spinlock.h"
-
-namespace doris {
-
-class PNodeStatistics;
-class PQueryStatistics;
-
-// This is responsible for collecting query statistics, usually it consists of
-// two parts, one is current fragment or plan's statistics, the other is sub 
fragment
-// or plan's statistics and QueryStatisticsRecvr is responsible for collecting 
it.
-class QueryStatistics {
-public:
-    QueryStatistics()
-            : scan_rows(0),
-              scan_bytes(0),
-              cpu_nanos(0),
-              returned_rows(0),
-              max_peak_memory_bytes(0),
-              current_used_memory_bytes(0),
-              shuffle_send_bytes(0),
-              shuffle_send_rows(0) {}
-    virtual ~QueryStatistics();
-
-    void merge(const QueryStatistics& other);
-
-    void add_scan_rows(int64_t delta_scan_rows) { scan_rows += 
delta_scan_rows; }
-
-    void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes += 
delta_scan_bytes; }
-
-    void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; }
-
-    void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes += 
delta_bytes; }
-
-    void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows += 
delta_rows; }
-
-    void add_scan_bytes_from_local_storage(int64_t 
scan_bytes_from_local_storage) {
-        _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
-    }
-
-    void add_scan_bytes_from_remote_storage(int64_t 
scan_bytes_from_remote_storage) {
-        _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
-    }
-
-    void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; }
-
-    void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
-        this->max_peak_memory_bytes = max_peak_memory_bytes;
-    }
-
-    void set_current_used_memory_bytes(int64_t current_used_memory) {
-        current_used_memory_bytes = current_used_memory;
-    }
-
-    void to_pb(PQueryStatistics* statistics);
-    void to_thrift(TQueryStatistics* statistics) const;
-    void from_pb(const PQueryStatistics& statistics);
-    bool collected() const { return _collected; }
-
-    int64_t get_scan_rows() { return scan_rows; }
-    int64_t get_scan_bytes() { return scan_bytes; }
-    int64_t get_current_used_memory_bytes() { return 
current_used_memory_bytes; }
-
-private:
-    std::atomic<int64_t> scan_rows;
-    std::atomic<int64_t> scan_bytes;
-    std::atomic<int64_t> cpu_nanos;
-    std::atomic<int64_t> _scan_bytes_from_local_storage;
-    std::atomic<int64_t> _scan_bytes_from_remote_storage;
-    // number rows returned by query.
-    // only set once by result sink when closing.
-    std::atomic<int64_t> returned_rows;
-    // Maximum memory peak for all backends.
-    // only set once by result sink when closing.
-    std::atomic<int64_t> max_peak_memory_bytes;
-    bool _collected = false;
-    std::atomic<int64_t> current_used_memory_bytes;
-
-    std::atomic<int64_t> shuffle_send_bytes;
-    std::atomic<int64_t> shuffle_send_rows;
-};
-using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
-// It is used for collecting sub plan query statistics in DataStreamRecvr.
-
-} // namespace doris
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ebcaf30eab1..f09558f7b9c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -27,7 +27,6 @@
 #include <cstdint>
 #include <memory>
 #include <mutex>
-#include <random>
 #include <shared_mutex>
 #include <string>
 #include <tuple>
@@ -38,10 +37,7 @@
 #include "exec/schema_scanner/schema_scanner_helper.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
-#include "runtime/query_context.h"
-#include "service/backend_options.h"
 #include "util/debug_util.h"
-#include "util/hash_util.hpp"
 #include "util/thrift_client.h"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -324,25 +320,17 @@ void 
RuntimeQueryStatisticsMgr::_report_query_profiles_function() {
     }
 }
 
-void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
-    QueryStatistics tmp_qs;
-    for (auto& qs_ptr : _qs_list) {
-        tmp_qs.merge(*qs_ptr);
-    }
-    tmp_qs.to_thrift(tq_s);
-    tq_s->__set_workload_group_id(_wg_id);
-}
-
-void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
-                                                          
std::shared_ptr<QueryStatistics> qs_ptr,
-                                                          TNetworkAddress 
fe_addr,
-                                                          TQueryType::type 
query_type) {
-    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
-    if (_query_statistics_ctx_map.find(query_id) == 
_query_statistics_ctx_map.end()) {
-        _query_statistics_ctx_map[query_id] =
-                std::make_unique<QueryStatisticsCtx>(fe_addr, query_type);
-    }
-    _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
+void RuntimeQueryStatisticsMgr::register_resource_context(
+        std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) {
+    std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
+    // Note: `group_commit_insert` will use the same `query_id` to submit 
multiple load tasks in sequence.
+    // After the previous load task ends but QueryStatistics has not been 
reported to FE,
+    // if the next load task with the same `query_id` starts to execute, 
`register_resource_context` will
+    // find that `query_id` already exists in _resource_contexts_map.
+    // At this time, directly overwriting the `resource_ctx` corresponding to 
the `query_id`
+    // in `register_resource_context` will cause the previous load task not to 
be reported to FE.
+    // DCHECK(_resource_contexts_map.find(query_id) == 
_resource_contexts_map.end());
+    _resource_contexts_map[query_id] = resource_ctx;
 }
 
 void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
@@ -351,27 +339,28 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
     std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> 
fe_qs_map;
     std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, 
timeout>
     {
-        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        std::lock_guard<std::shared_mutex> 
write_lock(_resource_contexts_map_lock);
         int64_t current_time = MonotonicMillis();
         int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
-        for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
-            if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
+        for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
+            if (resource_ctx->task_controller()->query_type() == 
TQueryType::EXTERNAL) {
                 continue;
             }
-            if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
+            if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) == 
fe_qs_map.end()) {
                 std::map<std::string, TQueryStatistics> tmp_map;
-                fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
+                fe_qs_map[resource_ctx->task_controller()->fe_addr()] = 
std::move(tmp_map);
             }
 
             TQueryStatistics ret_t_qs;
-            qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
-            fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
+            resource_ctx->to_thrift_query_statistics(&ret_t_qs);
+            fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] 
= ret_t_qs;
 
-            bool is_query_finished = qs_ctx_ptr->_is_query_finished;
+            bool is_query_finished = 
resource_ctx->task_controller()->is_finished();
             bool is_timeout_after_finish = false;
             if (is_query_finished) {
                 is_timeout_after_finish =
-                        (current_time - qs_ctx_ptr->_query_finish_time) > 
conf_qs_timeout;
+                        (current_time - 
resource_ctx->task_controller()->finish_time()) >
+                        conf_qs_timeout;
             }
             qs_status[query_id] = std::make_pair(is_query_finished, 
is_timeout_after_finish);
         }
@@ -444,12 +433,12 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
     }
 
     //  3 when query is finished and (last rpc is send success), remove 
finished query statistics
-    if (fe_qs_map.size() == 0) {
+    if (fe_qs_map.empty()) {
         return;
     }
 
     {
-        std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        std::lock_guard<std::shared_mutex> 
write_lock(_resource_contexts_map_lock);
         for (auto& [addr, qs_map] : fe_qs_map) {
             bool is_rpc_success = rpc_result[addr];
             for (auto& [query_id, qs] : qs_map) {
@@ -457,82 +446,53 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
                 bool is_query_finished = qs_status_pair.first;
                 bool is_timeout_after_finish = qs_status_pair.second;
                 if ((is_rpc_success && is_query_finished) || 
is_timeout_after_finish) {
-                    _query_statistics_ctx_map.erase(query_id);
+                    _resource_contexts_map.erase(query_id);
                 }
             }
         }
     }
 }
 
-void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) {
-    // NOTE: here must be a write lock
-    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
-    // when a query get query_ctx succ, but failed before create node/operator,
-    // it may not register query statistics, so it can not be mark finish
-    if (_query_statistics_ctx_map.find(query_id) != 
_query_statistics_ctx_map.end()) {
-        auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get();
-        qs_ptr->_is_query_finished = true;
-        qs_ptr->_query_finish_time = MonotonicMillis();
-    }
-}
-
-std::shared_ptr<QueryStatistics> 
RuntimeQueryStatisticsMgr::get_runtime_query_statistics(
-        std::string query_id) {
-    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-    if (_query_statistics_ctx_map.find(query_id) == 
_query_statistics_ctx_map.end()) {
-        return nullptr;
-    }
-    std::shared_ptr<QueryStatistics> qs_ptr = 
std::make_shared<QueryStatistics>();
-    for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) {
-        qs_ptr->merge(*qs);
-    }
-    return qs_ptr;
-}
-
 void RuntimeQueryStatisticsMgr::get_metric_map(
         std::string query_id, std::map<WorkloadMetricType, std::string>& 
metric_map) {
-    QueryStatistics ret_qs;
-    int64_t query_time_ms = 0;
-    {
-        std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-        if (_query_statistics_ctx_map.find(query_id) != 
_query_statistics_ctx_map.end()) {
-            for (auto const& qs : 
_query_statistics_ctx_map[query_id]->_qs_list) {
-                ret_qs.merge(*qs);
-            }
-            query_time_ms =
-                    MonotonicMillis() - 
_query_statistics_ctx_map.at(query_id)->_query_start_time;
-        }
-    }
-    metric_map.emplace(WorkloadMetricType::QUERY_TIME, 
std::to_string(query_time_ms));
-    metric_map.emplace(WorkloadMetricType::SCAN_ROWS, 
std::to_string(ret_qs.get_scan_rows()));
-    metric_map.emplace(WorkloadMetricType::SCAN_BYTES, 
std::to_string(ret_qs.get_scan_bytes()));
-    metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
-                       std::to_string(ret_qs.get_current_used_memory_bytes()));
-}
-
-void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, 
int64_t wg_id) {
-    // wg id just need eventual consistency, read lock is ok
-    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-    if (_query_statistics_ctx_map.find(query_id) != 
_query_statistics_ctx_map.end()) {
-        _query_statistics_ctx_map.at(query_id)->_wg_id = wg_id;
+    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
+    if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end()) 
{
+        auto* resource_ctx = _resource_contexts_map.at(query_id).get();
+        metric_map.emplace(
+                WorkloadMetricType::QUERY_TIME, // elapsed ms since the task started (not since it finished)
+                std::to_string(MonotonicMillis() - 
resource_ctx->task_controller()->start_time()));
+        metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
+                           
std::to_string(resource_ctx->io_context()->scan_rows()));
+        metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
+                           
std::to_string(resource_ctx->io_context()->scan_bytes()));
+        metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
+                           
std::to_string(resource_ctx->memory_context()->current_memory_bytes()));
+    } else {
+        metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1");
+        metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1");
+        metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1");
+        metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1");
     }
 }
 
 void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* 
block) {
-    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
     int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
 
     // block's schema come from 
SchemaBackendActiveTasksScanner::_s_tbls_columns
-    for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+    for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
         TQueryStatistics tqs;
-        qs_ctx_ptr->collect_query_statistics(&tqs);
+        resource_ctx->to_thrift_query_statistics(&tqs);
         SchemaScannerHelper::insert_int64_value(0, be_id, block);
-        SchemaScannerHelper::insert_string_value(1, 
qs_ctx_ptr->_fe_addr.hostname, block);
+        SchemaScannerHelper::insert_string_value(
+                1, resource_ctx->task_controller()->fe_addr().hostname, block);
         SchemaScannerHelper::insert_string_value(2, query_id, block);
 
-        int64_t task_time = qs_ctx_ptr->_is_query_finished
-                                    ? qs_ctx_ptr->_query_finish_time - 
qs_ctx_ptr->_query_start_time
-                                    : MonotonicMillis() - 
qs_ctx_ptr->_query_start_time;
+        int64_t task_time =
+                resource_ctx->task_controller()->is_finished()
+                        ? resource_ctx->task_controller()->finish_time() -
+                                  resource_ctx->task_controller()->start_time()
+                        : MonotonicMillis() - 
resource_ctx->task_controller()->start_time();
         SchemaScannerHelper::insert_int64_value(3, task_time, block);
         SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
         SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
@@ -543,7 +503,7 @@ void 
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
         SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, 
block);
 
         std::stringstream ss;
-        ss << qs_ctx_ptr->_query_type;
+        ss << resource_ctx->task_controller()->query_type();
         SchemaScannerHelper::insert_string_value(11, ss.str(), block);
     }
 }
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b7b9926698..71a93ee9220 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -31,10 +31,8 @@
 #include <unordered_map>
 
 #include "gutil/integral_types.h"
-#include "runtime/query_statistics.h"
+#include "runtime/workload_management/resource_context.h"
 #include "runtime/workload_management/workload_condition.h"
-#include "util/hash_util.hpp"
-#include "util/time.h"
 
 namespace doris {
 
@@ -42,28 +40,6 @@ namespace vectorized {
 class Block;
 } // namespace vectorized
 
-class QueryStatisticsCtx {
-public:
-    QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type)
-            : _fe_addr(fe_addr), _query_type(query_type) {
-        this->_is_query_finished = false;
-        this->_wg_id = -1;
-        this->_query_start_time = MonotonicMillis();
-    }
-    ~QueryStatisticsCtx() = default;
-
-    void collect_query_statistics(TQueryStatistics* tq_s);
-
-public:
-    std::vector<std::shared_ptr<QueryStatistics>> _qs_list;
-    bool _is_query_finished;
-    const TNetworkAddress _fe_addr;
-    const TQueryType::type _query_type;
-    int64_t _query_finish_time;
-    int64_t _wg_id;
-    int64_t _query_start_time;
-};
-
 class RuntimeQueryStatisticsMgr {
 public:
     RuntimeQueryStatisticsMgr() = default;
@@ -75,18 +51,13 @@ public:
                     fragment_id_to_profile,
             std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profile, bool is_done);
 
-    void register_query_statistics(std::string query_id, 
std::shared_ptr<QueryStatistics> qs_ptr,
-                                   TNetworkAddress fe_addr, TQueryType::type 
query_type);
+    void register_resource_context(std::string query_id,
+                                   std::shared_ptr<ResourceContext> 
resource_ctx);
 
     void report_runtime_query_statistics();
 
-    void set_query_finished(std::string query_id);
-
-    std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string 
query_id);
-
-    void set_workload_group_id(std::string query_id, int64_t wg_id);
-
     // used for workload scheduler policy
+    // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into 
WorkloadGroupMgr.
     void get_metric_map(std::string query_id,
                         std::map<WorkloadMetricType, std::string>& metric_map);
 
@@ -104,8 +75,11 @@ public:
                                    std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile_x);
 
 private:
-    std::shared_mutex _qs_ctx_map_lock;
-    std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
+    std::shared_mutex _resource_contexts_map_lock;
+    // Must be shared_ptr of ResourceContext, because ResourceContext can only 
be removed from
+    // _resource_contexts_map after QueryStatistics is reported to FE,
+    // at which time the Query may have ended.
+    std::map<std::string, std::shared_ptr<ResourceContext>> 
_resource_contexts_map;
 
     std::mutex _report_profile_mutex;
     std::atomic_bool started = false;
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 036d89a29f2..09394c8a721 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -42,11 +42,11 @@ AttachTask::AttachTask(const 
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
     signal::set_signal_is_nereids(runtime_state->is_nereids());
-    init(runtime_state->get_query_ctx()->resource_ctx);
+    init(runtime_state->get_query_ctx()->resource_ctx());
 }
 
 AttachTask::AttachTask(QueryContext* query_ctx) {
-    init(query_ctx->resource_ctx);
+    init(query_ctx->resource_ctx());
 }
 
 AttachTask::~AttachTask() {
diff --git a/be/src/runtime/workload_management/workload_group_context.h 
b/be/src/runtime/workload_management/cpu_context.cpp
similarity index 64%
copy from be/src/runtime/workload_management/workload_group_context.h
copy to be/src/runtime/workload_management/cpu_context.cpp
index c072704efc0..b6bbcc0da8a 100644
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ b/be/src/runtime/workload_management/cpu_context.cpp
@@ -15,25 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "runtime/workload_management/cpu_context.h"
 
-#include "common/factory_creator.h"
-#include "runtime/workload_group/workload_group.h"
+#include <glog/logging.h>
 
-namespace doris {
-
-class WorkloadGroupContext {
-    ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
+#include "runtime/workload_management/resource_context.h"
 
-public:
-    WorkloadGroupContext() = default;
-    virtual ~WorkloadGroupContext() = default;
-
-    WorkloadGroupPtr workload_group() { return _workload_group; }
-    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
+namespace doris {
 
-protected:
-    WorkloadGroupPtr _workload_group = nullptr;
-};
+void CPUContext::update_cpu_cost_ms(int64_t delta) const {
+    stats_.cpu_cost_ms_counter_->update(delta);
+    if (resource_ctx_ != nullptr &&
+        resource_ctx_->workload_group_context()->workload_group() != nullptr) {
+        
resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta);
+    }
+}
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/cpu_context.h 
b/be/src/runtime/workload_management/cpu_context.h
index 2abce39387e..6f922b228f7 100644
--- a/be/src/runtime/workload_management/cpu_context.h
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -18,10 +18,13 @@
 #pragma once
 
 #include "common/factory_creator.h"
+#include "runtime/workload_group/workload_group.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
 
+class ResourceContext;
+
 class CPUContext : public std::enable_shared_from_this<CPUContext> {
     ENABLE_FACTORY_CREATOR(CPUContext);
 
@@ -47,7 +50,12 @@ public:
 
     CPUContext() { stats_.init_profile(); }
     virtual ~CPUContext() = default;
-    Stats* stats() { return &stats_; }
+
+    RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+    int64_t cpu_cost_ms() const { return stats_.cpu_cost_ms_counter_->value(); 
}
+
+    void update_cpu_cost_ms(int64_t delta) const;
 
     // Bind current thread to cgroup, only some load thread should do this.
     void bind_workload_group() {
@@ -55,7 +63,12 @@ public:
     }
 
 protected:
+    friend class ResourceContext;
+
+    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = 
resource_ctx; }
+
     Stats stats_;
+    ResourceContext* resource_ctx_ {nullptr};
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h 
b/be/src/runtime/workload_management/io_context.h
index a377df3feb5..b9874bae251 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -23,6 +23,8 @@
 
 namespace doris {
 
+class ResourceContext;
+
 class IOContext : public std::enable_shared_from_this<IOContext> {
     ENABLE_FACTORY_CREATOR(IOContext);
 
@@ -65,7 +67,36 @@ public:
 
     IOContext() { stats_.init_profile(); }
     virtual ~IOContext() = default;
-    Stats* stats() { return &stats_; }
+
+    RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+    int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); }
+    int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); }
+    int64_t scan_bytes_from_local_storage() const {
+        return stats_.scan_bytes_from_local_storage_counter_->value();
+    }
+    int64_t scan_bytes_from_remote_storage() const {
+        return stats_.scan_bytes_from_remote_storage_counter_->value();
+    }
+    int64_t returned_rows() const { return 
stats_.returned_rows_counter_->value(); }
+    int64_t shuffle_send_bytes() const { return 
stats_.shuffle_send_bytes_counter_->value(); }
+    int64_t shuffle_send_rows() const { return 
stats_.shuffle_send_rows_counter_->value(); }
+
+    void update_scan_rows(int64_t delta) const { 
stats_.scan_rows_counter_->update(delta); }
+    void update_scan_bytes(int64_t delta) const { 
stats_.scan_bytes_counter_->update(delta); }
+    void update_scan_bytes_from_local_storage(int64_t delta) const {
+        stats_.scan_bytes_from_local_storage_counter_->update(delta);
+    }
+    void update_scan_bytes_from_remote_storage(int64_t delta) const {
+        stats_.scan_bytes_from_remote_storage_counter_->update(delta);
+    }
+    void update_returned_rows(int64_t delta) const { 
stats_.returned_rows_counter_->update(delta); }
+    void update_shuffle_send_bytes(int64_t delta) const {
+        stats_.shuffle_send_bytes_counter_->update(delta);
+    }
+    void update_shuffle_send_rows(int64_t delta) const {
+        stats_.shuffle_send_rows_counter_->update(delta);
+    }
 
     IOThrottle* io_throttle() {
         // TODO: get io throttle from workload group
@@ -73,7 +104,12 @@ public:
     }
 
 protected:
+    friend class ResourceContext;
+
+    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = 
resource_ctx; }
+
     Stats stats_;
+    ResourceContext* resource_ctx_ {nullptr};
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h 
b/be/src/runtime/workload_management/memory_context.h
index 77fb2a52a2c..6caff41f026 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -22,11 +22,13 @@
 
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
 
 class MemTrackerLimiter;
+class ResourceContext;
 
 class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
     ENABLE_FACTORY_CREATOR(MemoryContext);
@@ -71,13 +73,21 @@ public:
 
     MemoryContext() { stats_.init_profile(); }
     virtual ~MemoryContext() = default;
-    Stats* stats() { return &stats_; }
 
-    std::shared_ptr<MemTrackerLimiter> mem_tracker() { return mem_tracker_; }
+    RuntimeProfile* stats_profile() { return stats_.profile(); }
+
+    std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return 
mem_tracker_; }
     void set_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker) {
         mem_tracker_ = mem_tracker;
     }
 
+    int64_t current_memory_bytes() const { return mem_tracker_->consumption(); 
}
+    int64_t peak_memory_bytes() const { return 
mem_tracker_->peak_consumption(); }
+    int64_t max_peak_memory_bytes() const { return 
stats_.max_peak_memory_bytes_counter_->value(); }
+    int64_t revoke_attempts() const { return 
stats_.revoke_attempts_counter_->value(); }
+    int64_t revoke_wait_time_ms() const { return 
stats_.revoke_wait_time_ms_counter_->value(); }
+    int64_t revoked_bytes() const { return 
stats_.revoked_bytes_counter_->value(); }
+
     // Following method is related with spill disk.
     // Compute the number of bytes could be released.
     virtual int64_t revokable_bytes() { return 0; }
@@ -92,9 +102,14 @@ public:
     virtual Status leave_arbitration(Status reason) { return Status::OK(); }
 
 protected:
+    friend class ResourceContext;
+
+    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = 
resource_ctx; }
+
     Stats stats_;
     // MemTracker that is shared by all fragment instances running on this 
host.
-    std::shared_ptr<MemTrackerLimiter> mem_tracker_;
+    std::shared_ptr<MemTrackerLimiter> mem_tracker_ {nullptr};
+    ResourceContext* resource_ctx_ {nullptr};
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.cpp 
b/be/src/runtime/workload_management/resource_context.cpp
new file mode 100644
index 00000000000..765e4615505
--- /dev/null
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/resource_context.h"
+
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "util/time.h"
+
+namespace doris {
+
+void ResourceContext::to_pb_query_statistics(PQueryStatistics* statistics) 
const {
+    DCHECK(statistics != nullptr);
+    statistics->set_scan_rows(io_context()->scan_rows());
+    statistics->set_scan_bytes(io_context()->scan_bytes());
+    statistics->set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); // NOTE(review): dividing by NANOS_PER_MILLIS implies the counter holds nanoseconds despite the "_ms" name -- confirm the unit
+    statistics->set_returned_rows(io_context()->returned_rows());
+    
statistics->set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
+    
statistics->set_scan_bytes_from_remote_storage(io_context()->scan_bytes_from_remote_storage());
+    
statistics->set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+}
+
+void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) 
const {
+    DCHECK(statistics != nullptr);
+    statistics->__set_scan_rows(io_context()->scan_rows());
+    statistics->__set_scan_bytes(io_context()->scan_bytes());
+    statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); // NOTE(review): dividing by NANOS_PER_MILLIS implies the counter holds nanoseconds despite the "_ms" name -- confirm the unit
+    statistics->__set_returned_rows(io_context()->returned_rows());
+    
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
+    
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
+    statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
+    statistics->__set_shuffle_send_rows(io_context()->shuffle_send_rows());
+    statistics->__set_scan_bytes_from_remote_storage(
+            io_context()->scan_bytes_from_remote_storage());
+    
statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+    
statistics->__set_workload_group_id(workload_group_context()->workload_group_id());
+}
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h 
b/be/src/runtime/workload_management/resource_context.h
index 80767dfaa9a..0c163f515cb 100644
--- a/be/src/runtime/workload_management/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/data.pb.h>
+
 #include <memory>
 
 #include "common/factory_creator.h"
@@ -46,24 +48,31 @@ public:
         io_context_ = IOContext::create_unique();
         workload_group_context_ = WorkloadGroupContext::create_unique();
         task_controller_ = TaskController::create_unique();
+
+        cpu_context_->set_resource_ctx(this);
+        memory_context_->set_resource_ctx(this);
+        io_context_->set_resource_ctx(this);
     }
     ~ResourceContext() = default;
 
     // Only return the raw pointer to the caller, so that the caller should not save it to other variables.
-    CPUContext* cpu_context() { return cpu_context_.get(); }
-    MemoryContext* memory_context() { return memory_context_.get(); }
-    IOContext* io_context() { return io_context_.get(); }
-    WorkloadGroupContext* workload_group_context() { return workload_group_context_.get(); }
-    TaskController* task_controller() { return task_controller_.get(); }
+    CPUContext* cpu_context() const { return cpu_context_.get(); }
+    MemoryContext* memory_context() const { return memory_context_.get(); }
+    IOContext* io_context() const { return io_context_.get(); }
+    WorkloadGroupContext* workload_group_context() const { return workload_group_context_.get(); }
+    TaskController* task_controller() const { return task_controller_.get(); }
 
     void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) {
         cpu_context_ = std::move(cpu_context);
+        cpu_context_->set_resource_ctx(this);
     }
     void set_memory_context(std::unique_ptr<MemoryContext> memory_context) {
         memory_context_ = std::move(memory_context);
+        memory_context_->set_resource_ctx(this);
     }
     void set_io_context(std::unique_ptr<IOContext> io_context) {
         io_context_ = std::move(io_context);
+        io_context_->set_resource_ctx(this);
     }
     void set_workload_group_context(std::unique_ptr<WorkloadGroupContext> wg_context) {
         workload_group_context_ = std::move(wg_context);
@@ -73,20 +82,24 @@ public:
     }
 
     RuntimeProfile* profile() { return const_cast<RuntimeProfile*>(resource_profile_.get().get()); }
+
+    void to_pb_query_statistics(PQueryStatistics* statistics) const;
+    void to_thrift_query_statistics(TQueryStatistics* statistics) const;
+
     std::string debug_string() { return resource_profile_.get()->pretty_print(); }
     void refresh_resource_profile() {
         std::unique_ptr<RuntimeProfile> resource_profile =
                 std::make_unique<RuntimeProfile>("ResourceContext");
 
-        RuntimeProfile* cpu_profile = resource_profile->create_child(
-                cpu_context_->stats()->profile()->name(), true, false);
-        cpu_profile->merge(cpu_context_->stats()->profile());
+        RuntimeProfile* cpu_profile =
+                resource_profile->create_child(cpu_context_->stats_profile()->name(), true, false);
+        cpu_profile->merge(cpu_context_->stats_profile());
         RuntimeProfile* memory_profile = resource_profile->create_child(
-                memory_context_->stats()->profile()->name(), true, false);
-        memory_profile->merge(memory_context_->stats()->profile());
-        RuntimeProfile* io_profile = resource_profile->create_child(
-                io_context_->stats()->profile()->name(), true, false);
-        io_profile->merge(io_context_->stats()->profile());
+                memory_context_->stats_profile()->name(), true, false);
+        memory_profile->merge(memory_context_->stats_profile());
+        RuntimeProfile* io_profile =
+                resource_profile->create_child(io_context_->stats_profile()->name(), true, false);
+        io_profile->merge(io_context_->stats_profile());
 
         resource_profile_.set(std::move(resource_profile));
     }
diff --git a/be/src/runtime/workload_management/task_controller.h b/be/src/runtime/workload_management/task_controller.h
index c1f246bf08a..f88d7d50832 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -17,10 +17,12 @@
 
 #pragma once
 
+#include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
 
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -35,15 +37,38 @@ public:
     const TUniqueId& task_id() const { return task_id_; }
     void set_task_id(TUniqueId task_id) { task_id_ = task_id; }
 
-    virtual bool is_cancelled() const { return false; }
-    virtual Status cancel(const Status& reason) { return Status::OK(); }
-    virtual Status running_time(int64_t* running_time_msecs) {
-        *running_time_msecs = 0;
+    virtual bool is_cancelled() const { return is_cancelled_; }
+    virtual Status cancel(const Status& reason) {
+        is_cancelled_ = true;
         return Status::OK();
     }
 
+    virtual bool is_finished() const { return is_finished_; }
+    virtual void finish() {
+        is_finished_ = true;
+        finish_time_ = MonotonicMillis();
+    }
+
+    int64_t start_time() const { return start_time_; }
+    int64_t finish_time() const { return finish_time_; }
+    Status running_time(int64_t* running_time_msecs) const {
+        *running_time_msecs = finish_time_ - start_time_;
+        return Status::OK();
+    }
+    TNetworkAddress fe_addr() { return fe_addr_; }
+    TQueryType::type query_type() { return query_type_; }
+
+    void set_fe_addr(TNetworkAddress fe_addr) { fe_addr_ = fe_addr; }
+    void set_query_type(TQueryType::type query_type) { query_type_ = query_type; }
+
 protected:
     TUniqueId task_id_;
+    bool is_cancelled_ = false;
+    bool is_finished_ = false;
+    int64_t start_time_;
+    int64_t finish_time_;
+    TNetworkAddress fe_addr_;
+    TQueryType::type query_type_;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/workload_group_context.h b/be/src/runtime/workload_management/workload_group_context.h
index c072704efc0..38cad310925 100644
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ b/be/src/runtime/workload_management/workload_group_context.h
@@ -29,6 +29,12 @@ public:
     WorkloadGroupContext() = default;
     virtual ~WorkloadGroupContext() = default;
 
+    int64_t workload_group_id() {
+        if (workload_group() != nullptr) {
+            return workload_group()->id();
+        }
+        return -1;
+    }
     WorkloadGroupPtr workload_group() { return _workload_group; }
     void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index d026e86b1a9..248d391aadd 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -686,12 +686,10 @@ void NewOlapScanner::_collect_profile_before_close() {
     tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value());
     tablet->query_scan_rows->increment(local_state->_scan_rows->value());
     tablet->query_scan_count->increment(1);
-    if (_query_statistics) {
-        _query_statistics->add_scan_bytes_from_local_storage(
-                stats.file_cache_stats.bytes_read_from_local);
-        _query_statistics->add_scan_bytes_from_remote_storage(
-                stats.file_cache_stats.bytes_read_from_remote);
-    }
+    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
+            stats.file_cache_stats.bytes_read_from_local);
+    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage(
+            stats.file_cache_stats.bytes_read_from_remote);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 21be021581f..8cd2b843f4c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -81,7 +81,7 @@ ScannerContext::ScannerContext(
     if (limit < 0) {
         limit = -1;
     }
-    _resource_ctx = _state->get_query_ctx()->resource_ctx;
+    _resource_ctx = _state->get_query_ctx()->resource_ctx();
     _dependency = dependency;
     if (_min_scan_concurrency_of_scan_scheduler == 0) {
         _min_scan_concurrency_of_scan_scheduler = 2 * config::doris_scanner_thread_pool_thread_num;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 6efcc189b4b..5baf2ae9dad 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -103,8 +103,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
         }
     }
 
+#ifndef BE_TEST
     int64_t old_scan_rows = _num_rows_read;
     int64_t old_scan_bytes = _num_byte_read;
+#endif
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to get next rows,
@@ -136,10 +138,12 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
-    if (_query_statistics) {
-        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
-        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
-    }
+#ifndef BE_TEST
+    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(_num_rows_read -
+                                                                            old_scan_rows);
+    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(_num_byte_read -
+                                                                             old_scan_bytes);
+#endif
 
     if (state->is_cancelled()) {
         // TODO: Should return the specific ErrorStatus instead of just Cancelled.
@@ -260,9 +264,8 @@ void VScanner::_collect_profile_before_close() {
 void VScanner::update_scan_cpu_timer() {
     int64_t cpu_time = _cpu_watch.elapsed_time();
     _scan_cpu_timer += cpu_time;
-    _query_statistics->add_cpu_nanos(cpu_time);
     if (_state && _state->get_query_ctx()) {
-        _state->get_query_ctx()->update_cpu_time(cpu_time);
+        _state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(cpu_time);
     }
 }
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index bb68055e1f0..4cf6c780d83 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -32,7 +32,6 @@
 namespace doris {
 class RuntimeProfile;
 class TupleDescriptor;
-class QueryStatistics;
 
 namespace vectorized {
 class VExprContext;
@@ -152,10 +151,6 @@ public:
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
-    void set_query_statistics(QueryStatistics* query_statistics) {
-        _query_statistics = query_statistics;
-    }
-
     int64_t limit() const { return _limit; }
 
 protected:
@@ -168,7 +163,6 @@ protected:
 
     RuntimeState* _state = nullptr;
     pipeline::ScanLocalStateBase* _local_state = nullptr;
-    QueryStatistics* _query_statistics = nullptr;
 
     // Set if scan node has sort limit info
     int64_t _limit = -1;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a76324e140f..561ae3c764d 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -318,7 +318,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
         : HasTaskExecutionCtx(state),
           _mgr(stream_mgr),
           _memory_used_counter(memory_used_counter),
-          _resource_ctx(state->get_query_ctx()->resource_ctx),
+          _resource_ctx(state->get_query_ctx()->resource_ctx()),
           _fragment_instance_id(fragment_instance_id),
           _dest_node_id(dest_node_id),
           _row_desc(row_desc),
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b4a3b2282fd..a2a6c89d4b7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -318,8 +318,10 @@ Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t n
     COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
     COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
     COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-    _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
-    _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
+    _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes(
+            compressed_bytes * num_receivers);
+    _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows(
+            src->rows() * num_receivers);
 
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 9a5911140bf..8c400d49a9a 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -136,7 +136,8 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
             cpu_time_stop_watch.start();
             Defer defer {[&]() {
                 if (state && state->get_query_ctx()) {
-                    state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+                    state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+                            cpu_time_stop_watch.elapsed_time());
                 }
             }};
             if (!_eos && _data_queue.empty() && _writer_status.ok()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to