This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f4c03fe 1. Delete the code of Sort Node we do not use now. (#3666)
f4c03fe is described below
commit f4c03fe8e28ce2fd8ba6a969c5a16ad02c010dce
Author: HappenLee <[email protected]>
AuthorDate: Tue May 26 10:20:57 2020 +0800
1. Delete the code of Sort Node we do not use now. (#3666)
Optimize the quick sort by choosing the pivot with find_the_median (median of
three) and reduce its recursion depth by recursing only on the smaller partition.
---
be/src/exec/CMakeLists.txt | 1 -
be/src/exec/exec_node.cpp | 1 -
be/src/exec/sort_node.cpp | 156 -----------------------------------------
be/src/exec/sort_node.h | 76 --------------------
be/src/runtime/spill_sorter.cc | 65 +++++++++++++----
5 files changed, 50 insertions(+), 249 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 4844338..c42afce 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -49,7 +49,6 @@ set(EXEC_FILES
text_converter.cpp
topn_node.cpp
sort_exec_exprs.cpp
- sort_node.cpp
olap_rewrite_node.cpp
olap_scan_node.cpp
olap_scanner.cpp
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 9a6f208..226e42a 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -45,7 +45,6 @@
#include "exec/repeat_node.h"
#include "exec/schema_scan_node.h"
#include "exec/select_node.h"
-#include "exec/sort_node.h"
#include "exec/spill_sort_node.h"
#include "exec/topn_node.h"
#include "exec/union_node.h"
diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp
deleted file mode 100644
index da1d9a3..0000000
--- a/be/src/exec/sort_node.cpp
+++ /dev/null
@@ -1,156 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/sort_node.h"
-#include "exec/sort_exec_exprs.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs),
- _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
- _num_rows_skipped(0) {
- Status status = init(tnode, nullptr);
- DCHECK(status.ok()) << "SortNode c'tor:init failed: \n" <<
status.get_error_msg();
-}
-
-SortNode::~SortNode() {
-}
-
-Status SortNode::init(const TPlanNode& tnode, RuntimeState* state) {
- const vector<TExpr>* sort_tuple_slot_exprs =
tnode.sort_node.__isset.sort_tuple_slot_exprs ?
- &tnode.sort_node.sort_tuple_slot_exprs : NULL;
- RETURN_IF_ERROR(_sort_exec_exprs.init(tnode.sort_node.ordering_exprs,
- sort_tuple_slot_exprs, _pool));
- _is_asc_order = tnode.sort_node.is_asc_order;
- _nulls_first = tnode.sort_node.nulls_first;
- return Status::OK();
-}
-
-Status SortNode::prepare(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(ExecNode::prepare(state));
- RETURN_IF_ERROR(_sort_exec_exprs.prepare(
- state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker()));
- return Status::OK();
-}
-
-Status SortNode::open(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(ExecNode::open(state));
- RETURN_IF_ERROR(_sort_exec_exprs.open(state));
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(child(0)->open(state));
-
- TupleRowComparator less_than(
- _sort_exec_exprs.lhs_ordering_expr_ctxs(),
_sort_exec_exprs.rhs_ordering_expr_ctxs(),
- _is_asc_order, _nulls_first);
- _sorter.reset(new MergeSorter(
- less_than, _sort_exec_exprs.sort_tuple_slot_expr_ctxs(),
- &_row_descriptor, runtime_profile(), state));
-
- // The child has been opened and the sorter created. Sort the input.
- // The final merge is done on-demand as rows are requested in GetNext().
- RETURN_IF_ERROR(sort_input(state));
-
- // The child can be closed at this point.
- child(0)->close(state);
- return Status::OK();
-}
-
-Status SortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
{
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
- RETURN_IF_CANCELLED(state);
- //RETURN_IF_ERROR(QueryMaintenance(state));
-
- if (reached_limit()) {
- *eos = true;
- return Status::OK();
- } else {
- *eos = false;
- }
-
- DCHECK_EQ(row_batch->num_rows(), 0);
- RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
- while ((_num_rows_skipped < _offset)) {
- _num_rows_skipped += row_batch->num_rows();
- // Throw away rows in the output batch until the offset is skipped.
- int rows_to_keep = _num_rows_skipped - _offset;
- if (rows_to_keep > 0) {
- row_batch->copy_rows(0, row_batch->num_rows() - rows_to_keep,
rows_to_keep);
- row_batch->set_num_rows(rows_to_keep);
- } else {
- row_batch->set_num_rows(0);
- }
- if (rows_to_keep > 0 || *eos) {
- break;
- }
- RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
- }
-
- _num_rows_returned += row_batch->num_rows();
- if (reached_limit()) {
- row_batch->set_num_rows(row_batch->num_rows() - (_num_rows_returned -
_limit));
- *eos = true;
- }
-
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- return Status::OK();
-}
-
-Status SortNode::close(RuntimeState* state) {
- if (is_closed()) {
- return Status::OK();
- }
- _sort_exec_exprs.close(state);
- _sorter.reset();
- return ExecNode::close(state);
-}
-
-void SortNode::debug_string(int indentation_level, stringstream* out) const {
- *out << string(indentation_level * 2, ' ');
- *out << "SortNode(";
- // << Expr::debug_string(_sort_exec_exprs.lhs_ordering_expr_ctxs());
- for (int i = 0; i < _is_asc_order.size(); ++i) {
- *out << (i > 0 ? " " : "")
- << (_is_asc_order[i] ? "asc" : "desc")
- << " nulls " << (_nulls_first[i] ? "first" : "last");
- }
- ExecNode::debug_string(indentation_level, out);
- *out << ")";
-}
-
-Status SortNode::sort_input(RuntimeState* state) {
- RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
- bool eos = false;
- do {
- batch.reset();
- RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
- RETURN_IF_ERROR(_sorter->add_batch(&batch));
- RETURN_IF_CANCELLED(state);
- RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the
child.");
- // RETURN_IF_ERROR(QueryMaintenance(state));
- } while (!eos);
- RETURN_IF_ERROR(_sorter->input_done());
- return Status::OK();
-}
-
-}
diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h
deleted file mode 100644
index 2d5b1d8..0000000
--- a/be/src/exec/sort_node.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
-#define INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
-
-#include "exec/exec_node.h"
-#include "exec/sort_exec_exprs.h"
-#include "runtime/merge_sorter.h"
-#include "runtime/buffered_block_mgr.h"
-
-namespace doris {
-
-// Node that implements a full sort of its input with a fixed memory budget,
spilling
-// to disk if the input is larger than available memory.
-// Uses Sorter and BufferedBlockMgr for the external sort implementation.
-// Input rows to SortNode are materialized by the Sorter into a single tuple
-// using the expressions specified in sort_exec_exprs_.
-// In GetNext(), SortNode passes in the output batch to the sorter instance
created
-// in Open() to fill it with sorted rows.
-// If a merge phase was performed in the sort, sorted rows are deep copied into
-// the output batch. Otherwise, the sorter instance owns the sorted data.
-class SortNode : public ExecNode {
-public:
- SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl&
descs);
- ~SortNode();
-
- virtual Status prepare(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool*
eos);
- virtual Status close(RuntimeState* state);
-
-protected:
- virtual void debug_string(int indentation_level, std::stringstream* out)
const;
-
-private:
- Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
- // Fetch input rows and feed them to the sorter until the input is
exhausted.
- Status sort_input(RuntimeState* state);
-
- // Create a block manager object and set it in block_mgr_.
- // Returns and sets the query status to
Status::MemoryLimitExceeded("Memory limit exceeded") if there is not
- // enough memory for the sort.
- Status create_block_mgr(RuntimeState* state);
-
- // Number of rows to skip.
- int64_t _offset;
- int64_t _num_rows_skipped;
-
- // Object used for external sorting.
- boost::scoped_ptr<MergeSorter> _sorter;
-
- // Expressions and parameters used for tuple materialization and tuple
comparison.
- SortExecExprs _sort_exec_exprs;
- std::vector<bool> _is_asc_order;
- std::vector<bool> _nulls_first;
- boost::scoped_ptr<MemPool> _tuple_pool;
-};
-
-}
-
-#endif
diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc
index 0a5b3ff..4456bc0 100644
--- a/be/src/runtime/spill_sorter.cc
+++ b/be/src/runtime/spill_sorter.cc
@@ -364,11 +364,16 @@ private:
void insertion_sort(const TupleIterator& first, const TupleIterator& last);
// Partitions the sequence of tuples in the range [first, last) in a run
into two
- // groups around the pivot tuple - i.e. tuples in first group are <= the
pivot, and
- // tuples in the second group are >= pivot. Tuples are swapped in place to
create the
+ // groups around the mid._current_tuple - i.e. tuples in first group are
<= the mid._current_tuple
+ // and tuples in the second group are >= mid._current_tuple. Tuples are
swapped in place to create the
// groups and the index to the first element in the second group is
returned.
// Checks _state->is_cancelled() and returns early with an invalid result
if true.
- TupleIterator partition(TupleIterator first, TupleIterator last, Tuple*
pivot);
+ TupleIterator partition(TupleIterator first, TupleIterator last,
TupleIterator& mid);
+
+ // Select the median of three iterator tuples. Taking the median tends to
help us select better
+ // pivots that split the input range more evenly. This method makes
selection of
+ // bad pivots very infrequent.
+ void find_the_median(TupleIterator& first, TupleIterator& last,
TupleIterator& mid);
// Performs a quicksort of rows in the range [first, last) followed by
insertion sort
// for smaller groups of elements.
@@ -931,12 +936,34 @@ void SpillSorter::TupleSorter::insertion_sort(const
TupleIterator& first,
}
}
+void SpillSorter::TupleSorter::find_the_median(TupleSorter::TupleIterator
&first,
+ TupleSorter::TupleIterator &last, TupleSorter::TupleIterator &mid) {
+ last.prev();
+ auto f_com_result =
_less_than_comp.compare(reinterpret_cast<TupleRow*>(&first._current_tuple),
reinterpret_cast<TupleRow*>(&mid._current_tuple));
+ auto l_com_result =
_less_than_comp.compare(reinterpret_cast<TupleRow*>(&last._current_tuple),
reinterpret_cast<TupleRow*>(&mid._current_tuple));
+ if (f_com_result == -1 && l_com_result == -1) {
+ if
(_less_than_comp(reinterpret_cast<TupleRow*>(&first._current_tuple),reinterpret_cast<TupleRow*>(&last._current_tuple)))
{
+ swap(mid._current_tuple, last._current_tuple);
+ } else {
+ swap(mid._current_tuple, first._current_tuple);
+ }
+ }
+ if (f_com_result == 1 && l_com_result == 1) {
+ if (_less_than_comp(reinterpret_cast<TupleRow
*>(&first._current_tuple),
+ reinterpret_cast<TupleRow
*>(&last._current_tuple))) {
+ swap(mid._current_tuple, first._current_tuple);
+ } else {
+ swap(mid._current_tuple, last._current_tuple);
+ }
+ }
+}
+
SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition(
- TupleIterator first, TupleIterator last, Tuple* pivot) {
- // Copy pivot into temp_tuple since it points to a tuple within [first,
last).
- memcpy(_temp_tuple_buffer, pivot, _tuple_size);
+ TupleIterator first, TupleIterator last, TupleIterator& mid) {
+ find_the_median(first, last, mid);
- last.prev();
+ // Copy the tuple at mid._current_tuple into _temp_tuple_buffer since it points to a tuple
within [first, last).
+ memcpy(_temp_tuple_buffer, mid._current_tuple, _tuple_size);
while (true) {
// Search for the first and last out-of-place elements, and swap them.
while (_less_than_comp(
@@ -968,14 +995,22 @@ void SpillSorter::TupleSorter::sort_helper(TupleIterator
first, TupleIterator la
}
// Use insertion sort for smaller sequences.
while (last._index - first._index > INSERTION_THRESHOLD) {
- TupleIterator iter(this, first._index + (last._index - first._index) /
2);
- DCHECK(iter._current_tuple != NULL);
- // partition() splits the tuples in [first, last) into two groups (<=
pivot
- // and >= pivot) in-place. 'cut' is the index of the first tuple in
the second group.
- TupleIterator cut = partition(first, last,
- reinterpret_cast<Tuple*>(iter._current_tuple));
- sort_helper(cut, last);
- last = cut;
+ TupleIterator mid(this, first._index + (last._index - first._index) /
2);
+
+ DCHECK(mid._current_tuple != NULL);
+ // partition() splits the tuples in [first, last) into two groups (<=
mid iter
+ // and >= mid iter) in-place. 'cut' is the index of the first tuple in
the second group.
+ TupleIterator cut = partition(first, last, mid);
+
+ // Recurse on the smaller partition. This limits stack size to log(n)
stack frames.
+ if (last._index - cut._index < cut._index - first._index) {
+ sort_helper(cut, last);
+ last = cut;
+ } else {
+ sort_helper(first, cut);
+ first = cut;
+ }
+
if (UNLIKELY(_state->is_cancelled())) {
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]