This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5d5928e4744cd65bb95dd4fd44a9f8e74953f43e
Author: Pxl <952130...@qq.com>
AuthorDate: Thu Apr 21 11:02:04 2022 +0800

    [Bug][Storage-vectorized] fix core dump on outer join with non-nullable column (#9112)
---
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 60 ++++++----------------
 be/src/olap/rowset/segment_v2/segment_iterator.h   | 11 ++--
 be/src/vec/core/block.cpp                          |  9 ++++
 be/src/vec/core/block.h                            | 33 ++++++------
 be/src/vec/exec/volap_scan_node.cpp                | 51 +++++++++---------
 be/src/vec/exec/volap_scan_node.h                  |  3 ++
 6 files changed, 76 insertions(+), 91 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 58774e2352..52cc584fdb 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -668,8 +668,10 @@ void SegmentIterator::_vec_init_lazy_materialization() {
                     // todo(wb) make a cost-based lazy-materialization 
framework
                     // check non-pred column type to decide whether using 
lazy-materialization
                     FieldType type = _schema.column(cid)->type();
-                    if (_is_all_column_basic_type && (type == 
OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT 
-                            || type == OLAP_FIELD_TYPE_VARCHAR || type == 
OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_STRING)) {
+                    if (_is_all_column_basic_type &&
+                        (type == OLAP_FIELD_TYPE_HLL || type == 
OLAP_FIELD_TYPE_OBJECT ||
+                         type == OLAP_FIELD_TYPE_VARCHAR || type == 
OLAP_FIELD_TYPE_CHAR ||
+                         type == OLAP_FIELD_TYPE_STRING)) {
                         _is_all_column_basic_type = false;
                     }
                 }
@@ -753,23 +755,7 @@ Status SegmentIterator::_read_columns(const 
std::vector<ColumnId>& column_ids,
 
 void SegmentIterator::_init_current_block(
         vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& 
current_columns) {
-    bool is_block_mem_reuse = block->mem_reuse();
-    if (is_block_mem_reuse) {
-        block->clear_column_data(_schema.num_column_ids());
-    } else { // pre fill output block here
-        for (size_t i = 0; i < _schema.num_column_ids(); i++) {
-            auto cid = _schema.column_id(i);
-            auto column_desc = _schema.column(cid);
-            auto data_type = Schema::get_data_type_ptr(column_desc->type());
-            if (column_desc->is_nullable()) {
-                block->insert({nullptr,
-                               
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)),
-                               column_desc->name()});
-            } else {
-                block->insert({nullptr, std::move(data_type), 
column_desc->name()});
-            }
-        }
-    }
+    block->clear_column_data(_schema.num_column_ids());
 
     for (size_t i = 0; i < _schema.num_column_ids(); i++) {
         auto cid = _schema.column_id(i);
@@ -778,17 +764,8 @@ void SegmentIterator::_init_current_block(
         if (_is_pred_column[cid]) { //todo(wb) maybe we can release it after 
output block
             current_columns[cid]->clear();
         } else { // non-predicate column
-            if (is_block_mem_reuse) {
-                current_columns[cid] = 
std::move(*block->get_by_position(i).column).mutate();
-            } else {
-                auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
-                if (column_desc->is_nullable()) {
-                    current_columns[cid] = 
doris::vectorized::ColumnNullable::create(
-                            data_type->create_column(), 
doris::vectorized::ColumnUInt8::create());
-                } else {
-                    current_columns[cid] = data_type->create_column();
-                }
-            }
+            current_columns[cid] = 
std::move(*block->get_by_position(i).column).mutate();
+
             if (column_desc->type() == OLAP_FIELD_TYPE_DATE) {
                 current_columns[cid]->set_date_type();
             } else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) {
@@ -799,7 +776,7 @@ void SegmentIterator::_init_current_block(
     }
 }
 
-void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool 
is_block_mem_reuse) {
+void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     for (auto cid : _non_predicate_columns) {
         block->replace_by_position(_schema_block_id_map[cid],
                                    std::move(_current_return_columns[cid]));
@@ -919,6 +896,8 @@ void 
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column
 
 Status SegmentIterator::next_batch(vectorized::Block* block) {
     bool is_mem_reuse = block->mem_reuse();
+    DCHECK(is_mem_reuse);
+
     SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
     if (UNLIKELY(!_inited)) {
         RETURN_IF_ERROR(_init(true));
@@ -953,24 +932,15 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
             // todo(wb) abstract make column where
             if (!_is_pred_column[cid]) { // non-predicate
                 block->replace_by_position(i, 
std::move(_current_return_columns[cid]));
-            } else { // predicate
-                if (!is_mem_reuse) {
-                    auto column_desc = _schema.column(cid);
-                    auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
-                    block->replace_by_position(i, data_type->create_column());
-                }
             }
         }
-        // not sure whether block is clear before enter segmentIter, so clear 
it here.
-        if (is_mem_reuse) {
-            block->clear_column_data();
-        }
+        block->clear_column_data();
         return Status::EndOfFile("no more data in segment");
     }
 
     // when no predicate(include delete condition) is provided, output column 
directly
     if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) {
-        _output_non_pred_columns(block, is_mem_reuse);
+        _output_non_pred_columns(block);
     } else { // need predicate evaluation
         uint16_t selected_size = nrows_read;
         uint16_t sel_rowid_idx[selected_size];
@@ -982,7 +952,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
         // So output block directly after vectorization evaluation
         if (_is_all_column_basic_type) {
             RETURN_IF_ERROR(_output_column_by_sel_idx(block, 
_first_read_column_ids, sel_rowid_idx,
-                                                      selected_size, 
is_mem_reuse));
+                                                      selected_size));
         } else {
             // step 2: evaluate short ciruit predicate
             // todo(wb) research whether need to read short predicate after 
vectorization evaluation
@@ -998,7 +968,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
 
             // step4: output columns
             // 4.1 output non-predicate column
-            _output_non_pred_columns(block, is_mem_reuse);
+            _output_non_pred_columns(block);
 
             // 4.2 get union of short_cir_pred and vec_pred
             std::set<ColumnId> pred_column_ids;
@@ -1008,7 +978,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
 
             // 4.3 output short circuit and predicate column
             RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids, 
sel_rowid_idx,
-                                                      selected_size, 
is_mem_reuse));
+                                                      selected_size));
         }
     }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 7f2d11e0b6..fb461fe54d 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -102,20 +102,19 @@ private:
                              std::vector<vectorized::MutableColumnPtr>& 
