This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c2b04a61773 [pipelineX](partition sort) Add some necessary metrics
(#32020)
c2b04a61773 is described below
commit c2b04a617732c0f33f7f20c71d2429a09bfcbcbf
Author: Gabriel <[email protected]>
AuthorDate: Sat Mar 9 14:49:54 2024 +0800
[pipelineX](partition sort) Add some necessary metrics (#32020)
Add some necessary metrics
---
be/src/pipeline/exec/partition_sort_sink_operator.cpp | 16 +++++++++-------
be/src/pipeline/exec/partition_sort_sink_operator.h | 17 ++++++++---------
be/src/pipeline/exec/partition_sort_source_operator.cpp | 8 ++++++++
3 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 2481302fd40..571f1ed1399 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -45,6 +45,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
+ _passthrough_rows_counter = ADD_COUNTER(_profile,
"PassThroughRowsCounter", TUnit::UNIT);
_partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order,
p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit,
p._partition_inner_limit,
@@ -60,7 +61,11 @@
PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
_pool(pool),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_limit(tnode.limit),
- _topn_phase(tnode.partition_sort_node.ptopn_phase) {}
+
_partition_exprs_num(tnode.partition_sort_node.partition_exprs.size()),
+ _topn_phase(tnode.partition_sort_node.ptopn_phase),
+ _has_global_limit(tnode.partition_sort_node.has_global_limit),
+ _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
+
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -75,12 +80,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* st
if (tnode.partition_sort_node.__isset.partition_exprs) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
tnode.partition_sort_node.partition_exprs,
_partition_expr_ctxs));
- _partition_exprs_num = _partition_expr_ctxs.size();
}
- _has_global_limit = tnode.partition_sort_node.has_global_limit;
- _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm;
- _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit;
return Status::OK();
}
@@ -101,15 +102,14 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
auto& local_state = get_local_state(state);
auto current_rows = input_block->rows();
SCOPED_TIMER(local_state.exec_time_counter());
- COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
if (current_rows > 0) {
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
local_state.child_input_rows = local_state.child_input_rows +
current_rows;
if (UNLIKELY(_partition_exprs_num == 0)) {
if (UNLIKELY(local_state._value_places.empty())) {
local_state._value_places.push_back(_pool->add(new
vectorized::PartitionBlocks(
local_state._partition_sort_info,
local_state._value_places.empty())));
}
- //no partition key
local_state._value_places[0]->append_whole_block(input_block,
_child_x->row_desc());
} else {
//just simply use partition num to check
@@ -118,6 +118,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._num_partition >
config::partition_topn_partition_threshold &&
local_state.child_input_rows < 10000 *
local_state._num_partition) {
{
+ COUNTER_UPDATE(local_state._passthrough_rows_counter,
+ (int64_t)input_block->rows());
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer have data, source could read this.
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index b39001d4723..8602b096f51 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -78,8 +78,8 @@ private:
RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _emplace_key_timer = nullptr;
RuntimeProfile::Counter* _selector_block_timer = nullptr;
-
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+ RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
void _init_hash_method();
};
@@ -108,19 +108,18 @@ private:
friend class PartitionSortSinkLocalState;
ObjectPool* _pool = nullptr;
const RowDescriptor _row_descriptor;
- int64_t _limit = -1;
- int _partition_exprs_num = 0;
- vectorized::VExprContextSPtrs _partition_expr_ctxs;
-
- TPartTopNPhase::type _topn_phase;
+ const int64_t _limit = -1;
+ const int _partition_exprs_num = 0;
+ const TPartTopNPhase::type _topn_phase;
+ const bool _has_global_limit = false;
+ const TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
+ const int64_t _partition_inner_limit = 0;
+ vectorized::VExprContextSPtrs _partition_expr_ctxs;
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
- TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
- bool _has_global_limit = false;
- int64_t _partition_inner_limit = 0;
Status _split_block_by_partition(vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state,
bool eos);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 166cd84fc4e..7fd03a11f7a 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -60,6 +60,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
local_state._dependency->block();
}
}
+ if (!output_block->empty()) {
+ COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
+ COUNTER_UPDATE(local_state.rows_returned_counter(),
output_block->rows());
+ }
return Status::OK();
}
}
@@ -78,6 +82,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
*eos = local_state._shared_state->blocks_buffer.empty() &&
local_state._sort_idx >=
local_state._shared_state->partition_sorts.size();
}
+ if (!output_block->empty()) {
+ COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
+ COUNTER_UPDATE(local_state.rows_returned_counter(),
output_block->rows());
+ }
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]