This is an automated email from the ASF dual-hosted git repository. weixiang pushed a commit to branch memtable_opt_rebase in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6c468193db68e818b5e318658562c6ce43505dfb Author: weixiang <weixian...@meituan.com> AuthorDate: Mon Apr 18 21:37:34 2022 +0800 [feature-wip](stream-load-vec) opt memtable --- be/src/olap/delta_writer.cpp | 25 ++- be/src/olap/memtable.cpp | 154 +++++++++++++++--- be/src/olap/memtable.h | 47 ++++-- be/src/vec/CMakeLists.txt | 6 +- .../vec/aggregate_functions/block_aggregator.cpp | 180 +++++++++++++++++++++ be/src/vec/aggregate_functions/block_aggregator.h | 70 ++++++++ be/src/vec/core/block.cpp | 20 +++ be/src/vec/core/block.h | 13 ++ 8 files changed, 479 insertions(+), 36 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b088ef50cc..9ceb897384 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -198,16 +198,27 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } - _mem_table->insert(block, row_idxs); - - if (_mem_table->need_to_agg()) { - _mem_table->shrink_memtable_by_agg(); - if (_mem_table->is_flush()) { - RETURN_NOT_OK(_flush_memtable_async()); - _reset_mem_table(); + int start = 0, end = 0; + bool flush = false; + const size_t num_rows = row_idxs.size(); + for (; start < num_rows;) { + auto count = end + 1 - start; + if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) { + if (_mem_table->insert(block, row_idxs[start], count)) { + flush = true; + } + start += count; + end = start; + } else { + end++; } } + if (flush || _mem_table->is_full()) { + RETURN_NOT_OK(_flush_memtable_async()); + _reset_mem_table(); + } + return Status::OK(); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 1b44d0ee25..f3f4271fea 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -54,6 +54,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _vec_skip_list = std::make_unique<VecTable>( _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); + _block_aggregator = + std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true); } else { _vec_skip_list = nullptr; if (_keys_type == KeysType::DUP_KEYS) { @@ -122,28 +124,135 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, *_pblock, -1); } -void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { - auto target_block = input_block->copy_block(_column_offset); +bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { if (_is_first_insertion) { _is_first_insertion = false; - auto cloneBlock = target_block.clone_without_columns(); - _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - _vec_row_comparator->set_block(&_input_mutable_block); - _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + auto cloneBlock = block->clone_without_columns(); + _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock); if (_keys_type != KeysType::DUP_KEYS) { _init_agg_functions(&target_block); } } - auto num_rows = row_idxs.size(); - size_t cursor_in_mutableblock = _input_mutable_block.rows(); - _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); - size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows(); - _mem_usage += input_size; - _mem_tracker->consume(input_size); - - for (int i = 0; i < num_rows; i++) { - _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); - _insert_one_row_from_block(_row_in_blocks.back()); + _block->add_rows(block, row_pos, num_rows); + _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows(); + // Memtalbe is full, do not flush immediately + // First try to merge these blocks + // If the merged memtable is still full or we can not benefit a lot from merge at first + // Then flush the memtable into disk. + bool is_flush = false; + if (is_full()) { + size_t before_merge_bytes = bytes_allocated(); + _merge(); + size_t after_merged_bytes = bytes_allocated(); + // TODO(weixiang): magic number here, make it configurable later. + if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) { + is_flush = true; + } + } + return is_flush; +} + +size_t MemTable::bytes_allocated() const { + return _block_bytes_usage + _block_aggregator->get_bytes_usage(); +} + +bool MemTable::is_full() const { + return bytes_allocated() > config::write_buffer_size; +} + +void MemTable::_merge() { + if (_block == nullptr || _keys_type == KeysType::DUP_KEYS) { + return; + } + _sort(false); + _agg(false); + _merge_count++; +} + +void MemTable::_agg(const bool finalize) { + // note that the _block had been sorted before. + if (_sorted_block == nullptr || _sorted_block->rows() <= 0) { + return; + } + vectorized::Block sorted_block = _sorted_block->to_block(); + _block_aggregator->append_block(&sorted_block); + _block_aggregator->partial_sort_merged_aggregate(); + if (finalize) { + _sorted_block.reset(); + } else { + _sorted_block->clear_column_data(); + } +} + +void MemTable::_sort(const bool finalize) { + _index_for_sort.resize(_block->rows()); + for (uint32_t i = 0; i < _block->rows(); i++) { + _index_for_sort[i] = {i, i}; + } + + _sort_block_by_rows(); + _sorted_block = _block->create_same_struct_block(0); + _append_sorted_block(_block.get(), _sorted_block.get()); + if (finalize) { + _block.reset(); + } else { + _block->clear_column_data(); + } + _block_bytes_usage = 0; +} + +void MemTable::_sort_block_by_rows() { + std::sort(_index_for_sort.begin(), _index_for_sort.end(), + [this](const MemTable::OrderedIndexItem& left, + const MemTable::OrderedIndexItem& right) { + int res = _block->compare_at(left.index_in_block, right.index_in_block, + _schema->num_key_columns(), *_block.get(), -1); + if (res != 0) { + return res < 0; + } + return left.incoming_index < right.incoming_index; + }); +} + +void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) { + size_t row_num = src->rows(); + _sorted_index_in_block.clear(); + _sorted_index_in_block.reserve(row_num); + for (size_t i = 0; i < row_num; i++) { + _sorted_index_in_block.push_back(_index_for_sort[i].index_in_block); + } + vectorized::Block src_block = src->to_block(); + dst->add_rows(&src_block, _sorted_index_in_block.data(), + _sorted_index_in_block.data() + row_num); +} + +void MemTable::finalize() { + //TODO(weixiang): check here + if (_block == nullptr) { + return; + } + + if (_keys_type != KeysType::DUP_KEYS) { + // agg mode + if (_block->rows() > 0) { + _merge(); + } + if (_merge_count > 1) { + _block = _block_aggregator->get_partial_agged_block(); + _block_aggregator->reset_aggregator(); + _sort(true); + _agg(true); + } else { + _block.reset(); + _sorted_block.reset(); + } + + _block_bytes_usage = 0; + _sorted_block = _block_aggregator->get_partial_agged_block(); + + } else { + // dup mode + _sort(true); } } @@ -321,6 +430,13 @@ bool MemTable::need_to_agg() { } Status MemTable::flush() { + if (!_skip_list) { + finalize(); + if (_sorted_block == nullptr) { + return Status::OK(); + } + } + VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; @@ -351,8 +467,10 @@ Status MemTable::_do_flush(int64_t& duration_ns) { RETURN_NOT_OK(st); } } else { - _collect_vskiplist_results<true>(); - vectorized::Block block = _output_mutable_block.to_block(); + vectorized::Block block = _sorted_block->to_block(); + // beta rowset flush parallel, segment write add block is not + // thread safe, so use tmp variable segment_write instead of + // member variable RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block)); _flush_size = block.allocated_bytes(); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index c0b4c92839..a12b67f918 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -25,6 +25,7 @@ #include "runtime/mem_tracker.h" #include "util/tuple_row_zorder_compare.h" #include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/block_aggregator.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -48,17 +49,17 @@ public: int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } - std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; } - inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); } - // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, const std::vector<int>& row_idxs); + std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; } - void shrink_memtable_by_agg(); + inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }; + //insert tuple from (row_pos) to (row_pos+num_rows) + bool insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); - bool is_flush() const; + bool is_full() const; + size_t bytes_allocated() const; - bool need_to_agg(); + void finalize(); /// Flush Status flush(); @@ -152,6 +153,14 @@ private: // for vectorized void _insert_one_row_from_block(RowInBlock* row_in_block); void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist); + void _sort(const bool finalize); + void _sort_block_by_rows(); + + void _merge(); + + void _agg(const bool finalize); + + void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst); int64_t _tablet_id; Schema* _schema; @@ -205,14 +214,34 @@ private: vectorized::MutableBlock _input_mutable_block; vectorized::MutableBlock _output_mutable_block; - template <bool is_final> - void _collect_vskiplist_results(); + struct OrderedIndexItem { + uint32_t index_in_block; + uint32_t incoming_index; // used for sort by column + }; + + using OrderedIndex = std::vector<OrderedIndexItem>; + + OrderedIndex _index_for_sort; + + std::vector<int> _sorted_index_in_block; + + vectorized::MutableBlockPtr _block; + + vectorized::MutableBlockPtr _sorted_block; + + std::unique_ptr<vectorized::BlockAggregator> _block_aggregator; + + vectorized::Block _collect_vskiplist_results(); + bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); std::vector<vectorized::AggregateFunctionPtr> _agg_functions; std::vector<RowInBlock*> _row_in_blocks; size_t _mem_usage; + size_t _block_bytes_usage = 0; + size_t _agg_bytes_usage = 0; + int _merge_count = 0; }; // class MemTable inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 69aaf816ba..6a2f1abf5d 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -6,7 +6,7 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -16,6 +16,7 @@ # under the License. # where to put generated libraries set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/vec") + # where to put generated binaries set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/vec") @@ -41,6 +42,7 @@ set(VEC_FILES aggregate_functions/aggregate_function_simple_factory.cpp aggregate_functions/aggregate_function_java_udaf.h aggregate_functions/aggregate_function_orthogonal_bitmap.cpp + aggregate_functions/block_aggregator.cpp columns/collator.cpp columns/column.cpp columns/column_array.cpp @@ -206,5 +208,5 @@ set(VEC_FILES runtime/vsorted_run_merger.cpp) add_library(Vec STATIC - ${VEC_FILES} + ${VEC_FILES} ) diff --git a/be/src/vec/aggregate_functions/block_aggregator.cpp b/be/src/vec/aggregate_functions/block_aggregator.cpp new file mode 100644 index 0000000000..201cca75e0 --- /dev/null +++ b/be/src/vec/aggregate_functions/block_aggregator.cpp @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "block_aggregator.h" + +namespace doris::vectorized { + +BlockAggregator::BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted) + : _schema(schema), _tablet_schema(tablet_schema), _src_sorted(src_sorted) { + _init_agg_functions(); +} + +BlockAggregator::~BlockAggregator() { +} + +void BlockAggregator::_init_agg_functions() { + _cols_num = _schema->num_columns(); + _key_cols_num = _schema->num_key_columns(); + _value_cols_num = _cols_num - _key_cols_num; + //TODO(weixiang): save memory just use value length. + _agg_functions.resize(_schema->num_columns()); + _agg_places.resize(_value_cols_num); + for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) { + FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation(); + std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_LOAD_SUFFIX; + + std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), + [](unsigned char c) { return std::tolower(c); }); + + // create aggregate function + DataTypes argument_types; + // TODO(weixiang): 检查这块这么写是否有隐患 + DataTypePtr dtptr = Schema::get_data_type_ptr(*_schema->column(cid)); + argument_types.push_back(dtptr); + Array params; + AggregateFunctionPtr function = + AggregateFunctionSimpleFactory::instance().get( + agg_name, argument_types, params, dtptr->is_nullable()); + + DCHECK(function != nullptr); + _agg_functions[cid] = function; + } +} + + +void BlockAggregator::append_block(Block* block) { + if (block == nullptr || block->rows() <= 0){ + return; + } + _agg_data_counters.reserve(_agg_data_counters.size() + block->rows()); + size_t key_num = _schema->num_key_columns(); + + size_t same_rows = 1; + for (size_t i = 0; i < block->rows(); i++) { + if ( i+1 == block->rows() || block->compare_at(i, i+1, key_num, *block, -1) != 0) { + _agg_data_counters.push_back(same_rows); + same_rows = 0; + } + same_rows++; + } + if (_is_first_append) { + // this means it is appending block for the first time + _aggregated_block = std::make_shared<MutableBlock>(block); + _is_first_append = false; + } else { + _aggregated_block->add_rows(block, 0, block->rows()); + } +} + +/** + * @brief aggregate sorted block + * 1. _agg_data_counters save the following N rows to agg in partial sort block + * 2. first_row_idx records the first row num of rows with the same keys. + * + * + * TODO(weixiang): + * 1. refactor function partial_sort_merged_aggregate, 拆成多个函数:init等 + */ + +void BlockAggregator::partial_sort_merged_aggregate() { + DCHECK(!_agg_data_counters.empty()); + std::vector<int> first_row_idx; // TODO(weixiang): add into member variables + std::vector<MutableColumnPtr> aggregated_cols; + first_row_idx.reserve(_agg_data_counters.size()); + int row_pos = _cumulative_agg_num; + for (size_t i = 0; i < _agg_data_counters.size(); i++) { + first_row_idx.push_back(row_pos); + row_pos += _agg_data_counters[i]; + } + auto col_ids = _schema->column_ids(); + size_t agged_row_num = first_row_idx.size(); + // for keys: + for (size_t cid = 0; cid < _key_cols_num; cid++) { + + MutableColumnPtr key_col = + _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column(); + key_col->insert_indices_from(*_aggregated_block->mutable_columns()[cid], + first_row_idx.data(), + first_row_idx.data() + agged_row_num); + aggregated_cols.emplace_back(std::move(key_col)); + } + + // init agged place for values: + for (size_t cid = _key_cols_num; cid < _cols_num; cid++) { + size_t place_size = _agg_functions[cid]->size_of_data(); + _agg_places[cid - _key_cols_num] = new char[place_size * agged_row_num]; + for (auto i = 0; i < agged_row_num; i++) { + AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i; + _agg_functions[cid]->create(place); + } + + } + + // do agg + for (size_t cid = _key_cols_num; cid < _cols_num; cid++) { + size_t place_size = _agg_functions[cid]->size_of_data(); + auto* src_value_col_ptr = _aggregated_block->mutable_columns()[cid].get(); + size_t agg_begin_idx = 0; + + for (size_t i = 0; i < agged_row_num; i++) { + AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i; + _agg_functions[cid]->add_batch_range( + agg_begin_idx, + agg_begin_idx + _agg_data_counters[i] - 1, place, + const_cast<const doris::vectorized::IColumn**>(&src_value_col_ptr), nullptr); + agg_begin_idx += _agg_data_counters[i]; + } + } + + // move to result column + for (size_t value_col_idx = 0; value_col_idx < _value_cols_num; value_col_idx++) { + size_t place_size = _agg_functions[value_col_idx + _key_cols_num]->size_of_data(); + MutableColumnPtr dst_value_col_ptr = + _schema->get_data_type_ptr(*_schema->column(col_ids[value_col_idx + _key_cols_num])) + ->create_column(); + for (size_t i = 0; i < first_row_idx.size(); i++) { + _agg_functions[value_col_idx + _key_cols_num]->insert_result_into( + _agg_places[value_col_idx] + i * place_size, + *reinterpret_cast<doris::vectorized::IColumn*>(dst_value_col_ptr.get())); + } + aggregated_cols.emplace_back(std::move(dst_value_col_ptr)); + } + + _aggregated_block->clear_column_data(); + _aggregated_block->append_from_columns(aggregated_cols, agged_row_num); + _agg_data_counters.clear(); + _cumulative_agg_num += agged_row_num; + + for(auto place : _agg_places) { + // free aggregated memory + delete[] place; + } + + +} + + + +size_t BlockAggregator::get_bytes_usage() const{ + if(UNLIKELY(_aggregated_block == nullptr)) { + return 0; + } + return _aggregated_block->allocated_bytes(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/block_aggregator.h b/be/src/vec/aggregate_functions/block_aggregator.h new file mode 100644 index 0000000000..510e614dd2 --- /dev/null +++ b/be/src/vec/aggregate_functions/block_aggregator.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#pragma once +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/core/block.h" +#include "olap/schema.h" + +namespace doris::vectorized { + +using BlockPtr = std::shared_ptr<Block>; +using MutableBlockPtr = std::shared_ptr<MutableBlock>; + +class BlockAggregator { + + +public: + BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted); + ~BlockAggregator(); + void append_block(Block* block); + void partial_sort_merged_aggregate(); + void _init_agg_functions(); + size_t get_bytes_usage() const; + + MutableBlockPtr get_partial_agged_block() { + return _aggregated_block; + } + + void reset_aggregator() { + _aggregated_block.reset(); + _agg_data_counters.clear(); + _cumulative_agg_num = 0; + _is_first_append = true; + } + +private: + bool _is_first_append = true; + size_t _key_cols_num; + size_t _value_cols_num; + size_t _cumulative_agg_num = 0; + size_t _cols_num; + const Schema* _schema; + const TabletSchema* _tablet_schema; + bool _src_sorted; + MutableBlockPtr _aggregated_block; + std::vector<int> _agg_data_counters; + std::vector<AggregateFunctionPtr> _agg_functions; + + std::vector<AggregateDataPtr> _agg_places; + +}; + +} // namespace diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 3e50c1578b..0b45ab4af1 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -959,6 +959,14 @@ Block MutableBlock::to_block(int start_column, int end_column) { return {columns_with_schema}; } +void MutableBlock::clear_column_data() noexcept { + for (auto& col : _columns) { + if (col) { + col->clear(); + } + } +} + std::string MutableBlock::dump_data(size_t row_limit) const { std::vector<std::string> headers; std::vector<size_t> headers_size; @@ -1018,6 +1026,18 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const { return temp_block; } +//TODO(weixiang): unique_ptr? +std::shared_ptr<MutableBlock> MutableBlock::create_same_struct_block(size_t size) const { + Block temp_block; + for (const auto& d : _data_types) { + auto column = d->create_column(); + column->resize(size); + temp_block.insert({std::move(column), d, ""}); + } + auto result = std::make_shared<MutableBlock>(std::move(temp_block)); + return result; +} + void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { for (auto idx : char_type_idx) { if (idx < data.size()) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index c15a6728c4..5c23a939cf 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -355,12 +355,25 @@ public: size_t rows() const; size_t columns() const { return _columns.size(); } + + std::shared_ptr<MutableBlock> create_same_struct_block(size_t size) const; + + void clear_column_data() noexcept; + bool empty() const { return rows() == 0; } MutableColumns& mutable_columns() { return _columns; } void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); } + void append_from_columns(MutableColumns& columns, size_t length) { + DCHECK(_columns.size() == columns.size()); + for (size_t i = 0; i < _columns.size(); i++) { + DCHECK(columns[i]->size() >= length); + _columns[i]->insert_range_from(*columns[i], 0, length); + } + } + DataTypes& data_types() { return _data_types; } MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org