This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d615e73b3b7 [opt](resource) step2: Remove `QueryStatistic`, replaced by `ResourceContext` (#47784) d615e73b3b7 is described below commit d615e73b3b7190ce1af9bd87d4be2d610bf19889 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Fri Feb 14 14:16:36 2025 +0800 [opt](resource) step2: Remove `QueryStatistic`, replaced by `ResourceContext` (#47784) ### What problem does this PR solve? `ResourceContext` is a superset of `QueryStatistic` --- be/src/pipeline/exec/exchange_sink_buffer.h | 1 - be/src/pipeline/exec/operator.cpp | 8 +- be/src/pipeline/exec/operator.h | 11 -- be/src/pipeline/exec/result_file_sink_operator.cpp | 4 +- be/src/pipeline/exec/result_sink_operator.cpp | 5 +- be/src/pipeline/exec/scan_operator.cpp | 3 - be/src/pipeline/pipeline_task.cpp | 10 +- be/src/runtime/buffer_control_block.cpp | 12 +- be/src/runtime/buffer_control_block.h | 14 +- be/src/runtime/fragment_mgr.cpp | 3 +- be/src/runtime/load_channel.cpp | 2 +- be/src/runtime/load_stream.cpp | 2 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 5 - be/src/runtime/memory/mem_tracker_limiter.h | 16 +-- be/src/runtime/query_context.cpp | 64 ++------- be/src/runtime/query_context.h | 27 +--- be/src/runtime/query_statistics.cpp | 84 ------------ be/src/runtime/query_statistics.h | 113 ---------------- be/src/runtime/runtime_query_statistics_mgr.cpp | 144 ++++++++------------- be/src/runtime/runtime_query_statistics_mgr.h | 44 ++----- be/src/runtime/thread_context.cpp | 4 +- .../{workload_group_context.h => cpu_context.cpp} | 27 ++-- be/src/runtime/workload_management/cpu_context.h | 15 ++- be/src/runtime/workload_management/io_context.h | 38 +++++- .../runtime/workload_management/memory_context.h | 21 ++- .../workload_management/resource_context.cpp | 54 ++++++++ .../runtime/workload_management/resource_context.h | 39 ++++-- .../runtime/workload_management/task_controller.h | 33 ++++- .../workload_management/workload_group_context.h | 6 + be/src/vec/exec/scan/new_olap_scanner.cpp | 10 +- be/src/vec/exec/scan/scanner_context.cpp | 2 +- be/src/vec/exec/scan/vscanner.cpp | 15 ++- be/src/vec/exec/scan/vscanner.h | 6 - be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 6 +- be/src/vec/sink/writer/async_result_writer.cpp | 3 +- 36 files changed, 320 insertions(+), 533 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index e6c4635aef3..d67a1dc6051 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -34,7 +34,6 @@ #include "common/global_types.h" #include "common/status.h" -#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "service/backend_options.h" #include "util/ref_count_closure.h" diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 4049b285a1e..5fa6f91460f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -436,14 +436,10 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, RuntimeState* state) - : _parent(parent), _state(state) { - _query_statistics = std::make_shared<QueryStatistics>(); -} + : _parent(parent), _state(state) {} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) - : _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) { - _query_statistics = std::make_shared<QueryStatistics>(); -} + : _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) {} template <typename SharedStateArg> Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) { diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 3120c195692..f8ea0070415 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -189,8 +189,6 @@ public: // override in Scan MultiCastSink virtual std::vector<Dependency*> filter_dependencies() { return {}; } - std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; } - protected: friend class OperatorXBase; template <typename LocalStateType> @@ -201,8 +199,6 @@ protected: std::unique_ptr<RuntimeProfile> _runtime_profile; - std::shared_ptr<QueryStatistics> _query_statistics = nullptr; - RuntimeProfile::Counter* _rows_returned_counter = nullptr; RuntimeProfile::Counter* _blocks_returned_counter = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; @@ -356,8 +352,6 @@ public: // override in exchange sink , AsyncWriterSink virtual Dependency* finishdependency() { return nullptr; } - std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; } - protected: DataSinkOperatorXBase* _parent = nullptr; RuntimeState* _state = nullptr; @@ -381,8 +375,6 @@ protected: RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; - - std::shared_ptr<QueryStatistics> _query_statistics = nullptr; }; template <typename SharedStateArg = FakeSharedState> @@ -534,9 +526,6 @@ protected: int _nereids_id = -1; std::vector<int> _dests_id; std::string _name; - - // Maybe this will be transferred to BufferControlBlock. - std::shared_ptr<QueryStatistics> _query_statistics; }; template <typename LocalStateType> diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index c785a709384..76d60bbdf1a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -129,7 +129,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) } // close sender, this is normal path end if (_sender) { - _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); + int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows(); + _sender->update_return_rows(written_rows); + state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows); RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); } state->exec_env()->result_mgr()->cancel_at_time( diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 4698523f8be..256a90d8852 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -198,7 +198,10 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { // close sender, this is normal path end if (_sender) { if (_writer) { - _sender->update_return_rows(_writer->get_written_rows()); + int64_t written_rows = _writer->get_written_rows(); + _sender->update_return_rows(written_rows); + state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows( + written_rows); } RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index bd7833691db..af52ef34c60 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -981,9 +981,6 @@ Status ScanLocalState<Derived>::_prepare_scanners() { _eos = true; _scan_dependency->set_always_ready(); } else { - for (auto& scanner : scanners) { - scanner->set_query_statistics(_query_statistics.get()); - } COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size())); RETURN_IF_ERROR(_start_scanners(_scanners)); } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4c8fa2df44b..123c0d29567 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -95,8 +95,6 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co _scan_ranges = scan_range; auto* parent_profile = _state->get_sink_local_state()->profile(); - query_ctx->register_query_statistics( - _state->get_sink_local_state()->get_query_statistics_ptr()); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; @@ -104,8 +102,6 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co _le_state_map, _task_idx}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); - query_ctx->register_query_statistics( - _state->get_local_state(op->operator_id())->get_query_statistics_ptr()); } { std::vector<Dependency*> filter_dependencies; @@ -296,11 +292,7 @@ Status PipelineTask::execute(bool* eos) { } int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time(); _task_cpu_timer->update(delta_cpu_time); - auto cpu_qs = query_context()->get_cpu_statistics(); - if (cpu_qs) { - cpu_qs->add_cpu_nanos(delta_cpu_time); - } - query_context()->update_cpu_time(delta_cpu_time); + query_context()->resource_ctx()->cpu_context()->update_cpu_cost_ms(delta_cpu_time); }}; if (_wait_to_start()) { if (config::enable_prefetch_tablet) { diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index be8363fb3f2..e14ee1f8809 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -49,12 +49,11 @@ void GetResultBatchCtx::on_failure(const Status& status) { delete this; } -void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics) { +void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) { Status status; status.to_protobuf(result->mutable_status()); - if (statistics != nullptr) { - statistics->to_pb(result->mutable_query_statistics()); - } + PQueryStatistics* statistics = result->mutable_query_statistics(); + statistics->set_returned_rows(returned_rows); result->set_packet_seq(packet_seq); result->set_eos(true); { done->Run(); } @@ -160,7 +159,6 @@ BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size, RuntimeSta _fragement_transmission_compression_type( state->fragement_transmission_compression_type()), _profile("BufferControlBlock " + print_id(_fragment_id)) { - _query_statistics = std::make_unique<QueryStatistics>(); _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime"); _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES); _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES); @@ -271,7 +269,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { return; } if (_is_close) { - ctx->on_close(_packet_num, _query_statistics.get()); + ctx->on_close(_packet_num, _returned_rows); return; } // no ready data, push ctx to waiting list @@ -428,7 +426,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { - ctx->on_close(_packet_num, _query_statistics.get()); + ctx->on_close(_packet_num, _returned_rows); } } else { for (auto& ctx : _waiting_rpc) { diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 9060007232e..4c519491315 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -32,7 +32,6 @@ #include <unordered_map> #include "common/status.h" -#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" namespace google::protobuf { @@ -70,7 +69,7 @@ struct GetResultBatchCtx { : cntl(cntl_), result(result_), done(done_) {} void on_failure(const Status& status); - void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr); + void on_close(int64_t packet_seq, int64_t returned_rows = 0); void on_data(const std::unique_ptr<TFetchDataResult>& t_result, int64_t packet_seq, bool eos = false); }; @@ -125,14 +124,7 @@ public: [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return _mem_tracker; } - void update_return_rows(int64_t num_rows) { - // _query_statistics may be null when the result sink init failed - // or some other failure. - // and the number of written rows is only needed when all things go well. - if (_query_statistics != nullptr) { - _query_statistics->add_returned_rows(num_rows); - } - } + void update_return_rows(int64_t num_rows) { _returned_rows.fetch_add(num_rows); } void set_dependency(const TUniqueId& id, std::shared_ptr<pipeline::Dependency> result_sink_dependency); @@ -169,7 +161,7 @@ protected: std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc; // only used for FE using return rows to check limit - std::unique_ptr<QueryStatistics> _query_statistics; + std::atomic<int64_t> _returned_rows {0}; // instance id to dependency std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys; std::unordered_map<TUniqueId, size_t> _instance_rows; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f7f7f41e901..4dfa966b20b 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -765,9 +765,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para if (workload_group_ptr != nullptr) { RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); query_ctx->set_workload_group(workload_group_ptr); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( - print_id(query_id), workload_group_ptr->id()); } + // There is some logic in query ctx's dctor, we could not check if exists and delete the // temp query ctx now. For example, the query id maybe removed from workload group's queryset. map.insert({query_id, query_ctx}); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 331a67b05b7..99f41899215 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -48,7 +48,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift()); if (query_context != nullptr) { - _resource_ctx = query_context->resource_ctx; + _resource_ctx = query_context->resource_ctx(); } else { _resource_ctx = ResourceContext::create_shared(); _resource_ctx->task_controller()->set_task_id(_load_id.to_thrift()); diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index bdf96eca415..3d643f09c94 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -426,7 +426,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e std::shared_ptr<QueryContext> query_context = ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid); if (query_context != nullptr) { - _resource_ctx = query_context->resource_ctx; + _resource_ctx = query_context->resource_ctx(); } else { _resource_ctx = ResourceContext::create_shared(); _resource_ctx->task_controller()->set_task_id(load_tid); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 516ad5e5ab2..2f19d220d4a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -74,11 +74,6 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ _group_num = mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 3) + 3; } - - // currently only select/load need runtime query statistics - if (_type == Type::LOAD || _type == Type::QUERY) { - _query_statistics = std::make_shared<QueryStatistics>(); - } memory_memtrackerlimiter_cnt << 1; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 5f93cc7af6b..29536735924 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -35,7 +35,6 @@ #include "common/config.h" #include "common/status.h" #include "runtime/memory/mem_counter.h" -#include "runtime/query_statistics.h" #include "util/string_util.h" #include "util/uid_util.h" @@ -139,7 +138,6 @@ public: Type type() const { return _type; } const std::string& label() const { return _label; } - std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } @@ -166,13 +164,7 @@ public: // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually. // Ideally, all memory should use Doris Allocator. - void consume(int64_t bytes) { - _mem_counter.add(bytes); - if (_query_statistics) { - _query_statistics->set_max_peak_memory_bytes(peak_consumption()); - _query_statistics->set_current_used_memory_bytes(consumption()); - } - } + void consume(int64_t bytes) { _mem_counter.add(bytes); } void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } @@ -188,10 +180,6 @@ public: } else { _mem_counter.add(bytes); } - if (rt && _query_statistics) { - _query_statistics->set_max_peak_memory_bytes(peak_consumption()); - _query_statistics->set_current_used_memory_bytes(consumption()); - } return rt; } @@ -333,8 +321,6 @@ private: // Avoid frequent printing. bool _enable_print_log_usage = false; - std::shared_ptr<QueryStatistics> _query_statistics = nullptr; - struct AddressSanitizer { size_t size; std::string stack_trace; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 1beea41dc8a..a6cccc22091 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -141,8 +141,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, DCHECK_EQ(is_report_fe_addr_valid, true); } clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp); - register_memory_statistics(); - register_cpu_statistics(); DorisMetrics::instance()->query_ctx_cnt->increment(1); } @@ -176,18 +174,24 @@ void QueryContext::_init_query_mem_tracker() { if (_query_options.__isset.is_report_success && _query_options.is_report_success) { query_mem_tracker->enable_print_log_usage(); } - resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker); + _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker); } void QueryContext::_init_resource_context() { - resource_ctx = ResourceContext::create_shared(); - resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create()); + _resource_ctx = ResourceContext::create_shared(); + _resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create()); _init_query_mem_tracker(); +#ifndef BE_TEST + _exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id), + _resource_ctx); +#endif } void QueryContext::init_query_task_controller() { - resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this)); - resource_ctx->task_controller()->set_task_id(_query_id); + _resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this)); + _resource_ctx->task_controller()->set_task_id(_query_id); + _resource_ctx->task_controller()->set_fe_addr(current_connect_fe); + _resource_ctx->task_controller()->set_query_type(_query_options.query_type); } QueryContext::~QueryContext() { @@ -210,9 +214,7 @@ QueryContext::~QueryContext() { group_id = workload_group()->id(); // before remove } -#ifndef BE_TEST - _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); -#endif + _resource_ctx->task_controller()->finish(); if (enable_profile()) { _report_query_profile(); @@ -331,44 +333,6 @@ void QueryContext::set_pipeline_context( _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx}); } -void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) { -#ifndef BE_TEST - _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(_query_id), qs, current_connect_fe, _query_options.query_type); -#endif -} - -std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() { - return _exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics( - print_id(_query_id)); -} - -void QueryContext::register_memory_statistics() { - if (query_mem_tracker()) { - std::shared_ptr<QueryStatistics> qs = query_mem_tracker()->get_query_statistics(); - std::string query_id = print_id(_query_id); - if (qs) { -#ifndef BE_TEST - _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - query_id, qs, current_connect_fe, _query_options.query_type); -#endif - } else { - LOG(INFO) << " query " << query_id << " get memory query statistics failed "; - } - } -} - -void QueryContext::register_cpu_statistics() { - if (!_cpu_statistics) { - _cpu_statistics = std::make_shared<QueryStatistics>(); -#ifndef BE_TEST - _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(_query_id), _cpu_statistics, current_connect_fe, - _query_options.query_type); -#endif - } -} - doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { if (workload_group()) { if (_task_scheduler) { @@ -386,8 +350,8 @@ ThreadPool* QueryContext::get_memtable_flush_pool() { } } -void QueryContext::set_workload_group(WorkloadGroupPtr& tg) { - resource_ctx->workload_group_context()->set_workload_group(tg); +void QueryContext::set_workload_group(WorkloadGroupPtr& wg) { + _resource_ctx->workload_group_context()->set_workload_group(wg); // Should add query first, then the workload group will not be deleted. // see task_group_manager::delete_workload_group_by_ids workload_group()->add_mem_tracker_limiter(query_mem_tracker()); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index e994df3886e..c02cf97bf1b 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -33,7 +33,6 @@ #include "common/object_pool.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/query_statistics.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" #include "runtime/workload_management/resource_context.h" @@ -200,7 +199,7 @@ public: } } - void set_workload_group(WorkloadGroupPtr& tg); + void set_workload_group(WorkloadGroupPtr& wg); int execution_timeout() const { return _query_options.__isset.execution_timeout ? _query_options.execution_timeout @@ -248,16 +247,6 @@ public: pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } - void register_query_statistics(std::shared_ptr<QueryStatistics> qs); - - std::shared_ptr<QueryStatistics> get_query_statistics(); - - void register_memory_statistics(); - - void register_cpu_statistics(); - - std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; } - doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); ThreadPool* get_memtable_flush_pool(); @@ -272,10 +261,10 @@ public: bool is_nereids() const { return _is_nereids; } WorkloadGroupPtr workload_group() const { - return resource_ctx->workload_group_context()->workload_group(); + return _resource_ctx->workload_group_context()->workload_group(); } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const { - return resource_ctx->memory_context()->mem_tracker(); + return _resource_ctx->memory_context()->mem_tracker(); } void inc_running_big_mem_op_num() { @@ -300,7 +289,7 @@ public: ObjectPool obj_pool; - std::shared_ptr<ResourceContext> resource_ctx; + std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; } std::vector<TUniqueId> fragment_instance_ids; @@ -308,12 +297,6 @@ public: // only for file scan node std::map<int, TFileScanRangeParams> file_scan_range_params_map; - void update_cpu_time(int64_t delta_cpu_time) const { - if (workload_group() != nullptr) { - workload_group()->update_cpu_time(delta_cpu_time); - } - } - void add_using_brpc_stub(const TNetworkAddress& network_address, std::shared_ptr<PBackendService_Stub> brpc_stub) { if (network_address.port == 0) { @@ -341,6 +324,7 @@ private: int64_t _bytes_limit = 0; bool _is_nereids = false; std::atomic<int> _running_big_mem_op_num = 0; + std::shared_ptr<ResourceContext> _resource_ctx; // A token used to submit olap scanner to the "_limited_scan_thread_pool", // This thread pool token is created from "_limited_scan_thread_pool" from exec env. @@ -368,7 +352,6 @@ private: vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; std::unique_ptr<pipeline::Dependency> _execution_dependency; - std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr; // This shared ptr is never used. It is just a reference to hold the object. // There is a weak ptr in runtime filter manager to reference this object. std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp deleted file mode 100644 index 110efef5ab9..00000000000 --- a/be/src/runtime/query_statistics.cpp +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/query_statistics.h" - -#include <gen_cpp/data.pb.h> -#include <glog/logging.h> - -#include <memory> - -#include "util/time.h" - -namespace doris { - -void QueryStatistics::merge(const QueryStatistics& other) { - scan_rows += other.scan_rows; - scan_bytes += other.scan_bytes; - cpu_nanos += other.cpu_nanos; - shuffle_send_bytes += other.shuffle_send_bytes; - shuffle_send_rows += other.shuffle_send_rows; - _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage; - _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage; - - int64_t other_peak_mem = other.max_peak_memory_bytes; - if (other_peak_mem > this->max_peak_memory_bytes) { - this->max_peak_memory_bytes = other_peak_mem; - } - - int64_t other_memory_used = other.current_used_memory_bytes; - if (other_memory_used > 0) { - this->current_used_memory_bytes = other_memory_used; - } -} - -void QueryStatistics::to_pb(PQueryStatistics* statistics) { - DCHECK(statistics != nullptr); - statistics->set_scan_rows(scan_rows); - statistics->set_scan_bytes(scan_bytes); - statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); - statistics->set_returned_rows(returned_rows); - statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); - statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); - statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); -} - -void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { - DCHECK(statistics != nullptr); - statistics->__set_scan_bytes(scan_bytes); - statistics->__set_scan_rows(scan_rows); - statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); - statistics->__set_returned_rows(returned_rows); - statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes); - statistics->__set_current_used_memory_bytes(current_used_memory_bytes); - statistics->__set_shuffle_send_bytes(shuffle_send_bytes); - statistics->__set_shuffle_send_rows(shuffle_send_rows); - statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); - statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); -} - -void QueryStatistics::from_pb(const PQueryStatistics& statistics) { - scan_rows = statistics.scan_rows(); - scan_bytes = statistics.scan_bytes(); - cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS; - _scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage(); - _scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage(); -} - -QueryStatistics::~QueryStatistics() {} - -} // namespace doris diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h deleted file mode 100644 index 0a19dfd46f0..00000000000 --- a/be/src/runtime/query_statistics.h +++ /dev/null @@ -1,113 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <gen_cpp/FrontendService_types.h> -#include <gen_cpp/PaloInternalService_types.h> -#include <stdint.h> - -#include <map> -#include <memory> -#include <mutex> -#include <unordered_map> -#include <utility> - -#include "util/spinlock.h" - -namespace doris { - -class PNodeStatistics; -class PQueryStatistics; - -// This is responsible for collecting query statistics, usually it consists of -// two parts, one is current fragment or plan's statistics, the other is sub fragment -// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it. -class QueryStatistics { -public: - QueryStatistics() - : scan_rows(0), - scan_bytes(0), - cpu_nanos(0), - returned_rows(0), - max_peak_memory_bytes(0), - current_used_memory_bytes(0), - shuffle_send_bytes(0), - shuffle_send_rows(0) {} - virtual ~QueryStatistics(); - - void merge(const QueryStatistics& other); - - void add_scan_rows(int64_t delta_scan_rows) { scan_rows += delta_scan_rows; } - - void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes += delta_scan_bytes; } - - void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; } - - void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes += delta_bytes; } - - void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows += delta_rows; } - - void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) { - _scan_bytes_from_local_storage += scan_bytes_from_local_storage; - } - - void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) { - _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage; - } - - void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; } - - void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { - this->max_peak_memory_bytes = max_peak_memory_bytes; - } - - void set_current_used_memory_bytes(int64_t current_used_memory) { - current_used_memory_bytes = current_used_memory; - } - - void to_pb(PQueryStatistics* statistics); - void to_thrift(TQueryStatistics* statistics) const; - void from_pb(const PQueryStatistics& statistics); - bool collected() const { return _collected; } - - int64_t get_scan_rows() { return scan_rows; } - int64_t get_scan_bytes() { return scan_bytes; } - int64_t get_current_used_memory_bytes() { return current_used_memory_bytes; } - -private: - std::atomic<int64_t> scan_rows; - std::atomic<int64_t> scan_bytes; - std::atomic<int64_t> cpu_nanos; - std::atomic<int64_t> _scan_bytes_from_local_storage; - std::atomic<int64_t> _scan_bytes_from_remote_storage; - // number rows returned by query. - // only set once by result sink when closing. - std::atomic<int64_t> returned_rows; - // Maximum memory peak for all backends. - // only set once by result sink when closing. - std::atomic<int64_t> max_peak_memory_bytes; - bool _collected = false; - std::atomic<int64_t> current_used_memory_bytes; - - std::atomic<int64_t> shuffle_send_bytes; - std::atomic<int64_t> shuffle_send_rows; -}; -using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>; -// It is used for collecting sub plan query statistics in DataStreamRecvr. - -} // namespace doris diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index ebcaf30eab1..f09558f7b9c 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -27,7 +27,6 @@ #include <cstdint> #include <memory> #include <mutex> -#include <random> #include <shared_mutex> #include <string> #include <tuple> @@ -38,10 +37,7 @@ #include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" -#include "runtime/query_context.h" -#include "service/backend_options.h" #include "util/debug_util.h" -#include "util/hash_util.hpp" #include "util/thrift_client.h" #include "util/time.h" #include "util/uid_util.h" @@ -324,25 +320,17 @@ void RuntimeQueryStatisticsMgr::_report_query_profiles_function() { } } -void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { - QueryStatistics tmp_qs; - for (auto& qs_ptr : _qs_list) { - tmp_qs.merge(*qs_ptr); - } - tmp_qs.to_thrift(tq_s); - tq_s->__set_workload_group_id(_wg_id); -} - -void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id, - std::shared_ptr<QueryStatistics> qs_ptr, - TNetworkAddress fe_addr, - TQueryType::type query_type) { - std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); - if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { - _query_statistics_ctx_map[query_id] = - std::make_unique<QueryStatisticsCtx>(fe_addr, query_type); - } - _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr); +void RuntimeQueryStatisticsMgr::register_resource_context( + std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) { + std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock); + // Note: `group_commit_insert` will use the same `query_id` to submit multiple load tasks in sequence. + // After the previous load task ends but QueryStatistics has not been reported to FE, + // if the next load task with the same `query_id` starts to execute, `register_resource_context` will + // find that `query_id` already exists in _resource_contexts_map. + // At this time, directly overwriting the `resource_ctx` corresponding to the `query_id` + // in `register_resource_context` will cause the previous load task not to be reported to FE. + // DCHECK(_resource_contexts_map.find(query_id) == _resource_contexts_map.end()); + _resource_contexts_map[query_id] = resource_ctx; } void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { @@ -351,27 +339,28 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map; std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout> { - std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); + std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock); int64_t current_time = MonotonicMillis(); int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms; - for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { - if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) { + for (auto& [query_id, resource_ctx] : _resource_contexts_map) { + if (resource_ctx->task_controller()->query_type() == TQueryType::EXTERNAL) { continue; } - if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) { + if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) == fe_qs_map.end()) { std::map<std::string, TQueryStatistics> tmp_map; - fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map); + fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map); } TQueryStatistics ret_t_qs; - qs_ctx_ptr->collect_query_statistics(&ret_t_qs); - fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs; + resource_ctx->to_thrift_query_statistics(&ret_t_qs); + fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] = ret_t_qs; - bool is_query_finished = qs_ctx_ptr->_is_query_finished; + bool is_query_finished = resource_ctx->task_controller()->is_finished(); bool is_timeout_after_finish = false; if (is_query_finished) { is_timeout_after_finish = - (current_time - qs_ctx_ptr->_query_finish_time) > conf_qs_timeout; + (current_time - resource_ctx->task_controller()->finish_time()) > + conf_qs_timeout; } qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish); } @@ -444,12 +433,12 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { } // 3 when query is finished and (last rpc is send success), remove finished query statistics - if (fe_qs_map.size() == 0) { + if (fe_qs_map.empty()) { return; } { - std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); + std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock); for (auto& [addr, qs_map] : fe_qs_map) { bool is_rpc_success = rpc_result[addr]; for (auto& [query_id, qs] : qs_map) { @@ -457,82 +446,53 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { bool is_query_finished = qs_status_pair.first; bool is_timeout_after_finish = qs_status_pair.second; if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) { - _query_statistics_ctx_map.erase(query_id); + _resource_contexts_map.erase(query_id); } } } } } -void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) { - // NOTE: here must be a write lock - std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); - // when a query get query_ctx succ, but failed before create node/operator, - // it may not register query statistics, so it can not be mark finish - if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { - auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get(); - qs_ptr->_is_query_finished = true; - qs_ptr->_query_finish_time = MonotonicMillis(); - } -} - -std::shared_ptr<QueryStatistics> RuntimeQueryStatisticsMgr::get_runtime_query_statistics( - std::string query_id) { - std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); - if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { - return nullptr; - } - std::shared_ptr<QueryStatistics> qs_ptr = std::make_shared<QueryStatistics>(); - for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) { - qs_ptr->merge(*qs); - } - return qs_ptr; -} - void RuntimeQueryStatisticsMgr::get_metric_map( std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map) { - QueryStatistics ret_qs; - int64_t query_time_ms = 0; - { - std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); - if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { - for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) { - ret_qs.merge(*qs); - } - query_time_ms = - MonotonicMillis() - _query_statistics_ctx_map.at(query_id)->_query_start_time; - } - } - metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms)); - metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows())); - metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes())); - metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, - std::to_string(ret_qs.get_current_used_memory_bytes())); -} - -void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { - // wg id just need eventual consistency, read lock is ok - std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); - if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { - _query_statistics_ctx_map.at(query_id)->_wg_id = wg_id; + std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); + if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end()) { + auto* resource_ctx = _resource_contexts_map.at(query_id).get(); + metric_map.emplace( + WorkloadMetricType::QUERY_TIME, + std::to_string(MonotonicMillis() - resource_ctx->task_controller()->finish_time())); + metric_map.emplace(WorkloadMetricType::SCAN_ROWS, + std::to_string(resource_ctx->io_context()->scan_rows())); + metric_map.emplace(WorkloadMetricType::SCAN_BYTES, + std::to_string(resource_ctx->io_context()->scan_bytes())); + metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, + std::to_string(resource_ctx->memory_context()->current_memory_bytes())); + } else { + metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1"); + metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1"); + metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1"); + metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1"); } } void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) { - std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); + std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns - for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + for (auto& [query_id, resource_ctx] : _resource_contexts_map) { TQueryStatistics tqs; - qs_ctx_ptr->collect_query_statistics(&tqs); + resource_ctx->to_thrift_query_statistics(&tqs); SchemaScannerHelper::insert_int64_value(0, be_id, block); - SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); + SchemaScannerHelper::insert_string_value( + 1, resource_ctx->task_controller()->fe_addr().hostname, block); SchemaScannerHelper::insert_string_value(2, query_id, block); - int64_t task_time = qs_ctx_ptr->_is_query_finished - ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time - : MonotonicMillis() - qs_ctx_ptr->_query_start_time; + int64_t task_time = + resource_ctx->task_controller()->is_finished() + ? resource_ctx->task_controller()->finish_time() - + resource_ctx->task_controller()->start_time() + : MonotonicMillis() - resource_ctx->task_controller()->start_time(); SchemaScannerHelper::insert_int64_value(3, task_time, block); SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block); SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block); @@ -543,7 +503,7 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, block); std::stringstream ss; - ss << qs_ctx_ptr->_query_type; + ss << resource_ctx->task_controller()->query_type(); SchemaScannerHelper::insert_string_value(11, ss.str(), block); } } diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 1b7b9926698..71a93ee9220 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -31,10 +31,8 @@ #include <unordered_map> #include "gutil/integral_types.h" -#include "runtime/query_statistics.h" +#include "runtime/workload_management/resource_context.h" #include "runtime/workload_management/workload_condition.h" -#include "util/hash_util.hpp" -#include "util/time.h" namespace doris { @@ -42,28 +40,6 @@ namespace vectorized { class Block; } // namespace vectorized -class QueryStatisticsCtx { -public: - QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type) - : _fe_addr(fe_addr), _query_type(query_type) { - this->_is_query_finished = false; - this->_wg_id = -1; - this->_query_start_time = MonotonicMillis(); - } - ~QueryStatisticsCtx() = default; - - void collect_query_statistics(TQueryStatistics* tq_s); - -public: - std::vector<std::shared_ptr<QueryStatistics>> _qs_list; - bool _is_query_finished; - const TNetworkAddress _fe_addr; - const TQueryType::type _query_type; - int64_t _query_finish_time; - int64_t _wg_id; - int64_t _query_start_time; -}; - class RuntimeQueryStatisticsMgr { public: RuntimeQueryStatisticsMgr() = default; @@ -75,18 +51,13 @@ public: fragment_id_to_profile, std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profile, bool is_done); - void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr, - TNetworkAddress fe_addr, TQueryType::type query_type); + void register_resource_context(std::string query_id, + std::shared_ptr<ResourceContext> resource_ctx); void report_runtime_query_statistics(); - void set_query_finished(std::string query_id); - - std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string query_id); - - void set_workload_group_id(std::string query_id, int64_t wg_id); - // used for workload scheduler policy + // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into WorkloadGroupMgr. void get_metric_map(std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map); @@ -104,8 +75,11 @@ public: std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x); private: - std::shared_mutex _qs_ctx_map_lock; - std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> _query_statistics_ctx_map; + std::shared_mutex _resource_contexts_map_lock; + // Must be shared_ptr of ResourceContext, because ResourceContext can only be removed from + // _resource_contexts_map after QueryStatistics is reported to FE, + // at which time the Query may have ended. + std::map<std::string, std::shared_ptr<ResourceContext>> _resource_contexts_map; std::mutex _report_profile_mutex; std::atomic_bool started = false; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 036d89a29f2..09394c8a721 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -42,11 +42,11 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { AttachTask::AttachTask(RuntimeState* runtime_state) { signal::set_signal_is_nereids(runtime_state->is_nereids()); - init(runtime_state->get_query_ctx()->resource_ctx); + init(runtime_state->get_query_ctx()->resource_ctx()); } AttachTask::AttachTask(QueryContext* query_ctx) { - init(query_ctx->resource_ctx); + init(query_ctx->resource_ctx()); } AttachTask::~AttachTask() { diff --git a/be/src/runtime/workload_management/workload_group_context.h b/be/src/runtime/workload_management/cpu_context.cpp similarity index 64% copy from be/src/runtime/workload_management/workload_group_context.h copy to be/src/runtime/workload_management/cpu_context.cpp index c072704efc0..b6bbcc0da8a 100644 --- a/be/src/runtime/workload_management/workload_group_context.h +++ b/be/src/runtime/workload_management/cpu_context.cpp @@ -15,25 +15,20 @@ // specific language governing permissions and limitations // under the License. -#pragma once +#include "runtime/workload_management/cpu_context.h" -#include "common/factory_creator.h" -#include "runtime/workload_group/workload_group.h" +#include <glog/logging.h> -namespace doris { - -class WorkloadGroupContext { - ENABLE_FACTORY_CREATOR(WorkloadGroupContext); +#include "runtime/workload_management/resource_context.h" -public: - WorkloadGroupContext() = default; - virtual ~WorkloadGroupContext() = default; - - WorkloadGroupPtr workload_group() { return _workload_group; } - void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; } +namespace doris { -protected: - WorkloadGroupPtr _workload_group = nullptr; -}; +void CPUContext::update_cpu_cost_ms(int64_t delta) const { + stats_.cpu_cost_ms_counter_->update(delta); + if (resource_ctx_ != nullptr && + resource_ctx_->workload_group_context()->workload_group() != nullptr) { + resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta); + } +} } // namespace doris diff --git a/be/src/runtime/workload_management/cpu_context.h b/be/src/runtime/workload_management/cpu_context.h index 2abce39387e..6f922b228f7 100644 --- a/be/src/runtime/workload_management/cpu_context.h +++ b/be/src/runtime/workload_management/cpu_context.h @@ -18,10 +18,13 @@ #pragma once #include "common/factory_creator.h" +#include "runtime/workload_group/workload_group.h" #include "util/runtime_profile.h" namespace doris { +class ResourceContext; + class CPUContext : public std::enable_shared_from_this<CPUContext> { ENABLE_FACTORY_CREATOR(CPUContext); @@ -47,7 +50,12 @@ public: CPUContext() { stats_.init_profile(); } virtual ~CPUContext() = default; - Stats* stats() { return &stats_; } + + RuntimeProfile* stats_profile() { return stats_.profile(); } + + int64_t cpu_cost_ms() const { return stats_.cpu_cost_ms_counter_->value(); } + + void update_cpu_cost_ms(int64_t delta) const; // Bind current thread to cgroup, only some load thread should do this. void bind_workload_group() { @@ -55,7 +63,12 @@ public: } protected: + friend class ResourceContext; + + void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; } + Stats stats_; + ResourceContext* resource_ctx_ {nullptr}; }; } // namespace doris diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h index a377df3feb5..b9874bae251 100644 --- a/be/src/runtime/workload_management/io_context.h +++ b/be/src/runtime/workload_management/io_context.h @@ -23,6 +23,8 @@ namespace doris { +class ResourceContext; + class IOContext : public std::enable_shared_from_this<IOContext> { ENABLE_FACTORY_CREATOR(IOContext); @@ -65,7 +67,36 @@ public: IOContext() { stats_.init_profile(); } virtual ~IOContext() = default; - Stats* stats() { return &stats_; } + + RuntimeProfile* stats_profile() { return stats_.profile(); } + + int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); } + int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); } + int64_t scan_bytes_from_local_storage() const { + return stats_.scan_bytes_from_local_storage_counter_->value(); + } + int64_t scan_bytes_from_remote_storage() const { + return stats_.scan_bytes_from_remote_storage_counter_->value(); + } + int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); } + int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); } + int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); } + + void update_scan_rows(int64_t delta) const { stats_.scan_rows_counter_->update(delta); } + void update_scan_bytes(int64_t delta) const { stats_.scan_bytes_counter_->update(delta); } + void update_scan_bytes_from_local_storage(int64_t delta) const { + stats_.scan_bytes_from_local_storage_counter_->update(delta); + } + void update_scan_bytes_from_remote_storage(int64_t delta) const { + stats_.scan_bytes_from_remote_storage_counter_->update(delta); + } + void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); } + void update_shuffle_send_bytes(int64_t delta) const { + stats_.shuffle_send_bytes_counter_->update(delta); + } + void update_shuffle_send_rows(int64_t delta) const { + stats_.shuffle_send_rows_counter_->update(delta); + } IOThrottle* io_throttle() { // TODO: get io throttle from workload group @@ -73,7 +104,12 @@ public: } protected: + friend class ResourceContext; + + void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; } + Stats stats_; + ResourceContext* resource_ctx_ {nullptr}; }; } // namespace doris diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h index 77fb2a52a2c..6caff41f026 100644 --- a/be/src/runtime/workload_management/memory_context.h +++ b/be/src/runtime/workload_management/memory_context.h @@ -22,11 +22,13 @@ #include "common/factory_creator.h" #include "common/status.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/runtime_profile.h" namespace doris { class MemTrackerLimiter; +class ResourceContext; class MemoryContext : public std::enable_shared_from_this<MemoryContext> { ENABLE_FACTORY_CREATOR(MemoryContext); @@ -71,13 +73,21 @@ public: MemoryContext() { stats_.init_profile(); } virtual ~MemoryContext() = default; - Stats* stats() { return &stats_; } - std::shared_ptr<MemTrackerLimiter> mem_tracker() { return mem_tracker_; } + RuntimeProfile* stats_profile() { return stats_.profile(); } + + std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return mem_tracker_; } void set_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { mem_tracker_ = mem_tracker; } + int64_t current_memory_bytes() const { return mem_tracker_->consumption(); } + int64_t peak_memory_bytes() const { return mem_tracker_->peak_consumption(); } + int64_t max_peak_memory_bytes() const { return stats_.max_peak_memory_bytes_counter_->value(); } + int64_t revoke_attempts() const { return stats_.revoke_attempts_counter_->value(); } + int64_t revoke_wait_time_ms() const { return stats_.revoke_wait_time_ms_counter_->value(); } + int64_t revoked_bytes() const { return stats_.revoked_bytes_counter_->value(); } + // Following method is related with spill disk. // Compute the number of bytes could be released. virtual int64_t revokable_bytes() { return 0; } @@ -92,9 +102,14 @@ public: virtual Status leave_arbitration(Status reason) { return Status::OK(); } protected: + friend class ResourceContext; + + void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; } + Stats stats_; // MemTracker that is shared by all fragment instances running on this host. - std::shared_ptr<MemTrackerLimiter> mem_tracker_; + std::shared_ptr<MemTrackerLimiter> mem_tracker_ {nullptr}; + ResourceContext* resource_ctx_ {nullptr}; }; } // namespace doris diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp new file mode 100644 index 00000000000..765e4615505 --- /dev/null +++ b/be/src/runtime/workload_management/resource_context.cpp @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_management/resource_context.h" + +#include <gen_cpp/data.pb.h> +#include <glog/logging.h> + +#include "util/time.h" + +namespace doris { + +void ResourceContext::to_pb_query_statistics(PQueryStatistics* statistics) const { + DCHECK(statistics != nullptr); + statistics->set_scan_rows(io_context()->scan_rows()); + statistics->set_scan_bytes(io_context()->scan_bytes()); + statistics->set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); + statistics->set_returned_rows(io_context()->returned_rows()); + statistics->set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes()); + statistics->set_scan_bytes_from_remote_storage(io_context()->scan_bytes_from_remote_storage()); + statistics->set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage()); +} + +void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) const { + DCHECK(statistics != nullptr); + statistics->__set_scan_rows(io_context()->scan_rows()); + statistics->__set_scan_bytes(io_context()->scan_bytes()); + statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); + statistics->__set_returned_rows(io_context()->returned_rows()); + statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes()); + statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes()); + statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes()); + statistics->__set_shuffle_send_rows(io_context()->shuffle_send_rows()); + statistics->__set_scan_bytes_from_remote_storage( + io_context()->scan_bytes_from_remote_storage()); + statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage()); + statistics->__set_workload_group_id(workload_group_context()->workload_group_id()); +} + +} // namespace doris diff --git a/be/src/runtime/workload_management/resource_context.h b/be/src/runtime/workload_management/resource_context.h index 80767dfaa9a..0c163f515cb 100644 --- a/be/src/runtime/workload_management/resource_context.h +++ b/be/src/runtime/workload_management/resource_context.h @@ -17,6 +17,8 @@ #pragma once +#include <gen_cpp/data.pb.h> + #include <memory> #include "common/factory_creator.h" @@ -46,24 +48,31 @@ public: io_context_ = IOContext::create_unique(); workload_group_context_ = WorkloadGroupContext::create_unique(); task_controller_ = TaskController::create_unique(); + + cpu_context_->set_resource_ctx(this); + memory_context_->set_resource_ctx(this); + io_context_->set_resource_ctx(this); } ~ResourceContext() = default; // Only return the raw pointer to the caller, so that the caller should not save it to other variables. - CPUContext* cpu_context() { return cpu_context_.get(); } - MemoryContext* memory_context() { return memory_context_.get(); } - IOContext* io_context() { return io_context_.get(); } - WorkloadGroupContext* workload_group_context() { return workload_group_context_.get(); } - TaskController* task_controller() { return task_controller_.get(); } + CPUContext* cpu_context() const { return cpu_context_.get(); } + MemoryContext* memory_context() const { return memory_context_.get(); } + IOContext* io_context() const { return io_context_.get(); } + WorkloadGroupContext* workload_group_context() const { return workload_group_context_.get(); } + TaskController* task_controller() const { return task_controller_.get(); } void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) { cpu_context_ = std::move(cpu_context); + cpu_context_->set_resource_ctx(this); } void set_memory_context(std::unique_ptr<MemoryContext> memory_context) { memory_context_ = std::move(memory_context); + memory_context_->set_resource_ctx(this); } void set_io_context(std::unique_ptr<IOContext> io_context) { io_context_ = std::move(io_context); + io_context_->set_resource_ctx(this); } void set_workload_group_context(std::unique_ptr<WorkloadGroupContext> wg_context) { workload_group_context_ = std::move(wg_context); @@ -73,20 +82,24 @@ public: } RuntimeProfile* profile() { return const_cast<RuntimeProfile*>(resource_profile_.get().get()); } + + void to_pb_query_statistics(PQueryStatistics* statistics) const; + void to_thrift_query_statistics(TQueryStatistics* statistics) const; + std::string debug_string() { return resource_profile_.get()->pretty_print(); } void refresh_resource_profile() { std::unique_ptr<RuntimeProfile> resource_profile = std::make_unique<RuntimeProfile>("ResourceContext"); - RuntimeProfile* cpu_profile = resource_profile->create_child( - cpu_context_->stats()->profile()->name(), true, false); - cpu_profile->merge(cpu_context_->stats()->profile()); + RuntimeProfile* cpu_profile = + resource_profile->create_child(cpu_context_->stats_profile()->name(), true, false); + cpu_profile->merge(cpu_context_->stats_profile()); RuntimeProfile* memory_profile = resource_profile->create_child( - memory_context_->stats()->profile()->name(), true, false); - memory_profile->merge(memory_context_->stats()->profile()); - RuntimeProfile* io_profile = resource_profile->create_child( - io_context_->stats()->profile()->name(), true, false); - io_profile->merge(io_context_->stats()->profile()); + memory_context_->stats_profile()->name(), true, false); + memory_profile->merge(memory_context_->stats_profile()); + RuntimeProfile* io_profile = + resource_profile->create_child(io_context_->stats_profile()->name(), true, false); + io_profile->merge(io_context_->stats_profile()); resource_profile_.set(std::move(resource_profile)); } diff --git a/be/src/runtime/workload_management/task_controller.h b/be/src/runtime/workload_management/task_controller.h index c1f246bf08a..f88d7d50832 100644 --- a/be/src/runtime/workload_management/task_controller.h +++ b/be/src/runtime/workload_management/task_controller.h @@ -17,10 +17,12 @@ #pragma once +#include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Types_types.h> #include "common/factory_creator.h" #include "common/status.h" +#include "util/time.h" namespace doris { @@ -35,15 +37,38 @@ public: const TUniqueId& task_id() const { return task_id_; } void set_task_id(TUniqueId task_id) { task_id_ = task_id; } - virtual bool is_cancelled() const { return false; } - virtual Status cancel(const Status& reason) { return Status::OK(); } - virtual Status running_time(int64_t* running_time_msecs) { - *running_time_msecs = 0; + virtual bool is_cancelled() const { return is_cancelled_; } + virtual Status cancel(const Status& reason) { + is_cancelled_ = true; return Status::OK(); } + virtual bool is_finished() const { return is_finished_; } + virtual void finish() { + is_finished_ = true; + finish_time_ = MonotonicMillis(); + } + + int64_t start_time() const { return start_time_; } + int64_t finish_time() const { return finish_time_; } + Status running_time(int64_t* running_time_msecs) const { + *running_time_msecs = finish_time_ - start_time_; + return Status::OK(); + } + TNetworkAddress fe_addr() { return fe_addr_; } + TQueryType::type query_type() { return query_type_; } + + void set_fe_addr(TNetworkAddress fe_addr) { fe_addr_ = fe_addr; } + void set_query_type(TQueryType::type query_type) { query_type_ = query_type; } + protected: TUniqueId task_id_; + bool is_cancelled_ = false; + bool is_finished_ = false; + int64_t start_time_; + int64_t finish_time_; + TNetworkAddress fe_addr_; + TQueryType::type query_type_; }; } // namespace doris diff --git a/be/src/runtime/workload_management/workload_group_context.h b/be/src/runtime/workload_management/workload_group_context.h index c072704efc0..38cad310925 100644 --- a/be/src/runtime/workload_management/workload_group_context.h +++ b/be/src/runtime/workload_management/workload_group_context.h @@ -29,6 +29,12 @@ public: WorkloadGroupContext() = default; virtual ~WorkloadGroupContext() = default; + int64_t workload_group_id() { + if (workload_group() != nullptr) { + return workload_group()->id(); + } + return -1; + } WorkloadGroupPtr workload_group() { return _workload_group; } void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index d026e86b1a9..248d391aadd 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -686,12 +686,10 @@ void NewOlapScanner::_collect_profile_before_close() { tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value()); tablet->query_scan_rows->increment(local_state->_scan_rows->value()); tablet->query_scan_count->increment(1); - if (_query_statistics) { - _query_statistics->add_scan_bytes_from_local_storage( - stats.file_cache_stats.bytes_read_from_local); - _query_statistics->add_scan_bytes_from_remote_storage( - stats.file_cache_stats.bytes_read_from_remote); - } + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( + stats.file_cache_stats.bytes_read_from_local); + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage( + stats.file_cache_stats.bytes_read_from_remote); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 21be021581f..8cd2b843f4c 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -81,7 +81,7 @@ ScannerContext::ScannerContext( if (limit < 0) { limit = -1; } - _resource_ctx = _state->get_query_ctx()->resource_ctx; + _resource_ctx = _state->get_query_ctx()->resource_ctx(); _dependency = dependency; if (_min_scan_concurrency_of_scan_scheduler == 0) { _min_scan_concurrency_of_scan_scheduler = 2 * config::doris_scanner_thread_pool_thread_num; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 6efcc189b4b..5baf2ae9dad 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -103,8 +103,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } +#ifndef BE_TEST int64_t old_scan_rows = _num_rows_read; int64_t old_scan_bytes = _num_byte_read; +#endif { do { // if step 2 filter all rows of block, and block will be reused to get next rows, @@ -136,10 +138,12 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } - if (_query_statistics) { - _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows); - _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes); - } +#ifndef BE_TEST + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(_num_rows_read - + old_scan_rows); + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(_num_byte_read - + old_scan_bytes); +#endif if (state->is_cancelled()) { // TODO: Should return the specific ErrorStatus instead of just Cancelled. @@ -260,9 +264,8 @@ void VScanner::_collect_profile_before_close() { void VScanner::update_scan_cpu_timer() { int64_t cpu_time = _cpu_watch.elapsed_time(); _scan_cpu_timer += cpu_time; - _query_statistics->add_cpu_nanos(cpu_time); if (_state && _state->get_query_ctx()) { - _state->get_query_ctx()->update_cpu_time(cpu_time); + _state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(cpu_time); } } diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index bb68055e1f0..4cf6c780d83 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -32,7 +32,6 @@ namespace doris { class RuntimeProfile; class TupleDescriptor; -class QueryStatistics; namespace vectorized { class VExprContext; @@ -152,10 +151,6 @@ public: void set_status_on_failure(const Status& st) { _status = st; } - void set_query_statistics(QueryStatistics* query_statistics) { - _query_statistics = query_statistics; - } - int64_t limit() const { return _limit; } protected: @@ -168,7 +163,6 @@ protected: RuntimeState* _state = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; - QueryStatistics* _query_statistics = nullptr; // Set if scan node has sort limit info int64_t _limit = -1; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index a76324e140f..561ae3c764d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -318,7 +318,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, : HasTaskExecutionCtx(state), _mgr(stream_mgr), _memory_used_counter(memory_used_counter), - _resource_ctx(state->get_query_ctx()->resource_ctx), + _resource_ctx(state->get_query_ctx()->resource_ctx()), _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), _row_desc(row_desc), diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b4a3b2282fd..a2a6c89d4b7 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -318,8 +318,10 @@ Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t n COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time()); - _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers); - _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers); + _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes( + compressed_bytes * num_receivers); + _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows( + src->rows() * num_receivers); return Status::OK(); } diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 9a5911140bf..8c400d49a9a 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -136,7 +136,8 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi cpu_time_stop_watch.start(); Defer defer {[&]() { if (state && state->get_query_ctx()) { - state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time()); + state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( + cpu_time_stop_watch.elapsed_time()); } }}; if (!_eos && _data_queue.empty() && _writer_status.ok()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org