This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fd1db4da3d1 [agg](profile) fix incorrent profile (#28004)
fd1db4da3d1 is described below
commit fd1db4da3d12064b3c5c96cebe9c08ad310d0544
Author: TengJianPing <[email protected]>
AuthorDate: Tue Dec 5 20:48:10 2023 +0800
[agg](profile) fix incorrent profile (#28004)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 ++++--
be/src/vec/exec/distinct_vaggregation_node.cpp | 1 -
be/src/vec/exec/vaggregation_node.cpp | 8 ++------
be/src/vec/exec/vaggregation_node.h | 2 --
be/src/vec/exprs/vectorized_agg_fn.cpp | 6 ------
be/src/vec/exprs/vectorized_agg_fn.h | 5 +----
6 files changed, 7 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 4d6f9636de7..a07d19f2c9a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -79,8 +79,6 @@ Status AggSinkLocalState<DependencyType,
Derived>::init(RuntimeState* state,
Base::_shared_state->init_spill_partition_helper(p._spill_partition_count_bits);
for (auto& evaluator : p._aggregate_evaluators) {
Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state,
p._pool));
-
Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer,
_merge_timer,
-
_expr_timer);
}
if (p._is_streaming) {
Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0);
@@ -110,6 +108,10 @@ Status AggSinkLocalState<DependencyType,
Derived>::init(RuntimeState* state,
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes",
TUnit::UNIT);
COUNTER_SET(_max_row_size_counter, (int64_t)0);
+ for (auto& evaluator : Base::_shared_state->aggregate_evaluators) {
+ evaluator->set_timer(_merge_timer, _expr_timer);
+ }
+
Base::_shared_state->agg_profile_arena =
std::make_unique<vectorized::Arena>();
if (Base::_shared_state->probe_expr_ctxs.empty()) {
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp
b/be/src/vec/exec/distinct_vaggregation_node.cpp
index 642ad99bd93..a5c57792ba3 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.cpp
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -36,7 +36,6 @@ DistinctAggregationNode::DistinctAggregationNode(ObjectPool*
pool, const TPlanNo
Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block*
out_block) {
SCOPED_TIMER(_exec_timer);
- SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());
size_t key_size = _probe_expr_ctxs.size();
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 2fd2be94e9b..c594e99d02d 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -104,7 +104,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
: ExecNode(pool, tnode, descs),
_hash_table_compute_timer(nullptr),
_hash_table_input_counter(nullptr),
- _build_timer(nullptr),
_expr_timer(nullptr),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -250,7 +249,6 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
_serialize_key_arena_memory_usage =
runtime_profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage");
- _build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1);
_build_table_convert_timer = ADD_TIMER(runtime_profile(),
"BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
@@ -293,7 +291,7 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
// set profile timer to evaluators
for (auto& evaluator : _aggregate_evaluators) {
- evaluator->set_timer(_exec_timer, _merge_timer, _expr_timer);
+ evaluator->set_timer(_merge_timer, _expr_timer);
}
_offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
@@ -404,7 +402,7 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
_is_merge ? "true" : "false", _needs_finalize ? "true" :
"false",
_is_streaming_preagg ? "true" : "false",
std::to_string(_aggregate_evaluators.size()),
std::to_string(_limit));
- runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg));
+ runtime_profile()->add_info_string("AggInfos", fmt::to_string(msg));
return Status::OK();
}
@@ -675,7 +673,6 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
Status AggregationNode::_execute_without_key(Block* block) {
DCHECK(_agg_data->without_key != nullptr);
- SCOPED_TIMER(_build_timer);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key +
_offsets_of_aggregate_states[i],
@@ -901,7 +898,6 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr*
places, ColumnRawPtr
Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block*
in_block,
doris::vectorized::Block*
out_block) {
- SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());
size_t key_size = _probe_expr_ctxs.size();
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index cf49817955c..fba82aa8c9d 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -422,7 +422,6 @@ protected:
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
- RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _expr_timer = nullptr;
private:
@@ -523,7 +522,6 @@ private:
template <bool limit>
Status _execute_with_serialized_key_helper(Block* block) {
- SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());
size_t key_size = _probe_expr_ctxs.size();
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 06a776efacf..f0f1d3f815d 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -69,7 +69,6 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc)
_return_type(TypeDescriptor::from_thrift(desc.fn.ret_type)),
_intermediate_slot_desc(nullptr),
_output_slot_desc(nullptr),
- _exec_timer(nullptr),
_merge_timer(nullptr),
_expr_timer(nullptr) {
bool nullable = true;
@@ -233,7 +232,6 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {
Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr
place, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
- SCOPED_TIMER(_exec_timer);
_function->add_batch_single_place(block->rows(), place,
_agg_columns.data(), arena);
return Status::OK();
}
@@ -241,7 +239,6 @@ Status AggFnEvaluator::execute_single_add(Block* block,
AggregateDataPtr place,
Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset,
AggregateDataPtr* places,
Arena* arena, bool agg_many) {
RETURN_IF_ERROR(_calc_argument_columns(block));
- SCOPED_TIMER(_exec_timer);
_function->add_batch(block->rows(), places, offset, _agg_columns.data(),
arena, agg_many);
return Status::OK();
}
@@ -249,7 +246,6 @@ Status AggFnEvaluator::execute_batch_add(Block* block,
size_t offset, AggregateD
Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places,
Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
- SCOPED_TIMER(_exec_timer);
_function->add_batch_selected(block->rows(), places, offset,
_agg_columns.data(), arena);
return Status::OK();
}
@@ -257,7 +253,6 @@ Status AggFnEvaluator::execute_batch_add_selected(Block*
block, size_t offset,
Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable&
buf,
const size_t num_rows, Arena*
arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
- SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows,
arena);
return Status::OK();
}
@@ -265,7 +260,6 @@ Status AggFnEvaluator::streaming_agg_serialize(Block*
block, BufferWritable& buf
Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block,
MutableColumnPtr& dst,
const size_t
num_rows, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
- SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize_to_column(_agg_columns.data(), dst,
num_rows, arena);
return Status::OK();
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index b3fb4f6d5eb..3cabd275614 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -56,9 +56,7 @@ public:
const SlotDescriptor* intermediate_slot_desc,
const SlotDescriptor* output_slot_desc);
- void set_timer(RuntimeProfile::Counter* exec_timer,
RuntimeProfile::Counter* merge_timer,
- RuntimeProfile::Counter* expr_timer) {
- _exec_timer = exec_timer;
+ void set_timer(RuntimeProfile::Counter* merge_timer,
RuntimeProfile::Counter* expr_timer) {
_merge_timer = merge_timer;
_expr_timer = expr_timer;
}
@@ -123,7 +121,6 @@ private:
const SlotDescriptor* _intermediate_slot_desc = nullptr;
const SlotDescriptor* _output_slot_desc = nullptr;
- RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _expr_timer = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]