This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 80c11da  [refactor] modify the implements of Tuple & RowBatch (#7319)
80c11da is described below

commit 80c11da3df0d4b599256c23c1965f94ee5747ca6
Author: thinker <zchw...@qq.com>
AuthorDate: Thu Dec 9 22:36:37 2021 +0800

    [refactor] modify the implements of Tuple & RowBatch (#7319)

    Code refactor: improve the code's readability and avoid const_cast.

    1. Make loops simpler and clearer by using range-based for loops, which are safer than the old loop style.
    2. Iterate over _row_desc.tuple_descriptors() by index instead of mixing index and iterator.
    3. Add a new function convert_to<To>(From from); this union-based cast between two types replaces reinterpret_cast and is more readable.
    4. Avoid reusing the same variable name in nested loops; it is dangerous.
    5. Add the const keyword to member functions, following the CppCoreGuidelines.
---
 be/src/common/utils.h        |  10 ++
 be/src/runtime/row_batch.cpp | 235 ++++++++++++++++++++-----------------------
 be/src/runtime/row_batch.h   |  11 +-
 be/src/runtime/tuple.cpp     |  64 ++++++------
 4 files changed, 153 insertions(+), 167 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index cb6647a..61fed1d 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -61,4 +61,14 @@ static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 7;
 static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0,
               "should be power of 2");
 
+template <typename To, typename From>
+static inline To convert_to(From from) {
+    union {
+        From _from;
+        To _to;
+    };
+    _from = from;
+    return _to;
+}
+
 } // namespace doris
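The convert_to helper just added above is the core of this refactor. For readers who want to try the idiom in isolation, here is a minimal, self-contained sketch; the template body mirrors the one added to be/src/common/utils.h, while main() and the sample values are purely illustrative:

    #include <cassert>
    #include <cstdint>

    // Union-based cast, same shape as the convert_to added to utils.h:
    // write the source value into one union member, read the other back.
    template <typename To, typename From>
    static inline To convert_to(From from) {
        union {
            From _from;
            To _to;
        };
        _from = from;
        return _to;
    }

    int main() {
        char buffer[16];
        // Round-trip a pointer through a same-sized integer, the way
        // RowBatch stores buffer offsets inside pointer-typed fields.
        std::intptr_t as_int = convert_to<std::intptr_t>(&buffer[0]);
        char* back = convert_to<char*>(as_int);
        assert(back == &buffer[0]);
        return 0;
    }

Note that reading the inactive member of a union is formally undefined behavior in ISO C++, although in practice GCC and Clang support this common type-punning idiom; that caveat is why the old code reached for reinterpret_cast. The idiom is also cleanest when From and To have the same size, since with mismatched sizes the extra bytes are unspecified.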
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 4652df4..19c91f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -20,6 +20,7 @@
 #include <snappy/snappy.h>
 #include <stdint.h> // for intptr_t
 
+#include "common/utils.h"
 #include "gen_cpp/Data_types.h"
 #include "gen_cpp/data.pb.h"
 #include "runtime/buffered_tuple_stream2.inline.h"
@@ -58,10 +59,10 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 }
 
@@ -89,13 +90,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
     }
 
-    uint8_t* tuple_data = nullptr;
+    char* tuple_data = nullptr;
     if (input_batch.is_compressed()) {
         // Decompress tuple data into data pool
         const char* compressed_data = input_batch.tuple_data().c_str();
@@ -104,13 +105,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
                                                      &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
-        success = snappy::RawUncompress(compressed_data, compressed_size,
-                                        reinterpret_cast<char*>(tuple_data));
+        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data().size());
+        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
         memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
     }
 
@@ -120,7 +120,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         if (offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + offset);
+            _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
         }
     }
 
@@ -128,7 +128,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     if (!_row_desc.has_varlen_slots()) {
         return;
     }
-    const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     // For every unique tuple, convert string offsets contained in tuple data into
     // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -136,21 +137,22 @@
     // we already converted.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
-            if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
+            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
                 continue;
             }
+
             Tuple* tuple = row->get_tuple(j);
             if (tuple == nullptr) {
                 continue;
             }
 
-            for (auto slot : (*desc)->string_slots()) {
+            for (auto slot : desc->string_slots()) {
                 DCHECK(slot->type().is_string_type());
                 StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
-                int offset = reinterpret_cast<intptr_t>(string_val->ptr);
-                string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                int offset = convert_to<int>(string_val->ptr);
+                string_val->ptr = tuple_data + offset;
 
                 // Why we do this mask? Field len of StringValue is changed from int to size_t in
                 // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -160,37 +162,35 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
             }
 
             // copy collection slots
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
 
                 // assgin data and null_sign pointer position in tuple_data
-                int data_offset = reinterpret_cast<intptr_t>(array_val->data());
-                array_val->set_data(reinterpret_cast<char*>(tuple_data + data_offset));
-                int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
-                array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+                int data_offset = convert_to<int>(array_val->data());
+                array_val->set_data(tuple_data + data_offset);
+                int null_offset = convert_to<int>(array_val->null_signs());
+                array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 if (!item_type.is_string_type()) {
                     continue;
                 }
 
                 // copy every string item
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (size_t k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
                     if (dst_item_v->len != 0) {
-                        int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
-                        dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                        int offset = convert_to<int>(dst_item_v->ptr);
+                        dst_item_v->ptr = tuple_data + offset;
                     }
                 }
             }
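The constructor above deserializes by rebasing: on the wire, every variable-length field carries an integer offset in its pointer slot, and once the batch's tuple_data buffer is materialized, each offset is turned back into a real pointer by adding the buffer base. A compact sketch of that round trip, using invented names (Blob, wire) rather than Doris types:

    #include <cassert>
    #include <cstdint>
    #include <cstring>

    // Stand-in for a StringValue-like slot: ptr doubles as an offset
    // while the data is in serialized form.
    struct Blob {
        char* ptr;
        std::size_t len;
    };

    int main() {
        char wire[64] = "hello, doris";  // pretend this is tuple_data

        Blob b;
        b.len = 5;
        b.ptr = reinterpret_cast<char*>(std::intptr_t{7});  // serialized: offset 7

        // Deserialize: recover the offset, then rebase it onto the buffer,
        // the tuple_data + offset step from the constructor above.
        std::intptr_t offset = reinterpret_cast<std::intptr_t>(b.ptr);
        b.ptr = wire + offset;

        assert(std::memcmp(b.ptr, "doris", b.len) == 0);
        return 0;
    }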
@@ -222,13 +222,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
     }
 
-    uint8_t* tuple_data = nullptr;
+    char* tuple_data = nullptr;
     if (input_batch.is_compressed) {
         // Decompress tuple data into data pool
         const char* compressed_data = input_batch.tuple_data.c_str();
@@ -237,24 +237,22 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
                                                      &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
-        success = snappy::RawUncompress(compressed_data, compressed_size,
-                                        reinterpret_cast<char*>(tuple_data));
+        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data.size());
+        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
         memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
     }
 
     // convert input_batch.tuple_offsets into pointers
     int tuple_idx = 0;
-    for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
-         offset != input_batch.tuple_offsets.end(); ++offset) {
-        if (*offset == -1) {
+    for (auto offset : input_batch.tuple_offsets) {
+        if (offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
+            _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
         }
     }
 
@@ -262,7 +260,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     if (!_row_desc.has_varlen_slots()) {
         return;
     }
-    const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     // For every unique tuple, convert string offsets contained in tuple data into
     // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -270,9 +269,9 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     // we already converted.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
-            if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
+            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
                 continue;
             }
 
@@ -281,13 +280,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
                 continue;
             }
 
-            std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
-            for (; slot != (*desc)->string_slots().end(); ++slot) {
-                DCHECK((*slot)->type().is_string_type());
-                StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+            for (auto slot : desc->string_slots()) {
+                DCHECK(slot->type().is_string_type());
+                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
 
-                int offset = reinterpret_cast<intptr_t>(string_val->ptr);
-                string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                int offset = convert_to<int>(string_val->ptr);
+                string_val->ptr = tuple_data + offset;
 
                 // Why we do this mask? Field len of StringValue is changed from int to size_t in
                 // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -297,35 +295,33 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
             }
 
             // copy collection slot
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
 
-                int offset = reinterpret_cast<intptr_t>(array_val->data());
-                array_val->set_data(reinterpret_cast<char*>(tuple_data + offset));
-                int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
-                array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+                int offset = convert_to<int>(array_val->data());
+                array_val->set_data(tuple_data + offset);
+                int null_offset = convert_to<int>(array_val->null_signs());
+                array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 if (!item_type.is_string_type()) {
                     continue;
                 }
 
                 // copy string item
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (size_t k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
                     if (dst_item_v->len != 0) {
-                        int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
-                        dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                        int offset = convert_to<int>(dst_item_v->ptr);
+                        dst_item_v->ptr = tuple_data + offset;
                     }
                 }
             }
@@ -381,14 +377,13 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
    int offset = 0; // current offset into output_batch->tuple_data
-    char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
+    char* tuple_data = output_batch->tuple_data.data();
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
                 output_batch->tuple_offsets.push_back(-1);
@@ -397,7 +392,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
 
             // Record offset before creating copy (which increments offset and tuple_data)
             output_batch->tuple_offsets.push_back(offset);
-            row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             DCHECK_LE(offset, size);
         }
     }
@@ -414,7 +409,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
         }
 
         size_t compressed_size = 0;
-        char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+        char* compressed_output = _compression_scratch.data();
         snappy::RawCompress(output_batch->tuple_data.c_str(), size, compressed_output,
                             &compressed_size);
@@ -450,20 +445,22 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
     int offset = 0; // current offset into output_batch->tuple_data
-    char* tuple_data = const_cast<char*>(mutable_tuple_data->data());
+    char* tuple_data = mutable_tuple_data->data();
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
+    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
-                output_batch->mutable_tuple_offsets()->Add(-1);
+                mutable_tuple_offsets->Add(-1);
                 continue;
             }
             // Record offset before creating copy (which increments offset and tuple_data)
-            output_batch->mutable_tuple_offsets()->Add(offset);
-            row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+            mutable_tuple_offsets->Add(offset);
+            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             DCHECK_LE(offset, size);
         }
     }
@@ -480,7 +477,7 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
     }
 
     size_t compressed_size = 0;
-    char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+    char* compressed_output = _compression_scratch.data();
     snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
 
     if (LIKELY(compressed_size < size)) {
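Both serialize paths above drop const_cast in favor of std::string's data(). This relies on a known language change: since C++17, std::string has a non-const data() overload returning char*, so writing into the string's buffer through it is well-defined, whereas writing through const_cast<char*>(s.c_str()) was never guaranteed. A small sketch of the compress step under that assumption (the toy payload is invented, and the snappy header path may differ from the <snappy/snappy.h> used in this tree):

    #include <snappy.h>
    #include <string>

    int main() {
        std::string input(1024, 'x');  // toy payload to compress
        std::string scratch;
        scratch.resize(snappy::MaxCompressedLength(input.size()));

        size_t compressed_size = 0;
        // data() is writable since C++17, so no const_cast is needed.
        snappy::RawCompress(input.data(), input.size(), scratch.data(),
                            &compressed_size);
        scratch.resize(compressed_size);  // shrink to the real size
        return 0;
    }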
@@ -506,12 +503,12 @@ void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
 
 Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
                                                   uint8_t** buffer) {
-    const int row_size = _row_desc.get_row_size();
+    int64_t row_size = _row_desc.get_row_size();
     // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
     if (row_size != 0) {
-        _capacity = std::max(1, std::min(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
+        _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
     }
-    *tuple_buffer_size = static_cast<int64_t>(row_size) * _capacity;
+    *tuple_buffer_size = row_size * _capacity;
     // TODO(dhc): change allocate to try_allocate?
     *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
     if (*buffer == nullptr) {
@@ -560,7 +557,7 @@ void RowBatch::reset() {
     _blocks.clear();
     _auxiliary_mem_usage = 0;
     if (!config::enable_partitioned_aggregation) {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
@@ -677,52 +674,48 @@ void RowBatch::deep_copy_to(RowBatch* dst) {
     dst->add_rows(_num_rows);
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* src_row = get_row(i);
-        TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
-        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool,
-                           false);
+        TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
+        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
     }
     dst->commit_rows(_num_rows);
 }
 
 // TODO: consider computing size of batches as they are built up
-size_t RowBatch::total_byte_size() {
+size_t RowBatch::total_byte_size() const {
     size_t result = 0;
 
     // Sum total variable length byte sizes.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        const auto& tuple_descs = _row_desc.tuple_descriptors();
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             Tuple* tuple = row->get_tuple(j);
             if (tuple == nullptr) {
                 continue;
             }
-            result += (*desc)->byte_size();
-            std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
-            for (; slot != (*desc)->string_slots().end(); ++slot) {
-                DCHECK((*slot)->type().is_string_type());
-                if (tuple->is_null((*slot)->null_indicator_offset())) {
+            result += desc->byte_size();
+
+            for (auto slot : desc->string_slots()) {
+                DCHECK(slot->type().is_string_type());
+                if (tuple->is_null(slot->null_indicator_offset())) {
                     continue;
                 }
-                StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
                 result += string_val->len;
             }
 
             // compute slot collection size
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
-                if (tuple->is_null((*slot_collection)->null_indicator_offset())) {
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
+                if (tuple->is_null(slot_collection->null_indicator_offset())) {
                    continue;
                 }
                 // compute data null_signs size
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
                 result += array_val->length() * sizeof(bool);
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 result += array_val->length() * item_type.get_slot_size();
 
                 if (!item_type.is_string_type()) {
@@ -730,12 +723,12 @@ size_t RowBatch::total_byte_size() {
                 }
 
                 // compute string type item size
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (int k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
                     result += dst_item_v->len;
                 }
             }
@@ -745,20 +738,6 @@ size_t RowBatch::total_byte_size() {
     return result;
 }
 
-int RowBatch::max_tuple_buffer_size() const {
-    int row_size = _row_desc.get_row_size();
-    if (row_size > AT_CAPACITY_MEM_USAGE) {
-        return row_size;
-    }
-    int num_rows = 0;
-    if (row_size != 0) {
-        num_rows = std::min(_capacity, AT_CAPACITY_MEM_USAGE / row_size);
-    }
-    int tuple_buffer_size = num_rows * row_size;
-    DCHECK_LE(tuple_buffer_size, AT_CAPACITY_MEM_USAGE);
-    return tuple_buffer_size;
-}
-
 void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
                           FlushMode flush) {
     _auxiliary_mem_usage += buffer.len();
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 637c4b9..5fcae81 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -169,7 +169,7 @@ public:
 
     // The total size of all data represented in this row batch (tuples and referenced
     // string data).
-    size_t total_byte_size();
+    size_t total_byte_size() const;
 
     TupleRow* get_row(int row_idx) const {
         DCHECK(_tuple_ptrs != nullptr);
@@ -214,10 +214,10 @@ public:
        /// Returns true if the iterator is beyond the last row for read iterators.
        /// Useful for read iterators to determine the limit. Write iterators should use
        /// RowBatch::AtCapacity() instead.
-       bool IR_ALWAYS_INLINE at_end() { return _row >= _row_batch_end; }
+       bool IR_ALWAYS_INLINE at_end() const { return _row >= _row_batch_end; }
 
        /// Returns the row batch which this iterator is iterating through.
-       RowBatch* parent() { return _parent; }
+       RowBatch* parent() const { return _parent; }
 
    private:
        /// Number of tuples per row.
@@ -309,7 +309,7 @@ public:
     // we firstly update dest resource, and then reset current resource
     void transfer_resource_ownership(RowBatch* dest);
 
-    void copy_row(TupleRow* src, TupleRow* dest) {
+    void copy_row(const TupleRow* src, TupleRow* dest) const {
         memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*));
     }
 
@@ -385,9 +385,6 @@ public:
     void set_scanner_id(int id) { _scanner_id = id; }
     int scanner_id() const { return _scanner_id; }
 
-    // Computes the maximum size needed to store tuple data for this row batch.
-    int max_tuple_buffer_size() const;
-
     static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
 
     std::string to_string();
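The header changes above implement point 5 of the commit message: accessors that do not modify the batch are marked const, which lets them be called through const references and pointers and documents that intent in the signature. A minimal illustration with an invented type (Counter is not Doris code):

    struct Counter {
        int n = 0;
        int value() const { return n; }  // callable on const objects
        void bump() { ++n; }             // non-const: mutates state
    };

    void report(const Counter& c) {
        int v = c.value();  // fine: value() is const-qualified
        // c.bump();        // would not compile through a const reference
        (void)v;
    }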
diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp
index 398a26c..9816f79 100644
--- a/be/src/runtime/tuple.cpp
+++ b/be/src/runtime/tuple.cpp
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 
+#include "common/utils.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "runtime/collection_value.h"
@@ -60,7 +61,7 @@ int64_t Tuple::varlen_byte_size(const TupleDescriptor& desc) const {
 }
 
 Tuple* Tuple::deep_copy(const TupleDescriptor& desc, MemPool* pool, bool convert_ptrs) {
-    Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+    Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
     deep_copy(result, desc, pool, convert_ptrs);
     return result;
 }
@@ -69,17 +70,15 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
     memory_copy(dst, this, desc.byte_size());
 
     // allocate in the same pool and then copy all non-null string slots
-    for (std::vector<SlotDescriptor*>::const_iterator i = desc.string_slots().begin();
-         i != desc.string_slots().end(); ++i) {
-        DCHECK((*i)->type().is_string_type());
-
-        StringValue* string_v = dst->get_string_slot((*i)->tuple_offset());
-        if (!dst->is_null((*i)->null_indicator_offset())) {
+    for (auto string_slot : desc.string_slots()) {
+        DCHECK(string_slot->type().is_string_type());
+        StringValue* string_v = dst->get_string_slot(string_slot->tuple_offset());
+        if (!dst->is_null(string_slot->null_indicator_offset())) {
             if (string_v->len != 0) {
                 int offset = pool->total_allocated_bytes();
-                char* string_copy = reinterpret_cast<char*>(pool->allocate(string_v->len));
+                char* string_copy = (char*)(pool->allocate(string_v->len));
                 memory_copy(string_copy, string_v->ptr, string_v->len);
-                string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+                string_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
             }
         } else {
             string_v->ptr = nullptr;
@@ -103,12 +102,12 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
 
         int nulls_size = cv->length() * sizeof(bool);
         int offset = pool->total_allocated_bytes();
-        char* coll_data = reinterpret_cast<char*>(pool->allocate(coll_byte_size + nulls_size));
+        char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
 
         // copy data and null_signs
         if (nulls_size > 0) {
             cv->set_has_null(true);
-            cv->set_null_signs(reinterpret_cast<bool*>(coll_data) + coll_byte_size);
+            cv->set_null_signs(convert_to<bool*>(coll_data) + coll_byte_size);
             memory_copy(coll_data, cv->null_signs(), nulls_size);
         } else {
             cv->set_has_null(false);
@@ -116,9 +115,8 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
         memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
 
         // assgin new null_sign and data location
-        cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(offset)
-                                        : reinterpret_cast<bool*>(coll_data));
-        cv->set_data(convert_ptrs ? reinterpret_cast<char*>(offset + nulls_size)
+        cv->set_null_signs(convert_ptrs ? convert_to<bool*>(offset) : convert_to<bool*>(coll_data));
+        cv->set_data(convert_ptrs ? convert_to<char*>(offset + nulls_size)
                                   : coll_data + nulls_size);
 
         if (!item_type.is_string_type()) {
@@ -130,19 +128,19 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
             if (cv->is_null_at(i)) {
                 continue;
             }
-            StringValue* dst_item_v = reinterpret_cast<StringValue*>(coll_data + item_offset);
+            StringValue* dst_item_v = convert_to<StringValue*>(coll_data + item_offset);
             if (dst_item_v->len != 0) {
                 int offset = pool->total_allocated_bytes();
-                char* string_copy = reinterpret_cast<char*>(pool->allocate(dst_item_v->len));
+                char* string_copy = (char*)(pool->allocate(dst_item_v->len));
                 memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
-                dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+                dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
             }
         }
     }
 }
 
 Tuple* Tuple::dcopy_with_new(const TupleDescriptor& desc, MemPool* pool, int64_t* bytes) {
-    Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+    Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
     *bytes = dcopy_with_new(result, desc);
     return result;
 }
@@ -176,6 +174,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
         if (!is_null(slot->null_indicator_offset())) {
             StringValue* string_v = get_string_slot(slot->tuple_offset());
             delete[] string_v->ptr;
+            string_v->ptr = nullptr;
             bytes += string_v->len;
         }
     }
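One behavioral fix rides along with the refactor: release_string now resets ptr to nullptr after delete[]. That is the usual guard against dangling pointers; a later release (or accidental reuse) then sees nullptr, and delete[] on a null pointer is a no-op. The pattern in isolation, with an invented Holder type:

    struct Holder {
        char* ptr = nullptr;

        void release() {
            delete[] ptr;   // deleting a null pointer is harmless
            ptr = nullptr;  // leave no dangling pointer behind
        }
    };

    int main() {
        Holder h;
        h.ptr = new char[16];
        h.release();
        h.release();  // safe to call twice thanks to the reset
        return 0;
    }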
@@ -183,7 +182,7 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
-    Tuple* dst = reinterpret_cast<Tuple*>(*data);
+    Tuple* dst = (Tuple*)(*data);
     memory_copy(dst, this, desc.byte_size());
     *data += desc.byte_size();
     *offset += desc.byte_size();
@@ -193,11 +192,11 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
         StringValue* string_v = dst->get_string_slot(slot_desc->tuple_offset());
         if (!dst->is_null(slot_desc->null_indicator_offset())) {
             memory_copy(*data, string_v->ptr, string_v->len);
-            string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+            string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
             *data += string_v->len;
             *offset += string_v->len;
         } else {
-            string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+            string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
             string_v->len = 0;
         }
     }
@@ -221,9 +220,9 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
         memory_copy(*data + nulls_size, cv->data(), coll_byte_size);
 
         if (!item_type.is_string_type()) {
-            cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(*offset)
-                                            : reinterpret_cast<bool*>(*data));
-            cv->set_data(convert_ptrs ? reinterpret_cast<char*>(*offset + nulls_size)
+            cv->set_null_signs(convert_ptrs ? convert_to<bool*>(*offset)
+                                            : convert_to<bool*>(*data));
+            cv->set_data(convert_ptrs ? convert_to<char*>(*offset + nulls_size)
                                       : *data + nulls_size);
             *data += coll_byte_size + nulls_size;
             *offset += coll_byte_size + nulls_size;
@@ -242,18 +241,18 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
             if (cv->is_null_at(i)) {
                 continue;
             }
-            StringValue* dst_item_v = reinterpret_cast<StringValue*>(base_data + item_offset);
+            StringValue* dst_item_v = convert_to<StringValue*>(base_data + item_offset);
             if (dst_item_v->len != 0) {
                 memory_copy(*data, dst_item_v->ptr, dst_item_v->len);
-                dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+                dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
                 *data += dst_item_v->len;
                 *offset += dst_item_v->len;
             }
         }
 
         // assgin new null_sign and data location
-        cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(base_offset)
-                                        : reinterpret_cast<bool*>(base_data));
-        cv->set_data(convert_ptrs ? reinterpret_cast<char*>(base_offset + nulls_size)
+        cv->set_null_signs(convert_ptrs ? convert_to<bool*>(base_offset)
+                                        : convert_to<bool*>(base_data));
+        cv->set_data(convert_ptrs ? convert_to<char*>(base_offset + nulls_size)
                                   : base_data + nulls_size);
     }
 }
@@ -270,8 +269,9 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
     memset(this, 0, desc.num_null_bytes());
     // Evaluate the output_slot_exprs and place the results in the tuples.
     int mat_expr_index = 0;
-    for (int i = 0; i < desc.slots().size(); ++i) {
-        SlotDescriptor* slot_desc = desc.slots()[i];
+    auto& slots = desc.slots();
+    for (int i = 0; i < slots.size(); ++i) {
+        SlotDescriptor* slot_desc = slots[i];
         if (!slot_desc->is_materialized()) {
             continue;
         }
@@ -297,7 +297,7 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
             RawValue::write(src, dst, slot_desc->type(), pool);
             if (collect_string_vals) {
                 if (slot_desc->type().is_string_type()) {
-                    StringValue* string_val = reinterpret_cast<StringValue*>(dst);
+                    StringValue* string_val = convert_to<StringValue*>(dst);
                     non_null_var_len_values->push_back(string_val);
                     *total_var_len += string_val->len;
                 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org