This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c4175  [fix] fix hash table insert() may be failed but not handle this error (#8207)
e7c4175 is described below

commit e7c417505c583fc5f337f19d0df63f0dc59fb804
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Thu Mar 3 22:33:05 2022 +0800

    [fix] fix hash table insert() may be failed but not handle this error (#8207)
---
 be/src/exec/except_node.cpp                        | 12 ++------
 be/src/exec/hash_join_node.cpp                     |  8 +++---
 be/src/exec/hash_table.h                           | 32 +++++++++++++++++-----
 be/src/exec/hash_table.hpp                         |  4 ++-
 be/src/exec/intersect_node.cpp                     | 12 ++------
 be/src/exec/set_operation_node.cpp                 | 12 ++++----
 be/src/exec/set_operation_node.h                   | 25 +++++++----------
 be/test/exec/es_predicate_test.cpp                 |  1 -
 .../rowset/segment_v2/binary_dict_page_test.cpp    |  2 +-
 9 files changed, 56 insertions(+), 52 deletions(-)

diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8229b73..d79aceb 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -50,7 +50,9 @@ Status ExceptNode::open(RuntimeState* state) {
 
     for (int i = 1; i < _children.size(); ++i) {
         // rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl,
-        if (i > 1) { refresh_hash_table<false>(i); }
+        if (i > 1) {
+            RETURN_IF_ERROR(refresh_hash_table<false>(i));
+        }
 
         // probe
         _probe_batch.reset(
@@ -63,17 +65,12 @@ Status ExceptNode::open(RuntimeState* state) {
             RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), 
&eos));
             RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash 
table.");
             for (int j = 0; j < _probe_batch->num_rows(); ++j) {
-                VLOG_ROW << "probe row: "
-                         << get_row_output_string(_probe_batch->get_row(j), 
child(i)->row_desc());
                 _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
                 if (_hash_tbl_iterator != _hash_tbl->end()) {
                     if (!_hash_tbl_iterator.matched()) {
                         _hash_tbl_iterator.set_matched();
                         _valid_element_in_hash_tbl--;
                     }
-                    VLOG_ROW << "probe matched: "
-                             << 
get_row_output_string(_hash_tbl_iterator.get_row(),
-                                                      child(0)->row_desc());
                 }
             }
             _probe_batch->reset();
@@ -101,9 +98,6 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* 
out_batch, bool* eos)
             out_batch->resize_and_allocate_tuple_buffer(state, 
&tuple_buf_size, &tuple_buf));
     memset(tuple_buf, 0, tuple_buf_size);
     while (_hash_tbl_iterator.has_next()) {
-        VLOG_ROW << "find row: "
-                 << get_row_output_string(_hash_tbl_iterator.get_row(), 
child(0)->row_desc())
-                 << " matched: " << _hash_tbl_iterator.matched();
         if (!_hash_tbl_iterator.matched()) {
             create_output_row(_hash_tbl_iterator.get_row(), out_batch, 
tuple_buf);
             tuple_buf += _tuple_desc->byte_size();
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index f7e8bbc..291edbc 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -144,7 +144,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
             (std::find(_is_null_safe_eq_join.begin(), 
_is_null_safe_eq_join.end(), true) !=
              _is_null_safe_eq_join.end());
     _hash_tbl.reset(new HashTable(_build_expr_ctxs, _probe_expr_ctxs, 
_build_tuple_size,
-                                  stores_nulls, _is_null_safe_eq_join, id(), 
mem_tracker(), 1024));
+                                  stores_nulls, _is_null_safe_eq_join, id(), 
mem_tracker(),
+                                  state->batch_size() * 2));
 
     _probe_batch.reset(
             new RowBatch(child(0)->row_desc(), state->batch_size(), 
mem_tracker().get()));
@@ -762,7 +763,6 @@ Status HashJoinNode::process_build_batch(RuntimeState* 
state, RowBatch* build_ba
     // insert build row into our hash table
     if (_build_unique) {
         for (int i = 0; i < build_batch->num_rows(); ++i) {
-            // _hash_tbl->insert_unique(build_batch->get_row(i));
             TupleRow* tuple_row = nullptr;
             if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) {
                 build_batch->get_row(i)->deep_copy(tuple_row,
@@ -775,9 +775,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* 
state, RowBatch* build_ba
         // take ownership of tuple data of build_batch
         _build_pool->acquire_data(build_batch->tuple_data_pool(), false);
         RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the 
hash table.");
-
+        
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
         for (int i = 0; i < build_batch->num_rows(); ++i) {
-            _hash_tbl->insert(build_batch->get_row(i));
+            _hash_tbl->insert_without_check(build_batch->get_row(i));
         }
     }
     return Status::OK();
diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h
index 973be73..304d936 100644
--- a/be/src/exec/hash_table.h
+++ b/be/src/exec/hash_table.h
@@ -35,8 +35,6 @@ class TupleRow;
 class MemTracker;
 class RuntimeState;
 
-using std::vector;
-
 // Hash table implementation designed for hash aggregation and hash joins.  This is not
 // templatized and is tailored to the usage pattern for aggregation and joins.  The
 // hash table store TupleRows and allows for different exprs for insertions and finds.
@@ -101,20 +99,40 @@ public:
 
     // Insert row into the hash table.  Row will be evaluated over _build_expr_ctxs
     // This will grow the hash table if necessary
-    void insert(TupleRow* row) {
+    Status insert(TupleRow* row) {
         if (_num_filled_buckets > _num_buckets_till_resize) {
-            // TODO: next prime instead of double?
-            resize_buckets(_num_buckets * 2);
+            RETURN_IF_ERROR(resize_buckets(_num_buckets * 2));
         }
 
         insert_impl(row);
+        return Status::OK();
     }
 
+    void insert_without_check(TupleRow* row) { insert_impl(row); }
+
     // Insert row into the hash table. if the row is already exist will not insert
-    void insert_unique(TupleRow* row) {
+    Status insert_unique(TupleRow* row) {
+        if (find(row, false) == end()) {
+            return insert(row);
+        }
+        return Status::OK();
+    }
+
+    void insert_unique_without_check(TupleRow* row) {
         if (find(row, false) == end()) {
-            insert(row);
+            insert_without_check(row);
+        }
+    }
+
+    Status resize_buckets_ahead(int64_t estimate_buckets) {
+        if (_num_filled_buckets + estimate_buckets > _num_buckets_till_resize) 
{
+            int64_t new_bucket_size = _num_buckets * 2;
+            while (new_bucket_size <= _num_filled_buckets + estimate_buckets) {
+                new_bucket_size = new_bucket_size * 2;
+            }
+            return resize_buckets(new_bucket_size);
         }
+        return Status::OK();
     }
 
     bool emplace_key(TupleRow* row, TupleRow** key_addr);
diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp
index 4b679c7..b9deac5 100644
--- a/be/src/exec/hash_table.hpp
+++ b/be/src/exec/hash_table.hpp
@@ -24,7 +24,9 @@ namespace doris {
 
 inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) {
     if (_num_filled_buckets > _num_buckets_till_resize) {
-        resize_buckets(_num_buckets * 2);
+        if (!resize_buckets(_num_buckets * 2).ok()) {
+            return false;
+        }
     }
     if (_current_used == _current_capacity) {
         grow_node_array();
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 60481cc..2cbf4db 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -52,7 +52,9 @@ Status IntersectNode::open(RuntimeState* state) {
     bool eos = false;
 
     for (int i = 1; i < _children.size(); ++i) {
-        if (i > 1) { refresh_hash_table<true>(i); }
+        if (i > 1) {
+            RETURN_IF_ERROR(refresh_hash_table<true>(i));
+        }
 
         _valid_element_in_hash_tbl = 0;
         // probe
@@ -66,17 +68,12 @@ Status IntersectNode::open(RuntimeState* state) {
             RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), 
&eos));
             RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the 
hash table.");
             for (int j = 0; j < _probe_batch->num_rows(); ++j) {
-                VLOG_ROW << "probe row: "
-                         << get_row_output_string(_probe_batch->get_row(j), 
child(i)->row_desc());
                 _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
                 if (_hash_tbl_iterator != _hash_tbl->end()) {
                     if (!_hash_tbl_iterator.matched()) {
                         _valid_element_in_hash_tbl++;
                         _hash_tbl_iterator.set_matched();
                     }
-                    VLOG_ROW << "probe matched: "
-                             << 
get_row_output_string(_hash_tbl_iterator.get_row(),
-                                                      child(0)->row_desc());
                 }
             }
             _probe_batch->reset();
@@ -100,9 +97,6 @@ Status IntersectNode::get_next(RuntimeState* state, 
RowBatch* out_batch, bool* e
             out_batch->resize_and_allocate_tuple_buffer(state, 
&tuple_buf_size, &tuple_buf));
     memset(tuple_buf, 0, tuple_buf_size);
     while (_hash_tbl_iterator.has_next()) {
-        VLOG_ROW << "find row: "
-                 << get_row_output_string(_hash_tbl_iterator.get_row(), 
child(0)->row_desc())
-                 << " matched: " << _hash_tbl_iterator.matched();
         if (_hash_tbl_iterator.matched()) {
             create_output_row(_hash_tbl_iterator.get_row(), out_batch, 
tuple_buf);
             tuple_buf += _tuple_desc->byte_size();
diff --git a/be/src/exec/set_operation_node.cpp 
b/be/src/exec/set_operation_node.cpp
index 0ca6dd5..5958c25 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -27,7 +27,10 @@
 namespace doris {
 SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
                                    const DescriptorTbl& descs, int tuple_id)
-        : ExecNode(pool, tnode, descs), _tuple_id(tuple_id), 
_tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {}
+        : ExecNode(pool, tnode, descs),
+          _tuple_id(tuple_id),
+          _tuple_desc(nullptr),
+          _valid_element_in_hash_tbl(0) {}
 
 Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -142,7 +145,7 @@ Status SetOperationNode::open(RuntimeState* state) {
     }
     // initial build hash table used for remove duplicated
     _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], 
_build_tuple_size,
-                                  true, _find_nulls, id(), mem_tracker(), 
1024));
+                                  true, _find_nulls, id(), mem_tracker(), 
state->batch_size() * 2));
     RowBatch build_batch(child(0)->row_desc(), state->batch_size(), 
mem_tracker().get());
     RETURN_IF_ERROR(child(0)->open(state));
 
@@ -155,10 +158,9 @@ Status SetOperationNode::open(RuntimeState* state) {
         _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
         RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the 
hash table.");
         // build hash table and remove duplicate items
+        
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
         for (int i = 0; i < build_batch.num_rows(); ++i) {
-            VLOG_ROW << "build row: "
-                     << get_row_output_string(build_batch.get_row(i), 
child(0)->row_desc());
-            _hash_tbl->insert_unique(build_batch.get_row(i));
+            _hash_tbl->insert_unique_without_check(build_batch.get_row(i));
         }
         VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, 
&child(0)->row_desc());
         build_batch.reset();
diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h
index 40645b1..9fbb5d7 100644
--- a/be/src/exec/set_operation_node.h
+++ b/be/src/exec/set_operation_node.h
@@ -54,7 +54,7 @@ protected:
     // TODO: Check whether the hash table should be shrink to reduce necessary refresh
     // but may different child has different probe expr which may cause wrong result.
     // so we need keep probe expr same in FE to optimize this issue.
-    void refresh_hash_table(int child);
+    Status refresh_hash_table(int child);
 
     /// Tuple id resolved in Prepare() to set tuple_desc_;
     const int _tuple_id;
@@ -81,33 +81,28 @@ protected:
 };
 
 template <bool keep_matched>
-void SetOperationNode::refresh_hash_table(int child_id) {
+Status SetOperationNode::refresh_hash_table(int child_id) {
     SCOPED_TIMER(_build_timer);
-    std::unique_ptr<HashTable> temp_tbl(
-            new HashTable(_child_expr_lists[0], _child_expr_lists[child_id], 
_build_tuple_size,
-                          true, _find_nulls, id(), mem_tracker(),
-                          _valid_element_in_hash_tbl / 
HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
+    std::unique_ptr<HashTable> temp_tbl(new HashTable(
+            _child_expr_lists[0], _child_expr_lists[child_id], 
_build_tuple_size, true, _find_nulls,
+            id(), mem_tracker(),
+            _valid_element_in_hash_tbl / 
HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
     _hash_tbl_iterator = _hash_tbl->begin();
     while (_hash_tbl_iterator.has_next()) {
         if constexpr (keep_matched) {
             if (_hash_tbl_iterator.matched()) {
-                VLOG_ROW << "rebuild row: "
-                         << get_row_output_string(_hash_tbl_iterator.get_row(),
-                                                  child(0)->row_desc());
-                temp_tbl->insert(_hash_tbl_iterator.get_row());
+                
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
             }
         } else {
             if (!_hash_tbl_iterator.matched()) {
-                VLOG_ROW << "rebuild row: "
-                         << get_row_output_string(_hash_tbl_iterator.get_row(),
-                                                  child(0)->row_desc());
-                temp_tbl->insert(_hash_tbl_iterator.get_row());
+                
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
             }
         }
         _hash_tbl_iterator.next<false>();
     }
     _hash_tbl.swap(temp_tbl);
     temp_tbl->close();
+    return Status::OK();
 }
 
-}; // namespace doris
\ No newline at end of file
+}; // namespace doris
diff --git a/be/test/exec/es_predicate_test.cpp 
b/be/test/exec/es_predicate_test.cpp
index 1e382ba..28ce548 100644
--- a/be/test/exec/es_predicate_test.cpp
+++ b/be/test/exec/es_predicate_test.cpp
@@ -143,7 +143,6 @@ TEST_F(EsPredicateTest, normal) {
     std::vector<ExprContext*> conjunct_ctxs;
     Status status = build_expr_context_list(conjunct_ctxs);
     ASSERT_TRUE(status.ok());
-
     TupleDescriptor* tuple_desc = _desc_tbl->get_tuple_descriptor(0);
     std::vector<EsPredicate*> predicates;
     for (int i = 0; i < conjunct_ctxs.size(); ++i) {
diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp 
b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
index b2fb740..1bf2f3d 100644
--- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
@@ -30,8 +30,8 @@
 #include "olap/types.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
-#include "util/debug_util.h"
 #include "test_util/test_util.h"
+#include "util/debug_util.h"
 
 namespace doris {
 namespace segment_v2 {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to