This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch vectorized
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 20619e795832ca6947a32787f7f8937f5c6d0411 Author: lihaopeng <lihaop...@baidu.com> AuthorDate: Thu Jan 13 17:27:07 2022 +0800 [Vectorized] Rebase code from master --- be/src/exec/olap_scanner.cpp | 2 +- be/src/exec/olap_scanner.h | 4 ++-- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- be/src/vec/exec/volap_scanner.cpp | 7 ++++--- be/src/vec/exec/volap_scanner.h | 6 +++++- be/src/vec/functions/function_binary_arithmetic.h | 2 +- be/src/vec/olap/block_reader.cpp | 2 +- be/src/vec/olap/block_reader.h | 4 ++-- be/src/vec/olap/vcollect_iterator.cpp | 6 +++--- be/src/vec/olap/vcollect_iterator.h | 14 +++++++------- be/src/vec/runtime/vdatetime_value.cpp | 2 +- be/src/vec/runtime/vdatetime_value.h | 2 +- be/test/vec/core/block_test.cpp | 2 +- 13 files changed, 30 insertions(+), 25 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 34336fa..2e05c5d 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -176,7 +176,7 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && !_tablet_reader_params.rs_readers[1]->rowset()->rowset_meta()->is_segments_overlapping()); - _params.origin_return_columns = &_return_columns; + _tablet_reader_params.origin_return_columns = &_return_columns; if (_aggregation || single_version) { _tablet_reader_params.return_columns = _return_columns; _tablet_reader_params.direct_mode = true; diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index f234925..0c684d9 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -58,7 +58,7 @@ public: Status open(); - Status get_batch(RuntimeState* state, RowBatch* batch, bool* eof); + virtual Status get_batch(RuntimeState* state, RowBatch* batch, bool* eof); Status close(RuntimeState* state); @@ -103,7 +103,7 @@ protected: // Update profile that need to be reported in realtime. 
void _update_realtime_counter(); - virtual void set_tablet_reader() { _tablet_reader.reset(new TupleReader); } + virtual void set_tablet_reader() { _tablet_reader = std::make_unique<TupleReader>(); } protected: RuntimeState* _runtime_state; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 4533cae..9563ebf 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -590,7 +590,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { for (const auto& filter_desc : _runtime_filter_descs) { RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::PRODUCER, - filter_desc)); + filter_desc, state->query_options())); } return Status::OK(); diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 64a51a1..1b4bb02 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -17,6 +17,8 @@ #include "vec/exec/volap_scanner.h" +#include <memory> + #include "vec/columns/column_complex.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" @@ -25,14 +27,13 @@ #include "vec/core/block.h" #include "vec/exec/volap_scan_node.h" #include "vec/exprs/vexpr_context.h" -#include "vec/olap/block_reader.h" #include "vec/runtime/vdatetime_value.h" + namespace doris::vectorized { VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, bool aggregation, bool need_agg_finalize, const TPaloScanRange& scan_range) : OlapScanner(runtime_state, parent, aggregation, need_agg_finalize, scan_range) { - _reader.reset(new BlockReader); } Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bool* eof) { @@ -50,7 +51,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo do { // Read one block from block reader - auto res = _reader->next_block_with_aggregation(block, nullptr, nullptr, eof); + auto res = 
_tablet_reader->next_block_with_aggregation(block, nullptr, nullptr, eof); if (res != OLAP_SUCCESS) { std::stringstream ss; ss << "Internal Error: read storage fail. res=" << res diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 3c66f4d..5efaf9d 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -19,9 +19,10 @@ #include "exec/olap_scanner.h" +#include "vec/olap/block_reader.h" + namespace doris { class OlapScanNode; -class OLAPReader; class RuntimeProfile; class Field; class RowBatch; @@ -41,6 +42,9 @@ public: VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; } +protected: + virtual void set_tablet_reader() { _tablet_reader = std::make_unique<BlockReader>(); } + private: // TODO: Remove this function after we finish reader vec void _convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* columns); diff --git a/be/src/vec/functions/function_binary_arithmetic.h b/be/src/vec/functions/function_binary_arithmetic.h index 180d2f5..f987f90 100644 --- a/be/src/vec/functions/function_binary_arithmetic.h +++ b/be/src/vec/functions/function_binary_arithmetic.h @@ -292,7 +292,7 @@ private: DecimalV2Value r(b); auto ans = Op::template apply(l, r, null_map, index); NativeResultType result; - memcpy(&result, &ans, sizeof(NativeResultType)); + memcpy(&result, &ans, std::min(sizeof(result), sizeof(ans))); return result; } diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index ec891f9..8fda9d6 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -119,7 +119,7 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { } OLAPStatus BlockReader::init(const ReaderParams& read_params) { - Reader::init(read_params); + TabletReader::init(read_params); _batch_size = read_params.runtime_state->batch_size(); auto return_column_size = diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 
0fba0be..b1bc7e8 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -32,9 +32,9 @@ namespace doris { namespace vectorized { -class BlockReader final : public Reader { +class BlockReader final : public TabletReader { public: - ~BlockReader(); + ~BlockReader() override; // Initialize BlockReader with tablet, data version and fetch range. OLAPStatus init(const ReaderParams& read_params) override; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 2d132a6..73ef823 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -24,7 +24,7 @@ namespace vectorized { VCollectIterator::~VCollectIterator() {} -void VCollectIterator::init(Reader* reader) { +void VCollectIterator::init(TabletReader* reader) { _reader = reader; // when aggregate is enabled or key_type is DUP_KEYS, we don't merge // multiple data to aggregate for better performance @@ -159,7 +159,7 @@ OLAPStatus VCollectIterator::next(Block* block) { } } -VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader) +VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader) : LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) { DCHECK_EQ(RowsetReader::BETA, rs_reader->type()); _block = _schema.create_block(_reader->_return_columns); @@ -207,7 +207,7 @@ OLAPStatus VCollectIterator::Level0Iterator::next(Block* block) { } VCollectIterator::Level1Iterator::Level1Iterator( - const std::list<VCollectIterator::LevelIterator*>& children, Reader* reader, bool merge, + const std::list<VCollectIterator::LevelIterator*>& children, TabletReader* reader, bool merge, bool skip_same) : LevelIterator(reader), _children(children), diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index c517de2..6cf9d62 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ 
b/be/src/vec/olap/vcollect_iterator.h @@ -41,7 +41,7 @@ public: // Hold reader point to get reader params ~VCollectIterator(); - void init(Reader* reader); + void init(TabletReader* reader); OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); @@ -69,7 +69,7 @@ private: // then merged with other rowset readers. class LevelIterator { public: - LevelIterator(Reader* reader) : _schema(reader->tablet()->tablet_schema()) {}; + LevelIterator(TabletReader* reader) : _schema(reader->tablet()->tablet_schema()) {}; virtual OLAPStatus init() = 0; @@ -112,7 +112,7 @@ private: // Iterate from rowset reader. This Iterator usually like a leaf node class Level0Iterator : public LevelIterator { public: - Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader); + Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader); ~Level0Iterator() {} OLAPStatus init() override; @@ -127,14 +127,14 @@ private: OLAPStatus _refresh_current_row(); RowsetReaderSharedPtr _rs_reader; - Reader* _reader = nullptr; + TabletReader* _reader = nullptr; Block _block; }; // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) class Level1Iterator : public LevelIterator { public: - Level1Iterator(const std::list<LevelIterator*>& children, Reader* reader, bool merge, + Level1Iterator(const std::list<LevelIterator*>& children, TabletReader* reader, bool merge, bool skip_same); OLAPStatus init() override; @@ -160,7 +160,7 @@ private: // point to the Level0Iterator containing the next output row. // null when VCollectIterator hasn't been initialized or reaches EOF. LevelIterator* _cur_child = nullptr; - Reader* _reader = nullptr; + TabletReader* _reader = nullptr; // when `_merge == true`, rowset reader returns ordered rows and VCollectIterator uses a priority queue to merge // sort them. The output of VCollectIterator is also ordered. @@ -184,7 +184,7 @@ private: bool _merge = true; // Hold reader point to access read params, such as fetch conditions. 
- Reader* _reader = nullptr; + TabletReader* _reader = nullptr; bool _skip_same; }; diff --git a/be/src/vec/runtime/vdatetime_value.cpp b/be/src/vec/runtime/vdatetime_value.cpp index cc4cc0e..4c59277 100644 --- a/be/src/vec/runtime/vdatetime_value.cpp +++ b/be/src/vec/runtime/vdatetime_value.cpp @@ -292,7 +292,7 @@ void VecDateTimeValue::set_type(int type) { void VecDateTimeValue::set_max_time(bool neg) { set_zero(TIME_TIME); - _hour = TIME_MAX_HOUR; + _hour = static_cast<uint8_t>(TIME_MAX_HOUR); _minute = TIME_MAX_MINUTE; _second = TIME_MAX_SECOND; _neg = neg; diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h index a008f0c..4ec8d6e 100644 --- a/be/src/vec/runtime/vdatetime_value.h +++ b/be/src/vec/runtime/vdatetime_value.h @@ -127,7 +127,7 @@ const int DATE_MAX_DAYNR = 3652424; const int YY_PART_YEAR = 70; // Limits of time value -const int TIME_MAX_HOUR = 838; +const int TIME_MAX_HOUR = 256; const int TIME_MAX_MINUTE = 59; const int TIME_MAX_SECOND = 59; const int TIME_MAX_VALUE = 10000 * TIME_MAX_HOUR + 100 * TIME_MAX_MINUTE + TIME_MAX_SECOND; diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 711f29f..7264369 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -140,7 +140,7 @@ TEST(BlockTest, RowBatchCovertToBlock) { larget_int = column7->operator[](i).get<vectorized::Int128>(); vectorized::VecDateTimeValue k7; - memcpy(&k7, &larget_int, column_descs[6].size); + memcpy(reinterpret_cast<vectorized::Int128*>(&k7), &larget_int, column_descs[6].size); vectorized::VecDateTimeValue date_time_value; std::string now_time("2020-12-02"); date_time_value.from_date_str(now_time.c_str(), now_time.size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org