This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new e7c4175 [fix] fix hash table insert() may be failed but not handle this error (#8207) e7c4175 is described below commit e7c417505c583fc5f337f19d0df63f0dc59fb804 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Thu Mar 3 22:33:05 2022 +0800 [fix] fix hash table insert() may be failed but not handle this error (#8207) --- be/src/exec/except_node.cpp | 12 ++------ be/src/exec/hash_join_node.cpp | 8 +++--- be/src/exec/hash_table.h | 32 +++++++++++++++++----- be/src/exec/hash_table.hpp | 4 ++- be/src/exec/intersect_node.cpp | 12 ++------ be/src/exec/set_operation_node.cpp | 12 ++++---- be/src/exec/set_operation_node.h | 25 +++++++---------- be/test/exec/es_predicate_test.cpp | 1 - .../rowset/segment_v2/binary_dict_page_test.cpp | 2 +- 9 files changed, 56 insertions(+), 52 deletions(-) diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 8229b73..d79aceb 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -50,7 +50,9 @@ Status ExceptNode::open(RuntimeState* state) { for (int i = 1; i < _children.size(); ++i) { // rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl, - if (i > 1) { refresh_hash_table<false>(i); } + if (i > 1) { + RETURN_IF_ERROR(refresh_hash_table<false>(i)); + } // probe _probe_batch.reset( @@ -63,17 +65,12 @@ Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { - VLOG_ROW << "probe row: " - << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc()); _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { if (!_hash_tbl_iterator.matched()) { _hash_tbl_iterator.set_matched(); _valid_element_in_hash_tbl--; } - VLOG_ROW << "probe matched: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); } } _probe_batch->reset(); @@ -101,9 +98,6 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); memset(tuple_buf, 0, tuple_buf_size); while (_hash_tbl_iterator.has_next()) { - VLOG_ROW << "find row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()) - << " matched: " << _hash_tbl_iterator.matched(); if (!_hash_tbl_iterator.matched()) { create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf); tuple_buf += _tuple_desc->byte_size(); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index f7e8bbc..291edbc 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -144,7 +144,8 @@ Status HashJoinNode::prepare(RuntimeState* state) { (std::find(_is_null_safe_eq_join.begin(), _is_null_safe_eq_join.end(), true) != _is_null_safe_eq_join.end()); _hash_tbl.reset(new HashTable(_build_expr_ctxs, _probe_expr_ctxs, _build_tuple_size, - stores_nulls, _is_null_safe_eq_join, id(), mem_tracker(), 1024)); + stores_nulls, _is_null_safe_eq_join, id(), mem_tracker(), + state->batch_size() * 2)); _probe_batch.reset( new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get())); @@ -762,7 +763,6 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba // insert build row into our hash table if (_build_unique) { for (int i = 0; i < build_batch->num_rows(); ++i) { - // _hash_tbl->insert_unique(build_batch->get_row(i)); TupleRow* tuple_row = nullptr; if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) { build_batch->get_row(i)->deep_copy(tuple_row, @@ -775,9 +775,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch->tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); - + RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows())); for (int i = 0; i < build_batch->num_rows(); ++i) { - _hash_tbl->insert(build_batch->get_row(i)); + _hash_tbl->insert_without_check(build_batch->get_row(i)); } } return Status::OK(); diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h index 973be73..304d936 100644 --- a/be/src/exec/hash_table.h +++ b/be/src/exec/hash_table.h @@ -35,8 +35,6 @@ class TupleRow; class MemTracker; class RuntimeState; -using std::vector; - // Hash table implementation designed for hash aggregation and hash joins. This is not // templatized and is tailored to the usage pattern for aggregation and joins. The // hash table store TupleRows and allows for different exprs for insertions and finds. @@ -101,20 +99,40 @@ public: // Insert row into the hash table. Row will be evaluated over _build_expr_ctxs // This will grow the hash table if necessary - void insert(TupleRow* row) { + Status insert(TupleRow* row) { if (_num_filled_buckets > _num_buckets_till_resize) { - // TODO: next prime instead of double? - resize_buckets(_num_buckets * 2); + RETURN_IF_ERROR(resize_buckets(_num_buckets * 2)); } insert_impl(row); + return Status::OK(); } + void insert_without_check(TupleRow* row) { insert_impl(row); } + // Insert row into the hash table. if the row is already exist will not insert - void insert_unique(TupleRow* row) { + Status insert_unique(TupleRow* row) { + if (find(row, false) == end()) { + return insert(row); + } + return Status::OK(); + } + + void insert_unique_without_check(TupleRow* row) { if (find(row, false) == end()) { - insert(row); + insert_without_check(row); + } + } + + Status resize_buckets_ahead(int64_t estimate_buckets) { + if (_num_filled_buckets + estimate_buckets > _num_buckets_till_resize) { + int64_t new_bucket_size = _num_buckets * 2; + while (new_bucket_size <= _num_filled_buckets + estimate_buckets) { + new_bucket_size = new_bucket_size * 2; + } + return resize_buckets(new_bucket_size); } + return Status::OK(); } bool emplace_key(TupleRow* row, TupleRow** key_addr); diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp index 4b679c7..b9deac5 100644 --- a/be/src/exec/hash_table.hpp +++ b/be/src/exec/hash_table.hpp @@ -24,7 +24,9 @@ namespace doris { inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) { if (_num_filled_buckets > _num_buckets_till_resize) { - resize_buckets(_num_buckets * 2); + if (!resize_buckets(_num_buckets * 2).ok()) { + return false; + } } if (_current_used == _current_capacity) { grow_node_array(); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index 60481cc..2cbf4db 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -52,7 +52,9 @@ Status IntersectNode::open(RuntimeState* state) { bool eos = false; for (int i = 1; i < _children.size(); ++i) { - if (i > 1) { refresh_hash_table<true>(i); } + if (i > 1) { + RETURN_IF_ERROR(refresh_hash_table<true>(i)); + } _valid_element_in_hash_tbl = 0; // probe @@ -66,17 +68,12 @@ Status IntersectNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { - VLOG_ROW << "probe row: " - << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc()); _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { if (!_hash_tbl_iterator.matched()) { _valid_element_in_hash_tbl++; _hash_tbl_iterator.set_matched(); } - VLOG_ROW << "probe matched: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); } } _probe_batch->reset(); @@ -100,9 +97,6 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); memset(tuple_buf, 0, tuple_buf_size); while (_hash_tbl_iterator.has_next()) { - VLOG_ROW << "find row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()) - << " matched: " << _hash_tbl_iterator.matched(); if (_hash_tbl_iterator.matched()) { create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf); tuple_buf += _tuple_desc->byte_size(); diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 0ca6dd5..5958c25 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -27,7 +27,10 @@ namespace doris { SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, int tuple_id) - : ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {} + : ExecNode(pool, tnode, descs), + _tuple_id(tuple_id), + _tuple_desc(nullptr), + _valid_element_in_hash_tbl(0) {} Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -142,7 +145,7 @@ Status SetOperationNode::open(RuntimeState* state) { } // initial build hash table used for remove duplicated _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), 1024)); + true, _find_nulls, id(), mem_tracker(), state->batch_size() * 2)); RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()); RETURN_IF_ERROR(child(0)->open(state)); @@ -155,10 +158,9 @@ Status SetOperationNode::open(RuntimeState* state) { _build_pool->acquire_data(build_batch.tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table."); // build hash table and remove duplicate items + RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows())); for (int i = 0; i < build_batch.num_rows(); ++i) { - VLOG_ROW << "build row: " - << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); - _hash_tbl->insert_unique(build_batch.get_row(i)); + _hash_tbl->insert_unique_without_check(build_batch.get_row(i)); } VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); build_batch.reset(); diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h index 40645b1..9fbb5d7 100644 --- a/be/src/exec/set_operation_node.h +++ b/be/src/exec/set_operation_node.h @@ -54,7 +54,7 @@ protected: // TODO: Check whether the hash table should be shrink to reduce necessary refresh // but may different child has different probe expr which may cause wrong result. // so we need keep probe expr same in FE to optimize this issue. - void refresh_hash_table(int child); + Status refresh_hash_table(int child); /// Tuple id resolved in Prepare() to set tuple_desc_; const int _tuple_id; @@ -81,33 +81,28 @@ protected: }; template <bool keep_matched> -void SetOperationNode::refresh_hash_table(int child_id) { +Status SetOperationNode::refresh_hash_table(int child_id) { SCOPED_TIMER(_build_timer); - std::unique_ptr<HashTable> temp_tbl( - new HashTable(_child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), - _valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1)); + std::unique_ptr<HashTable> temp_tbl(new HashTable( + _child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size, true, _find_nulls, + id(), mem_tracker(), + _valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1)); _hash_tbl_iterator = _hash_tbl->begin(); while (_hash_tbl_iterator.has_next()) { if constexpr (keep_matched) { if (_hash_tbl_iterator.matched()) { - VLOG_ROW << "rebuild row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); - temp_tbl->insert(_hash_tbl_iterator.get_row()); + RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row())); } } else { if (!_hash_tbl_iterator.matched()) { - VLOG_ROW << "rebuild row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); - temp_tbl->insert(_hash_tbl_iterator.get_row()); + RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row())); } } _hash_tbl_iterator.next<false>(); } _hash_tbl.swap(temp_tbl); temp_tbl->close(); + return Status::OK(); } -}; // namespace doris \ No newline at end of file +}; // namespace doris diff --git a/be/test/exec/es_predicate_test.cpp b/be/test/exec/es_predicate_test.cpp index 1e382ba..28ce548 100644 --- a/be/test/exec/es_predicate_test.cpp +++ b/be/test/exec/es_predicate_test.cpp @@ -143,7 +143,6 @@ TEST_F(EsPredicateTest, normal) { std::vector<ExprContext*> conjunct_ctxs; Status status = build_expr_context_list(conjunct_ctxs); ASSERT_TRUE(status.ok()); - TupleDescriptor* tuple_desc = _desc_tbl->get_tuple_descriptor(0); std::vector<EsPredicate*> predicates; for (int i = 0; i < conjunct_ctxs.size(); ++i) { diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp index b2fb740..1bf2f3d 100644 --- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp @@ -30,8 +30,8 @@ #include "olap/types.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" -#include "util/debug_util.h" #include "test_util/test_util.h" +#include "util/debug_util.h" namespace doris { namespace segment_v2 { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org