non_pred_vector);
     void _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t& 
selected_size);
     void _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t* 
selected_size);
-    void _output_non_pred_columns(vectorized::Block* block, bool 
is_block_mem_reuse);
+    void _output_non_pred_columns(vectorized::Block* block);
     void _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                  std::vector<rowid_t>& rowid_vector, uint16_t* 
sel_rowid_idx,
                                  size_t select_size, 
vectorized::MutableColumns* mutable_columns);
 
     template <class Container>
     Status _output_column_by_sel_idx(vectorized::Block* block, const 
Container& column_ids,
-                                     uint16_t* sel_rowid_idx, uint16_t 
select_size,
-                                     bool is_block_mem_reuse) {
+                                     uint16_t* sel_rowid_idx, uint16_t 
select_size) {
         for (auto cid : column_ids) {
             int block_cid = _schema_block_id_map[cid];
-            RETURN_IF_ERROR(block->copy_column_data_to_block(
-                    is_block_mem_reuse, _current_return_columns[cid].get(), 
sel_rowid_idx,
-                    select_size, block_cid, _opts.block_row_max));
+            
RETURN_IF_ERROR(block->copy_column_data_to_block(_current_return_columns[cid].get(),
+                                                             sel_rowid_idx, 
select_size, block_cid,
+                                                             
_opts.block_row_max));
         }
         return Status::OK();
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 33c73205a5..0d275f6604 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -137,6 +137,15 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data 
{data_} {
     initialize_index_by_name();
 }
 
+Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) {
+    for (const auto slot_desc : slots) {
+        auto column_ptr = slot_desc->get_empty_mutable_column();
+        column_ptr->reserve(block_size);
+        insert(ColumnWithTypeAndName(std::move(column_ptr), 
slot_desc->get_data_type_ptr(),
+                                     slot_desc->col_name()));
+    }
+}
+
 Block::Block(const PBlock& pblock) {
     const char* buf = nullptr;
     std::string compression_scratch;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 6ef105cf3b..5b543dd8ff 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -28,6 +28,8 @@
 #include <parallel_hashmap/phmap.h>
 
 #include "gen_cpp/data.pb.h"
+#include "runtime/descriptors.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block_info.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -67,6 +69,7 @@ public:
     Block(std::initializer_list<ColumnWithTypeAndName> il);
     Block(const ColumnsWithTypeAndName& data_);
     Block(const PBlock& pblock);
+    Block(const std::vector<SlotDescriptor*>& slots, size_t block_size);
 
     /// insert the column at the specified position
     void insert(size_t position, const ColumnWithTypeAndName& elem);
@@ -97,8 +100,7 @@ public:
     ColumnWithTypeAndName& get_by_position(size_t position) { return 
data[position]; }
     const ColumnWithTypeAndName& get_by_position(size_t position) const { 
return data[position]; }
 
-    Status copy_column_data_to_block(bool is_block_mem_reuse,
-                                     doris::vectorized::IColumn* input_col_ptr,
+    Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr,
                                      uint16_t* sel_rowid_idx, uint16_t 
select_size, int block_cid,
                                      size_t batch_size) {
         // Only the additional deleted filter condition need to materialize 
column be at the end of the block
@@ -108,21 +110,22 @@ public:
         //      `select b from table;`
         // a column only effective in segment iterator, the block from query 
engine only contain the b column.
         // so the `block_cid >= data.size()` is true
-        if (block_cid >= data.size())
+        if (block_cid >= data.size()) {
             return Status::OK();
+        }
 
-        if (is_block_mem_reuse) {
-            auto* raw_res_ptr = this->get_by_position(block_cid).column.get();
-            
const_cast<doris::vectorized::IColumn*>(raw_res_ptr)->reserve(batch_size);
-            return input_col_ptr->filter_by_selector(sel_rowid_idx, 
select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr));
-        } else {
-            MutableColumnPtr res_col_ptr = 
data[block_cid].type->create_column();
-            res_col_ptr->reserve(batch_size);
-            auto* raw_res_ptr = res_col_ptr.get();
-            RETURN_IF_ERROR(input_col_ptr->filter_by_selector(sel_rowid_idx, 
select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr)));
-            this->replace_by_position(block_cid, std::move(res_col_ptr));
-            return Status::OK();
+        MutableColumnPtr raw_res_ptr = 
this->get_by_position(block_cid).column->assume_mutable();
+        raw_res_ptr->reserve(batch_size);
+
+        // adapt for outer join change column to nullable
+        if (raw_res_ptr->is_nullable()) {
+            auto col_ptr_nullable =
+                    
reinterpret_cast<vectorized::ColumnNullable*>(raw_res_ptr.get());
+            
col_ptr_nullable->get_null_map_column().insert_many_defaults(select_size);
+            raw_res_ptr = col_ptr_nullable->get_nested_column_ptr();
         }
+
+        return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, 
raw_res_ptr);
     }
 
     void replace_by_position(size_t position, ColumnPtr&& res) {
@@ -335,7 +338,7 @@ public:
     size_t rows() const;
     size_t columns() const { return _columns.size(); }
 
-    bool empty() { return rows() == 0; }
+    bool empty() const { return rows() == 0; }
 
     MutableColumns& mutable_columns() { return _columns; }
 
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 59387fbf10..f6101692ce 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -65,22 +65,16 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     auto doris_scanner_row_num =
             _limit == -1 ? config::doris_scanner_row_num
                          : 
std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
-    auto block_size = _limit == -1 ? state->batch_size()
-                                   : 
std::min(static_cast<int64_t>(state->batch_size()), _limit);
-    auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / 
block_size;
+    _block_size = _limit == -1 ? state->batch_size()
+                               : 
std::min(static_cast<int64_t>(state->batch_size()), _limit);
+    auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / 
_block_size;
     auto pre_block_count =
             std::min(_volap_scanners.size(),
                      
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
             block_per_scanner;
 
     for (int i = 0; i < pre_block_count; ++i) {
-        auto block = new Block;
-        for (const auto slot_desc : _tuple_desc->slots()) {
-            auto column_ptr = slot_desc->get_empty_mutable_column();
-            column_ptr->reserve(block_size);
-            block->insert(ColumnWithTypeAndName(
-                    std::move(column_ptr), slot_desc->get_data_type_ptr(), 
slot_desc->col_name()));
-        }
+        auto block = new Block(_tuple_desc->slots(), _block_size);
         _free_blocks.emplace_back(block);
         _buffered_bytes += block->allocated_bytes();
     }
@@ -152,7 +146,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     Status status = Status::OK();
     bool eos = false;
     RuntimeState* state = scanner->runtime_state();
-    DCHECK(NULL != state);
+    DCHECK(nullptr != state);
     if (!scanner->is_open()) {
         status = scanner->open();
         if (!status.ok()) {
@@ -203,8 +197,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     bool get_free_block = true;
 
-    while (!eos && raw_rows_read < raw_rows_threshold &&
-           raw_bytes_read < raw_bytes_threshold && get_free_block) {
+    while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < 
raw_bytes_threshold &&
+           get_free_block) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
@@ -230,7 +224,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             std::lock_guard<std::mutex> l(_free_blocks_lock);
             _free_blocks.emplace_back(block);
         } else {
-            if (!blocks.empty() && blocks.back()->rows() + block->rows() <= 
_runtime_state->batch_size()) {
+            if (!blocks.empty() &&
+                blocks.back()->rows() + block->rows() <= 
_runtime_state->batch_size()) {
                 MutableBlock(blocks.back()).merge(*block);
                 block->clear_column_data();
                 std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -408,7 +403,9 @@ Status VOlapScanNode::close(RuntimeState* state) {
     _scan_block_added_cv.notify_all();
 
     // join transfer thread
-    if (_transfer_thread) _transfer_thread->join();
+    if (_transfer_thread) {
+        _transfer_thread->join();
+    }
 
     // clear some block in queue
     // TODO: The presence of transfer_thread here may cause Block's memory 
alloc and be released not in a thread,
@@ -475,7 +472,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
     }
 
     // wait for block from queue
-    Block* materialized_block = NULL;
+    Block* materialized_block = nullptr;
     {
         std::unique_lock<std::mutex> l(_blocks_lock);
         SCOPED_TIMER(_olap_wait_batch_queue_timer);
@@ -490,14 +487,14 @@ Status VOlapScanNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
 
         if (!_materialized_blocks.empty()) {
             materialized_block = _materialized_blocks.back();
-            DCHECK(materialized_block != NULL);
+            DCHECK(materialized_block != nullptr);
             _materialized_blocks.pop_back();
             _materialized_row_batches_bytes -= 
materialized_block->allocated_bytes();
         }
     }
 
     // return block
-    if (NULL != materialized_block) {
+    if (nullptr != materialized_block) {
         // notify scanner
         _block_consumed_cv.notify_one();
         // get scanner's block memory
@@ -533,8 +530,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
     return _status;
 }
 
-// TODO: we should register the mem cost of new Block in
-// alloc block
 Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
     {
         std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -544,15 +539,19 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
             return block;
         }
     }
+
     get_free_block = false;
-    return new Block();
+
+    auto block = new Block(_tuple_desc->slots(), _block_size);
+    _buffered_bytes += block->allocated_bytes();
+    return block;
 }
 
 int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int 
block_per_scanner) {
     std::list<VOlapScanner*> olap_scanners;
     int assigned_thread_num = _running_thread;
     size_t max_thread = std::min(_volap_scanners.size(),
-                     
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
+                                 
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
     // copy to local
     {
         // How many thread can apply to this query
@@ -563,7 +562,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* 
state, int block_per
                 thread_slot_num = _free_blocks.size() / block_per_scanner;
                 thread_slot_num += (_free_blocks.size() % block_per_scanner != 
0);
                 thread_slot_num = std::min(thread_slot_num, max_thread - 
assigned_thread_num);
-                if (thread_slot_num <= 0) thread_slot_num = 1;
+                if (thread_slot_num <= 0) {
+                    thread_slot_num = 1;
+                }
             } else {
                 std::lock_guard<std::mutex> l(_scan_blocks_lock);
                 if (_scan_blocks.empty()) {
@@ -583,9 +584,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* 
state, int block_per
                 auto scanner = _volap_scanners.front();
                 _volap_scanners.pop_front();
 
-                if (scanner->need_to_close())
+                if (scanner->need_to_close()) {
                     scanner->close(state);
-                else {
+                } else {
                     olap_scanners.push_back(scanner);
                     _running_thread++;
                     assigned_thread_num++;
diff --git a/be/src/vec/exec/volap_scan_node.h 
b/be/src/vec/exec/volap_scan_node.h
index 921399ee6b..831b0963b6 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -39,6 +39,7 @@ public:
     }
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
     Status close(RuntimeState* state) override;
+
 private:
     void transfer_thread(RuntimeState* state);
     void scanner_thread(VOlapScanner* scanner);
@@ -64,6 +65,8 @@ private:
     std::mutex _volap_scanners_lock;
 
     int _max_materialized_blocks;
+
+    size_t _block_size = 0;
 };
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to