This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch array-type in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 3f74fb619c8f7e514919e712be17a17070ad97ff Author: Adonis Ling <adonis0...@gmail.com> AuthorDate: Thu Mar 10 10:51:26 2022 +0800 [feature-wip][array-type] Support nested array insertion. (#8305) --- be/src/exec/olap_scanner.cpp | 43 +- be/src/olap/aggregate_func.cpp | 2 + be/src/olap/field.h | 4 +- be/src/olap/rowset/segment_v2/column_reader.cpp | 9 +- be/src/olap/types.cpp | 13 + be/src/olap/types.h | 49 +- be/src/runtime/collection_value.cpp | 241 +++++++-- be/src/runtime/collection_value.h | 38 +- be/src/runtime/mysql_result_writer.cpp | 17 +- be/src/runtime/raw_value.cpp | 10 +- be/src/runtime/row_batch.cpp | 50 +- be/src/runtime/tuple.cpp | 129 ++--- be/test/runtime/CMakeLists.txt | 1 + be/test/runtime/array_test.cpp | 556 +++++++++++++++++++++ .../main/java/org/apache/doris/catalog/Column.java | 6 + 15 files changed, 927 insertions(+), 241 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index d7dc839..f480135 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -353,39 +353,16 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { // Copy collection slot for (auto desc : _parent->_collection_slots) { CollectionValue* slot = tuple->get_collection_slot(desc->tuple_offset()); - - TypeDescriptor item_type = desc->type().children.at(0); - size_t item_size = item_type.get_slot_size() * slot->length(); - - size_t nulls_size = slot->length(); - uint8_t* data = batch->tuple_data_pool()->allocate(item_size + nulls_size); - - // copy null_signs - memory_copy(data, slot->null_signs(), nulls_size); - memory_copy(data + nulls_size, slot->data(), item_size); - - slot->set_null_signs(reinterpret_cast<bool*>(data)); - slot->set_data(reinterpret_cast<char*>(data + nulls_size)); - - if (!item_type.is_string_type()) { - continue; - } - - // when string type, copy every item - for (int i = 0; i < slot->length(); ++i) { - int item_offset = nulls_size + i * item_type.get_slot_size(); - if (slot->is_null_at(i)) { - continue; - } - StringValue* dst_item_v = - reinterpret_cast<StringValue*>(data + item_offset); - if (dst_item_v->len != 0) { - char* string_copy = reinterpret_cast<char*>( - batch->tuple_data_pool()->allocate(dst_item_v->len)); - memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len); - dst_item_v->ptr = string_copy; - } - } + const TypeDescriptor& item_type = desc->type().children.at(0); + auto pool = batch->tuple_data_pool(); + CollectionValue::deep_copy_collection( + slot, item_type, [pool](int size) -> MemFootprint { + int64_t offset = pool->total_allocated_bytes(); + uint8_t* data = pool->allocate(size); + return { offset, data }; + }, + false + ); } // the memory allocate by mem pool has been copied, // so we should release these memory immediately diff --git a/be/src/olap/aggregate_func.cpp b/be/src/olap/aggregate_func.cpp index f3c6d8b..6e844e7 100644 --- a/be/src/olap/aggregate_func.cpp +++ b/be/src/olap/aggregate_func.cpp @@ -119,6 +119,8 @@ AggregateFuncResolver::AggregateFuncResolver() { OLAP_FIELD_TYPE_VARCHAR>(); add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY, OLAP_FIELD_TYPE_CHAR>(); + add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY, + OLAP_FIELD_TYPE_ARRAY>(); // Min Aggregate Function add_aggregate_mapping<OLAP_FIELD_AGGREGATION_MIN, OLAP_FIELD_TYPE_TINYINT>(); diff --git a/be/src/olap/field.h b/be/src/olap/field.h index d4fedf2..e3b35da 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -454,9 +454,7 @@ public: char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { auto array_v = (CollectionValue*)cell_ptr; - array_v->set_null_signs(reinterpret_cast<bool*>(variable_ptr + sizeof(CollectionValue))); - array_v->set_data(variable_ptr + sizeof(CollectionValue) + - OLAP_ARRAY_MAX_BYTES / sizeof(char*)); + array_v->set_null_signs(reinterpret_cast<bool*>(variable_ptr)); return variable_ptr + _length; } diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 9530d80..968027c 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -57,22 +57,21 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), meta.children_columns(0).num_rows(), path_desc, &item_reader)); - RETURN_IF_ERROR(item_reader->init()); std::unique_ptr<ColumnReader> offset_reader; RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), meta.children_columns(1).num_rows(), path_desc, &offset_reader)); - RETURN_IF_ERROR(offset_reader->init()); std::unique_ptr<ColumnReader> null_reader; if (meta.is_nullable()) { RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2), meta.children_columns(2).num_rows(), path_desc, &null_reader)); - RETURN_IF_ERROR(null_reader->init()); } + // The num rows of the array reader equals to the num rows of the length reader. + num_rows = meta.children_columns(1).num_rows(); std::unique_ptr<ColumnReader> array_reader( new ColumnReader(opts, meta, num_rows, path_desc)); // array reader do not need to init @@ -127,7 +126,9 @@ Status ColumnReader::init() { "Bad file $0: invalid column index type $1", _path_desc.filepath, index_meta.type())); } } - if (!is_empty() && _ordinal_index_meta == nullptr) { + // ArrayColumnWriter writes a single empty array and flushes. In this scenario, + // the item writer doesn't write any data and the corresponding ordinal index is empty. + if (_ordinal_index_meta == nullptr && !is_empty()) { return Status::Corruption(strings::Substitute( "Bad file $0: missing ordinal index for column $1", _path_desc.filepath, _meta.column_id())); } diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp index 920c938..69761ad 100644 --- a/be/src/olap/types.cpp +++ b/be/src/olap/types.cpp @@ -98,6 +98,19 @@ bool is_scalar_type(FieldType field_type) { } } +bool is_olap_string_type(FieldType field_type) { + switch (field_type) { + case OLAP_FIELD_TYPE_CHAR: + case OLAP_FIELD_TYPE_VARCHAR: + case OLAP_FIELD_TYPE_HLL: + case OLAP_FIELD_TYPE_OBJECT: + case OLAP_FIELD_TYPE_STRING: + return true; + default: + return false; + } +} + std::shared_ptr<const TypeInfo> get_scalar_type_info(FieldType field_type) { return ScalarTypeInfoResolver::instance()->get_type_info(field_type); } diff --git a/be/src/olap/types.h b/be/src/olap/types.h index abfe4b6..e007df9 100644 --- a/be/src/olap/types.h +++ b/be/src/olap/types.h @@ -44,6 +44,8 @@ namespace doris { class TabletColumn; +extern bool is_olap_string_type(FieldType field_type); + class TypeInfo { public: virtual ~TypeInfo() = default; @@ -254,6 +256,11 @@ public: auto dest_value = reinterpret_cast<CollectionValue*>(dest); auto src_value = reinterpret_cast<const CollectionValue*>(src); + if (src_value->length() == 0) { + new (dest_value) CollectionValue(src_value->length()); + return; + } + dest_value->set_length(src_value->length()); size_t item_size = src_value->length() * _item_size; @@ -284,20 +291,49 @@ public: inline void direct_copy(void* dest, const void* src) const override { auto dest_value = reinterpret_cast<CollectionValue*>(dest); + // NOTICE: The address pointed by null_signs of the dest_value can NOT be modified here. + auto base = reinterpret_cast<uint8_t*>(dest_value->mutable_null_signs()); + direct_copy(&base, dest, src); + } + + inline void direct_copy(uint8_t** base, void* dest, const void* src) const { + auto dest_value = reinterpret_cast<CollectionValue*>(dest); auto src_value = reinterpret_cast<const CollectionValue*>(src); + + auto nulls_size = src_value->has_null() ? src_value->length() : 0; + dest_value->set_data(src_value->length() ? (*base + nulls_size) : nullptr); dest_value->set_length(src_value->length()); dest_value->set_has_null(src_value->has_null()); if (src_value->has_null()) { // direct copy null_signs + dest_value->set_null_signs(reinterpret_cast<bool*>(*base)); memory_copy(dest_value->mutable_null_signs(), src_value->null_signs(), src_value->length()); } - // direct opy item - for (uint32_t i = 0; i < src_value->length(); ++i) { - if (dest_value->is_null_at(i)) continue; - _item_type_info->direct_copy((uint8_t*)(dest_value->mutable_data()) + i * _item_size, - (uint8_t*)(src_value->data()) + i * _item_size); + *base += nulls_size + src_value->length() * _item_type_info->size(); + // direct copy item + if (_item_type_info->type() == OLAP_FIELD_TYPE_ARRAY) { + for (uint32_t i = 0; i < src_value->length(); ++i) { + if (dest_value->is_null_at(i)) continue; + dynamic_cast<const ArrayTypeInfo*>(_item_type_info.get()) + ->direct_copy(base, (uint8_t*)(dest_value->mutable_data()) + i * _item_size, + (uint8_t*)(src_value->data()) + i * _item_size); + } + } else { + for (uint32_t i = 0; i < src_value->length(); ++i) { + if (dest_value->is_null_at(i)) continue; + auto dest_address = (uint8_t*)(dest_value->mutable_data()) + i * _item_size; + auto src_address = (uint8_t*)(src_value->data()) + i * _item_size; + if (is_olap_string_type(_item_type_info->type())) { + auto dest_slice = reinterpret_cast<Slice*>(dest_address); + auto src_slice = reinterpret_cast<const Slice*>(src_address); + dest_slice->data = reinterpret_cast<char*>(*base); + dest_slice->size = src_slice->size; + *base += src_slice->size; + } + _item_type_info->direct_copy(dest_address, src_address); + } } } @@ -1075,8 +1111,7 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_VARCHAR> : public FieldTypeTraits<OLAP_FI case OLAP_FIELD_TYPE_DOUBLE: case OLAP_FIELD_TYPE_DECIMAL: { auto result = src_type->to_string(src); - if (result.size() > variable_len) - return OLAP_ERR_INPUT_PARAMETER_ERROR; + if (result.size() > variable_len) return OLAP_ERR_INPUT_PARAMETER_ERROR; auto slice = reinterpret_cast<Slice*>(dest); slice->data = reinterpret_cast<char*>(mem_pool->allocate(result.size())); memcpy(slice->data, result.c_str(), result.size()); diff --git a/be/src/runtime/collection_value.cpp b/be/src/runtime/collection_value.cpp index 9b7ea7d..f738695 100644 --- a/be/src/runtime/collection_value.cpp +++ b/be/src/runtime/collection_value.cpp @@ -17,10 +17,24 @@ #include "runtime/collection_value.h" +#include <functional> + #include "common/logging.h" +#include "common/utils.h" #include "exprs/anyval_util.h" +#include "exprs/literal.h" +#include "runtime/descriptors.h" +#include "util//mem_util.hpp" namespace doris { + +using AllocateMemFunc = std::function<uint8_t* (size_t size)>; +static Status init_collection( + CollectionValue* value, + const AllocateMemFunc& allocate, + uint32_t size, + PrimitiveType child_type); + int sizeof_type(PrimitiveType type) { switch (type) { case TYPE_TINYINT: @@ -32,6 +46,8 @@ int sizeof_type(PrimitiveType type) { case TYPE_CHAR: case TYPE_VARCHAR: return sizeof(StringValue); + case TYPE_ARRAY: + return sizeof(CollectionValue); case TYPE_NULL: return 0; default: @@ -50,6 +66,7 @@ Status type_check(PrimitiveType type) { case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_NULL: + case TYPE_ARRAY: break; default: return Status::InvalidArgument(fmt::format("Type not implemented: {}", type)); @@ -80,73 +97,221 @@ void CollectionValue::copy_null_signs(const CollectionValue* other) { } } +size_t CollectionValue::get_byte_size(const TypeDescriptor& type) const { + size_t result = 0; + if (_length == 0) { + return result; + } + if (_has_null) { + result += _length * sizeof(bool); + } + const auto& item_type = type.children[0]; + result += _length * item_type.get_slot_size(); + if (item_type.is_string_type()) { + for (int i = 0; i < _length; ++ i) { + if (is_null_at(i)) { + continue; + } + int item_offset = i * item_type.get_slot_size(); + StringValue* item = reinterpret_cast<StringValue*>(((uint8_t*)_data) + item_offset); + result += item->len; + } + } else if (item_type.type == TYPE_ARRAY) { + for (int i = 0; i < _length; ++ i) { + if (is_null_at(i)) { + continue; + } + int item_offset = i * item_type.get_slot_size(); + CollectionValue* item = reinterpret_cast<CollectionValue*>(((uint8_t*)_data) + item_offset); + result += item->get_byte_size(item_type); + } + } + return result; +} + ArrayIterator CollectionValue::iterator(PrimitiveType children_type) const { return ArrayIterator(children_type, this); } Status CollectionValue::init_collection(ObjectPool* pool, uint32_t size, PrimitiveType child_type, - CollectionValue* val) { - if (val == nullptr) { + CollectionValue* value) { + return doris::init_collection(value, [pool](size_t size) -> uint8_t* { + return pool->add_array(new uint8_t[size]); + }, + size, child_type + ); +} + +static Status init_collection( + CollectionValue* value, + const AllocateMemFunc& allocate, + uint32_t size, + PrimitiveType child_type) { + if (value == nullptr) { return Status::InvalidArgument("collection value is null"); } RETURN_IF_ERROR(type_check(child_type)); if (size == 0) { + new (value) CollectionValue(size); return Status::OK(); } - val->_length = size; - val->_null_signs = pool->add_array(new bool[size] {0}); - val->_data = pool->add_array(new uint8_t[size * sizeof_type(child_type)]); + value->_data = allocate(size * sizeof_type(child_type)); + value->_length = size; + value->_has_null = false; + value->_null_signs = reinterpret_cast<bool*>(allocate(size)); + memset(value->_null_signs, 0, size * sizeof(bool)); return Status::OK(); } Status CollectionValue::init_collection(MemPool* pool, uint32_t size, PrimitiveType child_type, - CollectionValue* val) { - if (val == nullptr) { - return Status::InvalidArgument("collection value is null"); - } - - RETURN_IF_ERROR(type_check(child_type)); - - if (size == 0) { - return Status::OK(); - } - - val->_length = size; - val->_null_signs = (bool*)pool->allocate(size * sizeof(bool)); - memset(val->_null_signs, 0, size); + CollectionValue* value) { + return doris::init_collection(value, [pool](size_t size) { + return pool->allocate(size); + }, + size, child_type + ); +} - val->_data = pool->allocate(sizeof_type(child_type) * size); +Status CollectionValue::init_collection(FunctionContext* context, uint32_t size, + PrimitiveType child_type, CollectionValue* value) { + return doris::init_collection(value, [context](size_t size) { + return context->allocate(size); + }, + size, child_type + ); +} - return Status::OK(); +CollectionValue CollectionValue::from_collection_val(const CollectionVal& val) { + return CollectionValue(val.data, val.length, val.has_null, val.null_signs); } -Status CollectionValue::init_collection(FunctionContext* context, uint32_t size, - PrimitiveType child_type, CollectionValue* val) { - if (val == nullptr) { - return Status::InvalidArgument("collection value is null"); +// Deep copy collection. +// NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first ( +// copy data from origin collection value). +void CollectionValue::deep_copy_collection( + CollectionValue* shallow_copied_cv, + const TypeDescriptor& item_type, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs) { + CollectionValue* cv = shallow_copied_cv; + if (cv->length() == 0) { + return; } - RETURN_IF_ERROR(type_check(child_type)); + int coll_byte_size = cv->length() * item_type.get_slot_size(); + int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0; - if (size == 0) { - return Status::OK(); + MemFootprint footprint = gen_mem_footprint(coll_byte_size + nulls_size); + int64_t offset = footprint.first; + char* coll_data = reinterpret_cast<char*>(footprint.second); + + // copy and assign null_signs + if (cv->has_null()) { + memory_copy(convert_to<bool*>(coll_data), cv->null_signs(), nulls_size); + cv->set_null_signs(convert_to<bool*>(coll_data)); + } else { + cv->set_null_signs(nullptr); } + // copy and assgin data + memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size); + cv->set_data(coll_data + nulls_size); - val->_length = size; - val->_null_signs = (bool*)context->allocate(size * sizeof(bool)); - memset(val->_null_signs, 0, size); + deep_copy_items_in_collection(cv, coll_data, item_type, gen_mem_footprint, convert_ptrs); - val->_data = context->allocate(sizeof_type(child_type) * size); + if (convert_ptrs) { + cv->set_data(convert_to<char*>(offset + nulls_size)); + if (cv->has_null()) { + cv->set_null_signs(convert_to<bool*>(offset)); + } + } +} - return Status::OK(); +// Deep copy items in collection. +// NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first ( +// copy data from origin collection value). +void CollectionValue::deep_copy_items_in_collection( + CollectionValue* shallow_copied_cv, + char* base, + const TypeDescriptor& item_type, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs) { + int nulls_size = shallow_copied_cv->has_null() ? shallow_copied_cv->length() : 0; + char* item_base = base + nulls_size; + if (item_type.is_string_type()) { + // when itemtype is string, copy every string item + for (int i = 0; i < shallow_copied_cv->length(); ++ i) { + if (shallow_copied_cv->is_null_at(i)) { + continue; + } + char* item_offset = item_base + i * item_type.get_slot_size(); + StringValue* dst_item_v = convert_to<StringValue*>(item_offset); + if (dst_item_v->len != 0) { + MemFootprint footprint = gen_mem_footprint(dst_item_v->len); + int64_t offset = footprint.first; + char* string_copy = reinterpret_cast<char*>(footprint.second); + memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len); + dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy); + } + } + } else if (item_type.type == TYPE_ARRAY) { + for (int i = 0; i < shallow_copied_cv->length(); ++ i) { + if (shallow_copied_cv->is_null_at(i)) { + continue; + } + char* item_offset = item_base + i * item_type.get_slot_size(); + CollectionValue* item_cv = convert_to<CollectionValue*>(item_offset); + deep_copy_collection(item_cv, item_type.children[0], gen_mem_footprint, convert_ptrs); + } + } } -CollectionValue CollectionValue::from_collection_val(const CollectionVal& val) { - return CollectionValue(val.data, val.length, val.has_null, val.null_signs); +void CollectionValue::deserialize_collection( + CollectionValue* cv, + const char* tuple_data, + const TypeDescriptor& type) { + if (cv->length() == 0) { + new (cv) CollectionValue(cv->length()); + return; + } + // assgin data and null_sign pointer position in tuple_data + int data_offset = convert_to<int>(cv->data()); + cv->set_data(convert_to<char*>(tuple_data + data_offset)); + if (cv->has_null()) { + int null_offset = convert_to<int>(cv->null_signs()); + cv->set_null_signs(convert_to<bool*>(tuple_data + null_offset)); + } + + const TypeDescriptor& item_type = type.children[0]; + if (item_type.is_string_type()) { + // copy every string item + for (size_t i = 0; i < cv->length(); ++i) { + if (cv->is_null_at(i)) { + continue; + } + + StringValue* dst_item_v = convert_to<StringValue*>( + (uint8_t*)cv->data() + i * item_type.get_slot_size()); + + if (dst_item_v->len != 0) { + int offset = convert_to<int>(dst_item_v->ptr); + dst_item_v->ptr = convert_to<char*>(tuple_data + offset); + } + } + } else if (item_type.type == TYPE_ARRAY) { + for (size_t i = 0; i < cv->length(); ++i) { + if (cv->is_null_at(i)) { + continue; + } + + CollectionValue* item_cv = convert_to<CollectionValue*>( + (uint8_t*)cv->data() + i * item_type.get_slot_size()); + deserialize_collection(item_cv, tuple_data, item_type); + } + } } Status CollectionValue::set(uint32_t i, PrimitiveType type, const AnyVal* value) { @@ -183,6 +348,12 @@ Status CollectionValue::set(uint32_t i, PrimitiveType type, const AnyVal* value) dest->ptr = (char*)src->ptr; break; } + case TYPE_ARRAY: { + const CollectionVal* src = reinterpret_cast<const CollectionVal*>(value); + CollectionValue* dest = reinterpret_cast<CollectionValue*>(iter.value()); + *dest = CollectionValue::from_collection_val(*src); + break; + } default: DCHECK(false) << "Type not implemented: " << type; return Status::InvalidArgument("Type not implemented"); diff --git a/be/src/runtime/collection_value.h b/be/src/runtime/collection_value.h index ed9f4c8..6050c78 100644 --- a/be/src/runtime/collection_value.h +++ b/be/src/runtime/collection_value.h @@ -27,6 +27,10 @@ namespace doris { using doris_udf::AnyVal; +using MemFootprint = std::pair<int64_t, uint8_t*>; +using GenMemFootprintFunc = std::function<MemFootprint (int size)>; + +struct TypeDescriptor; class ArrayIterator; /** @@ -69,6 +73,8 @@ public: void copy_null_signs(const CollectionValue* other); + size_t get_byte_size(const TypeDescriptor& type) const; + ArrayIterator iterator(PrimitiveType children_type) const; /** @@ -81,17 +87,41 @@ public: * init collection, will alloc (children Type's size + 1) * (children Nums) memory */ static Status init_collection(ObjectPool* pool, uint32_t size, PrimitiveType child_type, - CollectionValue* val); + CollectionValue* value); static Status init_collection(MemPool* pool, uint32_t size, PrimitiveType child_type, - CollectionValue* val); + CollectionValue* value); static Status init_collection(FunctionContext* context, uint32_t size, PrimitiveType child_type, - CollectionValue* val); + CollectionValue* value); static CollectionValue from_collection_val(const CollectionVal& val); - const void* data() const { return _data; } + // Deep copy collection. + // NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first ( + // copy data from origin collection value). + static void deep_copy_collection( + CollectionValue* shallow_copied_cv, + const TypeDescriptor& item_type, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs); + + // Deep copy items in collection. + // NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first ( + // copy data from origin collection value). + static void deep_copy_items_in_collection( + CollectionValue* shallow_copied_cv, + char* base, + const TypeDescriptor& item_type, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs); + + static void deserialize_collection( + CollectionValue* cv, + const char* tuple_data, + const TypeDescriptor& type); + + inline const void* data() const { return _data; } inline bool has_null() const { return _has_null; } inline const bool* null_signs() const { return _null_signs; } inline void* mutable_data() { return _data; } diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp index 229af18..a974741 100644 --- a/be/src/runtime/mysql_result_writer.cpp +++ b/be/src/runtime/mysql_result_writer.cpp @@ -165,10 +165,10 @@ int MysqlResultWriter::_add_row_value(int index, const TypeDescriptor& type, voi } case TYPE_ARRAY: { - auto children_type = type.children[0].type; + auto children_type = type.children[0]; auto array_value = (const CollectionValue*)(item); - ArrayIterator iter = array_value->iterator(children_type); + ArrayIterator iter = array_value->iterator(children_type.type); _row_buffer->open_dynamic_mode(); @@ -179,14 +179,13 @@ int MysqlResultWriter::_add_row_value(int index, const TypeDescriptor& type, voi if (begin != 0) { buf_ret = _row_buffer->push_string(", ", 2); } - - if (children_type == TYPE_CHAR || children_type == TYPE_VARCHAR) { - buf_ret = _row_buffer->push_string("'", 1); - buf_ret = _add_row_value(index, children_type, iter.value()); - buf_ret = _row_buffer->push_string("'", 1); + if (!iter.value()) { + buf_ret = _row_buffer->push_string("NULL", 4); } else { - if (!iter.value()) { - buf_ret = _row_buffer->push_string("NULL", 4); + if (children_type == TYPE_CHAR || children_type == TYPE_VARCHAR) { + buf_ret = _row_buffer->push_string("'", 1); + buf_ret = _add_row_value(index, children_type, iter.value()); + buf_ret = _row_buffer->push_string("'", 1); } else { buf_ret = _add_row_value(index, children_type, iter.value()); } diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp index ba51616..6d8641e 100644 --- a/be/src/runtime/raw_value.cpp +++ b/be/src/runtime/raw_value.cpp @@ -319,10 +319,10 @@ void RawValue::write(const void* value, void* dst, const TypeDescriptor& type, M CollectionValue* val = reinterpret_cast<CollectionValue*>(dst); if (pool != nullptr) { - auto children_type = type.children.at(0).type; - CollectionValue::init_collection(pool, src->size(), children_type, val); - ArrayIterator src_iter = src->iterator(children_type); - ArrayIterator val_iter = val->iterator(children_type); + const auto& item_type = type.children[0]; + CollectionValue::init_collection(pool, src->size(), item_type.type, val); + ArrayIterator src_iter = src->iterator(item_type.type); + ArrayIterator val_iter = val->iterator(item_type.type); val->set_has_null(src->has_null()); val->copy_null_signs(src); @@ -330,7 +330,7 @@ void RawValue::write(const void* value, void* dst, const TypeDescriptor& type, M while (src_iter.has_next() && val_iter.has_next()) { if (!src_iter.is_null()) { // write children - write(src_iter.value(), val_iter.value(), children_type, pool); + write(src_iter.value(), val_iter.value(), item_type, pool); } src_iter.next(); val_iter.next(); diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 73fc3aa..ce02ff0 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -175,34 +175,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch, CollectionValue* array_val = tuple->get_collection_slot(slot_collection->tuple_offset()); - - // assgin data and null_sign pointer position in tuple_data - int data_offset = convert_to<int>(array_val->data()); - array_val->set_data(tuple_data + data_offset); - if (array_val->has_null()) { - int null_offset = convert_to<int>(array_val->null_signs()); - array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset)); - } - - const TypeDescriptor& item_type = slot_collection->type().children.at(0); - if (!item_type.is_string_type()) { - continue; - } - - // copy every string item - for (size_t k = 0; k < array_val->length(); ++k) { - if (array_val->is_null_at(k)) { - continue; - } - - StringValue* dst_item_v = convert_to<StringValue*>( - (uint8_t*)array_val->data() + k * item_type.get_slot_size()); - - if (dst_item_v->len != 0) { - int offset = convert_to<int>(dst_item_v->ptr); - dst_item_v->ptr = tuple_data + offset; - } - } + CollectionValue::deserialize_collection(array_val, tuple_data, slot_collection->type()); } } } @@ -599,26 +572,7 @@ size_t RowBatch::total_byte_size() const { // compute data null_signs size CollectionValue* array_val = tuple->get_collection_slot(slot_collection->tuple_offset()); - if (array_val->has_null()) { - result += array_val->length() * sizeof(bool); - } - - const TypeDescriptor& item_type = slot_collection->type().children.at(0); - result += array_val->length() * item_type.get_slot_size(); - - if (!item_type.is_string_type()) { - continue; - } - - // compute string type item size - for (int k = 0; k < array_val->length(); ++k) { - if (array_val->is_null_at(k)) { - continue; - } - StringValue* dst_item_v = convert_to<StringValue*>( - (uint8_t*)array_val->data() + k * item_type.get_slot_size()); - result += dst_item_v->len; - } + result += array_val->get_byte_size(slot_collection->type()); } } } diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp index 75ae9ce..4644426 100644 --- a/be/src/runtime/tuple.cpp +++ b/be/src/runtime/tuple.cpp @@ -17,6 +17,7 @@ #include "runtime/tuple.h" +#include <functional> #include <iomanip> #include <iostream> #include <sstream> @@ -36,6 +37,12 @@ namespace doris { +static void deep_copy_collection_slots( + Tuple* shallow_copied_tuple, + const TupleDescriptor& desc, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs); + int64_t Tuple::total_byte_size(const TupleDescriptor& desc) const { int64_t result = desc.byte_size(); if (!desc.has_varlen_slots()) { @@ -87,50 +94,33 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo } // copy collection slot + deep_copy_collection_slots(dst, desc, [pool](int size) ->MemFootprint { + int64_t offset = pool->total_allocated_bytes(); + uint8_t* data = pool->allocate(size); + return { offset, data }; + }, + convert_ptrs + ); +} + +// Deep copy collection slots. +// NOTICE: The Tuple* shallow_copied_tuple must be initialized by calling memcpy function first ( +// copy data from origin tuple). +static void deep_copy_collection_slots( + Tuple* shallow_copied_tuple, + const TupleDescriptor& desc, + const GenMemFootprintFunc& gen_mem_footprint, + bool convert_ptrs) { for (auto slot_desc : desc.collection_slots()) { DCHECK(slot_desc->type().is_collection_type()); - if (dst->is_null(slot_desc->null_indicator_offset())) { + if (shallow_copied_tuple->is_null(slot_desc->null_indicator_offset())) { continue; } // copy collection item - CollectionValue* cv = dst->get_collection_slot(slot_desc->tuple_offset()); - - const TypeDescriptor& item_type = slot_desc->type().children.at(0); - - int coll_byte_size = cv->length() * item_type.get_slot_size(); - int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0; - - int64_t offset = pool->total_allocated_bytes(); - char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size)); - - // copy data and null_signs - memory_copy(convert_to<bool*>(coll_data), cv->null_signs(), nulls_size); - memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size); - - // assgin new null_sign and data location - if (cv->has_null()) { - cv->set_null_signs(convert_ptrs ? convert_to<bool*>(offset) : convert_to<bool*>(coll_data)); - } - cv->set_data(convert_ptrs ? convert_to<char*>(offset + nulls_size) : coll_data + nulls_size); - - if (!item_type.is_string_type()) { - continue; - } - // when itemtype is string, copy every string item - for (int i = 0; i < cv->length(); ++i) { - int item_offset = nulls_size + i * item_type.get_slot_size(); - if (cv->is_null_at(i)) { - continue; - } - StringValue* dst_item_v = convert_to<StringValue*>(coll_data + item_offset); - if (dst_item_v->len != 0) { - int64_t offset = pool->total_allocated_bytes(); - char* string_copy = (char*)(pool->allocate(dst_item_v->len)); - memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len); - dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy); - } - } + CollectionValue* cv = shallow_copied_tuple->get_collection_slot(slot_desc->tuple_offset()); + CollectionValue::deep_copy_collection( + cv, slot_desc->type().children[0], gen_mem_footprint, convert_ptrs); } } @@ -197,61 +187,14 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset, } // copy collection slots - for (auto slot_desc : desc.collection_slots()) { - DCHECK(slot_desc->type().is_collection_type()); - if (dst->is_null(slot_desc->null_indicator_offset())) { - continue; - } - // get cv to copy elements - CollectionValue* cv = dst->get_collection_slot(slot_desc->tuple_offset()); - const TypeDescriptor& item_type = slot_desc->type().children.at(0); - - int coll_byte_size = cv->length() * item_type.get_slot_size(); - int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0; - - // copy null_sign - memory_copy(*data, cv->null_signs(), nulls_size); - // copy data - memory_copy(*data + nulls_size, cv->data(), coll_byte_size); - - if (!item_type.is_string_type()) { - if (cv->has_null()) { - cv->set_null_signs(convert_ptrs ? convert_to<bool*>(*offset) : convert_to<bool*>(*data)); - } - cv->set_data(convert_ptrs ? convert_to<char*>(*offset + nulls_size) - : *data + nulls_size); - *data += coll_byte_size + nulls_size; - *offset += coll_byte_size + nulls_size; - continue; - } - - // when item is string type, copy every item - char* base_data = *data; - int64_t base_offset = *offset; - - *data += coll_byte_size + nulls_size; - *offset += coll_byte_size + nulls_size; - - for (int i = 0; i < cv->length(); ++i) { - int item_offset = nulls_size + i * item_type.get_slot_size(); - if (cv->is_null_at(i)) { - continue; - } - StringValue* dst_item_v = convert_to<StringValue*>(base_data + item_offset); - if (dst_item_v->len != 0) { - memory_copy(*data, dst_item_v->ptr, dst_item_v->len); - dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data); - *data += dst_item_v->len; - *offset += dst_item_v->len; - } - } - // assgin new null_sign and data location - if (cv->has_null()) { - cv->set_null_signs(convert_ptrs ? convert_to<bool*>(base_offset) : convert_to<bool*>(base_data)); - } - cv->set_data(convert_ptrs ? convert_to<char*>(base_offset + nulls_size) - : base_data + nulls_size); - } + deep_copy_collection_slots(dst, desc, [offset, data](int size) -> MemFootprint { + MemFootprint footprint = { *offset, reinterpret_cast<uint8_t*>(*data) }; + *offset += size; + *data += size; + return footprint; + }, + convert_ptrs + ); } template <bool collect_string_vals> diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index 28eff33..06a1214 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -64,3 +64,4 @@ ADD_BE_TEST(memory/system_allocator_test) ADD_BE_TEST(cache/partition_cache_test) ADD_BE_TEST(collection_value_test) #ADD_BE_TEST(minidump_test) +ADD_BE_TEST(array_test) diff --git a/be/test/runtime/array_test.cpp b/be/test/runtime/array_test.cpp new file mode 100644 index 0000000..57a5cdb --- /dev/null +++ b/be/test/runtime/array_test.cpp @@ -0,0 +1,556 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <rapidjson/document.h> +#include <rapidjson/rapidjson.h> + +#include <functional> +#include <memory> +#include <string> +#include <vector> + +#include "exprs/anyval_util.h" +#include "gen_cpp/olap_file.pb.h" +#include "gen_cpp/segment_v2.pb.h" +#include "olap/field.h" +#include "olap/fs/block_manager.h" +#include "olap/fs/fs_util.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/rowset/segment_v2/column_writer.h" +#include "olap/tablet_schema.h" +#include "olap/types.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "testutil/desc_tbl_builder.h" +#include "util/file_utils.h" +#include "util/uid_util.h" + +namespace doris { + +template <typename... Ts> +ColumnPB create_column_pb(const std::string& type, const Ts&... sub_column_types) { + ColumnPB column; + column.set_type(type); + column.set_aggregation("NONE"); + column.set_is_nullable(true); + if (type == "ARRAY") { + column.set_length(OLAP_ARRAY_MAX_BYTES); + } + if constexpr (sizeof...(sub_column_types) > 0) { + auto sub_column = create_column_pb(sub_column_types...); + column.add_children_columns()->Swap(&sub_column); + } + return column; +} + +std::shared_ptr<const TypeInfo> get_type_info(const ColumnPB& column_pb) { + TabletColumn tablet_column; + tablet_column.init_from_pb(column_pb); + return get_type_info(&tablet_column); +} + +std::unique_ptr<Field> create_field(const ColumnPB& column_pb) { + TabletColumn column; + column.init_from_pb(column_pb); + return std::unique_ptr<Field>(FieldFactory::create(column)); +} + +TypeDescriptor get_scalar_type_desc(const TypeInfo* type_info) { + switch (type_info->type()) { + case OLAP_FIELD_TYPE_INT: + return TypeDescriptor(TYPE_INT); + case OLAP_FIELD_TYPE_VARCHAR: + return TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH); + default: + return TypeDescriptor(); + } +} + +TupleDescriptor* get_tuple_descriptor(ObjectPool& object_pool, const TypeInfo* type_info) { + DescriptorTblBuilder builder(&object_pool); + auto& tuple_desc_builder = builder.declare_tuple(); + if (type_info->type() == OLAP_FIELD_TYPE_ARRAY) { + TypeDescriptor type_desc(TYPE_ARRAY); + type_desc.len = OLAP_ARRAY_MAX_BYTES; + auto ptype = dynamic_cast<const ArrayTypeInfo*>(type_info)->item_type_info().get(); + while (ptype->type() == OLAP_FIELD_TYPE_ARRAY) { + type_desc.children.push_back(TypeDescriptor(TYPE_ARRAY)); + ptype = dynamic_cast<const ArrayTypeInfo*>(ptype)->item_type_info().get(); + } + type_desc.children.push_back(get_scalar_type_desc(ptype)); + tuple_desc_builder << type_desc; + } else { + tuple_desc_builder << get_scalar_type_desc(type_info); + } + return builder.build()->get_tuple_descriptor(0); +} + +CollectionValue* parse(ObjectPool& object_pool, + const rapidjson::GenericValue<rapidjson::UTF8<>>::ConstArray& json_array, + const TypeDescriptor& type_desc) { + if (json_array.Empty()) { + return object_pool.add(new CollectionValue(0)); + } else { + auto array = object_pool.add(new CollectionValue()); + const auto& item_type_desc = type_desc.children[0]; + CollectionValue::init_collection(&object_pool, json_array.Size(), item_type_desc.type, + array); + int index = 0; + switch (item_type_desc.type) { + case TYPE_ARRAY: + for (auto it = json_array.Begin(); it != json_array.End(); ++it) { + auto val = CollectionVal(); + if (it->IsNull()) { + val.is_null = true; + } else { + auto sub_array = parse(object_pool, it->GetArray(), item_type_desc); + sub_array->to_collection_val(&val); + } + array->set(index++, item_type_desc.type, &val); + } + break; + case TYPE_INT: + for (auto it = json_array.Begin(); it != json_array.End(); ++it) { + auto val = it->IsNull() ? IntVal::null() : IntVal(it->GetInt()); + array->set(index++, item_type_desc.type, &val); + } + break; + case TYPE_VARCHAR: + for (auto it = json_array.Begin(); it != json_array.End(); ++it) { + if (it->IsNull()) { + auto val = StringVal::null(); + array->set(index++, item_type_desc.type, &val); + } else { + char* string = object_pool.add_array(new char[it->GetStringLength()]); + memcpy(string, it->GetString(), it->GetStringLength()); + auto val = StringVal(reinterpret_cast<uint8_t*>(string), it->GetStringLength()); + array->set(index++, item_type_desc.type, &val); + } + } + break; + default: + break; + } + if (!array->has_null()) { + array->set_null_signs(nullptr); + } + return array; + } +} + +CollectionValue* parse(ObjectPool& object_pool, const std::string& text, + const TypeDescriptor& type_desc) { + rapidjson::Document document; + if (document.Parse(text.c_str()).HasParseError() || !document.IsArray()) { + return nullptr; + } + return parse(object_pool, (const_cast<const rapidjson::Document*>(&document))->GetArray(), + type_desc); +} + +void validate(const Field* field, const CollectionValue* expect, const CollectionValue* actual, + bool check_nullptr) { + EXPECT_TRUE(field->type_info()->equal(expect, actual)); + if (check_nullptr) { + if (expect->length() == 0) { + EXPECT_EQ(nullptr, actual->data()); + EXPECT_EQ(expect->data(), actual->data()); + } + if (!expect->has_null()) { + EXPECT_EQ(nullptr, expect->null_signs()); + EXPECT_EQ(expect->null_signs(), actual->null_signs()); + } + } +} + +void validate(const Field* field, const CollectionValue* expect, const CollectionValue* actual) { + validate(field, expect, actual, true); +} + +class ArrayTest : public ::testing::Test { +public: + ArrayTest() + : _mem_tracker(new MemTracker(MAX_MEMORY_BYTES, "ArrayTest")), + _mem_pool(new MemPool(_mem_tracker.get())) {} + +protected: + void SetUp() override { + if (FileUtils::check_exist(TEST_DIR)) { + ASSERT_TRUE(FileUtils::remove_all(TEST_DIR).ok()); + } + ASSERT_TRUE(FileUtils::create_dir(TEST_DIR).ok()); + } + + void TearDown() override { + if (FileUtils::check_exist(TEST_DIR)) { + ASSERT_TRUE(FileUtils::remove_all(TEST_DIR).ok()); + } + } + +private: + void test_copy_array(const TupleDescriptor* tuple_desc, const Field* field, + const CollectionValue* array) { + auto slot_desc = tuple_desc->slots().front(); + auto type_desc = slot_desc->type(); + auto total_size = tuple_desc->byte_size() + array->get_byte_size(type_desc); + + auto src = allocate_tuple(total_size); + ASSERT_NE(src, nullptr); + + RawValue::write(array, src, slot_desc, _mem_pool.get()); + auto src_cv = reinterpret_cast<CollectionValue*>(src->get_slot(slot_desc->tuple_offset())); + validate(field, array, src_cv); + + auto dst = allocate_tuple(total_size); + ASSERT_NE(dst, nullptr); + + src->deep_copy(dst, *tuple_desc, _mem_pool.get()); + auto dst_cv = reinterpret_cast<CollectionValue*>(dst->get_slot(slot_desc->tuple_offset())); + validate(field, src_cv, dst_cv); + + dst->init(total_size); + int64_t offset = 0; + char* serialized_data = reinterpret_cast<char*>(dst); + src->deep_copy(*tuple_desc, &serialized_data, &offset, true); + EXPECT_EQ(total_size, offset); + EXPECT_EQ(total_size, serialized_data - reinterpret_cast<char*>(dst)); + dst_cv = reinterpret_cast<CollectionValue*>(dst->get_slot(slot_desc->tuple_offset())); + CollectionValue::deserialize_collection(dst_cv, reinterpret_cast<char*>(dst), type_desc); + validate(field, src_cv, dst_cv); + } + + Tuple* allocate_tuple(size_t size) { + auto tuple = reinterpret_cast<Tuple*>(_mem_pool->allocate(size)); + if (tuple) { + tuple->init(size); + } + return tuple; + } + + void test_direct_copy_array(const Field* field, + const std::vector<const CollectionValue*>& arrays) { + CollectionValue cell; + std::unique_ptr<char[]> variable_ptr(new char[field->length()]); + field->allocate_memory(reinterpret_cast<char*>(&cell), variable_ptr.get()); + EXPECT_EQ(cell.null_signs(), reinterpret_cast<bool*>(variable_ptr.get())); + for (auto array : arrays) { + field->type_info()->direct_copy(&cell, array); + EXPECT_EQ(cell.null_signs(), reinterpret_cast<bool*>(variable_ptr.get())); + validate(field, array, &cell, false); + } + } + + template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding> + void test_write_and_read_column(const ColumnPB& column_pb, const Field* field, + const std::vector<const CollectionValue*>& arrays) { + const std::string path = TEST_DIR + "/" + generate_uuid_string(); + LOG(INFO) << "Test directory: " << path; + segment_v2::ColumnMetaPB meta; + init_column_meta<array_encoding, item_encoding>(&meta, column_pb); + { + auto wblock = create_writable_block(path); + ASSERT_NE(wblock, nullptr); + auto writer = create_column_writer<array_encoding, item_encoding>(wblock.get(), meta, + column_pb); + ASSERT_NE(writer, nullptr); + Status st; + for (auto array : arrays) { + st = writer->append(false, const_cast<CollectionValue*>(array)); + ASSERT_TRUE(st.ok()); + } + ASSERT_TRUE(writer->finish().ok()); + ASSERT_TRUE(writer->write_data().ok()); + ASSERT_TRUE(writer->write_ordinal_index().ok()); + ASSERT_TRUE(writer->write_zone_map().ok()); + + ASSERT_TRUE(wblock->close().ok()); + } + { + auto reader = create_column_reader(path, meta, arrays.size()); + ASSERT_NE(reader, nullptr); + auto rblock = create_readable_block(path); + ASSERT_NE(rblock, nullptr); + OlapReaderStatistics stats; + std::unique_ptr<segment_v2::ColumnIterator> iter( + new_iterator(rblock.get(), &stats, reader.get())); + ASSERT_NE(iter, nullptr); + auto st = iter->seek_to_first(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + auto tracker = std::make_shared<MemTracker>(); + MemPool pool(tracker.get()); + std::unique_ptr<ColumnVectorBatch> cvb; + ColumnVectorBatch::create(0, true, field->type_info(), const_cast<Field*>(field), &cvb); + ASSERT_NE(cvb, nullptr) << st.to_string(); + cvb->resize(1024); + ColumnBlock col(cvb.get(), &pool); + + int index = 0; + size_t rows_read = 1024; + do { + ColumnBlockView dst(&col); + st = iter->next_batch(&rows_read, &dst); + ASSERT_TRUE(st.ok()); + for (int i = 0; i < rows_read; ++i) { + validate(field, arrays[index++], + reinterpret_cast<const CollectionValue*>(col.cell_ptr(i)), false); + } + ASSERT_TRUE(st.ok()); + } while (rows_read >= 1024); + } + } + template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding> + void init_column_meta(segment_v2::ColumnMetaPB* meta, const ColumnPB& column_pb) { + int column_id = 0; + TabletColumn column; + column.init_from_pb(column_pb); + init_column_meta<array_encoding, item_encoding>(meta, &column_id, column); + } + + template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding> + void init_column_meta(segment_v2::ColumnMetaPB* meta, int* column_id, + const TabletColumn& column) { + meta->set_column_id(*column_id); + meta->set_unique_id((*column_id)++); + meta->set_type(column.type()); + meta->set_length(column.length()); + if (column.type() == OLAP_FIELD_TYPE_ARRAY) { + meta->set_encoding(array_encoding); + } else { + meta->set_encoding(item_encoding); + } + meta->set_compression(segment_v2::LZ4F); + meta->set_is_nullable(true); + for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { + init_column_meta<array_encoding, item_encoding>(meta->add_children_columns(), column_id, + column.get_sub_column(i)); + } + } + + std::unique_ptr<fs::WritableBlock> create_writable_block(const std::string& path) { + std::unique_ptr<fs::WritableBlock> wblock; + fs::CreateBlockOptions fs_opts(path); + auto st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(fs_opts, &wblock); + return st.ok() ? std::move(wblock) : nullptr; + } + + template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding> + std::unique_ptr<segment_v2::ColumnWriter> create_column_writer(fs::WritableBlock* wblock, + segment_v2::ColumnMetaPB& meta, + const ColumnPB& column_pb) { + segment_v2::ColumnWriterOptions writer_opts = {.meta = &meta}; + TabletColumn column; + column.init_from_pb(column_pb); + std::unique_ptr<segment_v2::ColumnWriter> writer; + auto st = segment_v2::ColumnWriter::create(writer_opts, &column, wblock, &writer); + if (!st.ok()) { + return nullptr; + } + st = writer->init(); + return st.ok() ? std::move(writer) : nullptr; + } + + std::unique_ptr<segment_v2::ColumnReader> create_column_reader( + const std::string& path, const segment_v2::ColumnMetaPB& meta, size_t num_rows) { + segment_v2::ColumnReaderOptions reader_opts; + FilePathDesc path_desc; + path_desc.filepath = path; + std::unique_ptr<segment_v2::ColumnReader> reader; + auto st = segment_v2::ColumnReader::create(reader_opts, meta, num_rows, path_desc, &reader); + return st.ok() ? std::move(reader) : nullptr; + } + + std::unique_ptr<fs::ReadableBlock> create_readable_block(const std::string& path) { + std::unique_ptr<fs::ReadableBlock> rblock; + FilePathDesc path_desc; + path_desc.filepath = path; + auto block_manager = fs::fs_util::block_manager(TStorageMedium::HDD); + auto st = block_manager->open_block(path_desc, &rblock); + return st.ok() ? std::move(rblock) : nullptr; + } + + segment_v2::ColumnIterator* new_iterator(fs::ReadableBlock* rblock, OlapReaderStatistics* stats, + segment_v2::ColumnReader* reader) { + segment_v2::ColumnIterator* iter = nullptr; + auto st = reader->new_iterator(&iter); + if (!st.ok()) { + return nullptr; + } + segment_v2::ColumnIteratorOptions iter_opts; + iter_opts.stats = stats; + iter_opts.rblock = rblock; + iter_opts.mem_tracker = std::make_shared<MemTracker>(); + st = iter->init(iter_opts); + return st.ok() ? iter : nullptr; + } + + template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding> + void test_array(const ColumnPB& column_pb, const Field* field, + const TupleDescriptor* tuple_desc, const CollectionValue* array) { + test_copy_array(tuple_desc, field, array); + test_direct_copy_array(field, {array}); + test_write_and_read_column<array_encoding, item_encoding>(column_pb, field, {array}); + } + +private: + static constexpr size_t MAX_MEMORY_BYTES = 1024 * 1024; + static const std::string TEST_DIR; + std::unique_ptr<MemTracker> _mem_tracker; + std::unique_ptr<MemPool> _mem_pool; + ObjectPool _object_pool; +}; + +const std::string ArrayTest::TEST_DIR = "./ut_dir/array_test"; + +TEST_F(ArrayTest, TestSimpleIntArrays) { + auto column_pb = create_column_pb("ARRAY", "INT"); + auto type_info = get_type_info(column_pb); + auto field = create_field(column_pb); + auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get()); + ASSERT_EQ(tuple_desc->slots().size(), 1); + auto type_desc = tuple_desc->slots().front()->type(); + + std::vector<const CollectionValue*> arrays = { + parse(_object_pool, "[]", type_desc), + parse(_object_pool, "[null]", type_desc), + parse(_object_pool, "[1, 2, 3]", type_desc), + parse(_object_pool, "[1, null, 3]", type_desc), + parse(_object_pool, "[1, null, null]", type_desc), + parse(_object_pool, "[null, null, 3]", type_desc), + parse(_object_pool, "[null, null, null]", type_desc), + }; + for (auto array : arrays) { + test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(), + tuple_desc, array); + } + test_direct_copy_array(field.get(), arrays); + test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>( + column_pb, field.get(), arrays); +} + +TEST_F(ArrayTest, TestNestedIntArrays) { + // depth 2 + auto column_pb = create_column_pb("ARRAY", "ARRAY", "INT"); + auto type_info = get_type_info(column_pb); + auto field = create_field(column_pb); + auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get()); + ASSERT_EQ(tuple_desc->slots().size(), 1); + auto type_desc = tuple_desc->slots().front()->type(); + + std::vector<const CollectionValue*> arrays = { + parse(_object_pool, "[]", type_desc), + parse(_object_pool, "[[]]", type_desc), + parse(_object_pool, "[[1, 2, 3], [4, 5, 6]]", type_desc), + parse(_object_pool, "[[1, 2, 3], null, [4, 5, 6]]", type_desc), + parse(_object_pool, "[[1, 2, null], null, [4, null, 6], null, [null, 8, 9]]", + type_desc), + }; + for (auto array : arrays) { + test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(), + tuple_desc, array); + } + test_direct_copy_array(field.get(), arrays); + test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>( + column_pb, field.get(), arrays); + + // depth 3 + column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "INT"); + type_info = get_type_info(column_pb); + field = create_field(column_pb); + tuple_desc = get_tuple_descriptor(_object_pool, type_info.get()); + ASSERT_EQ(tuple_desc->slots().size(), 1); + type_desc = tuple_desc->slots().front()->type(); + arrays.clear(); + ASSERT_EQ(arrays.size(), 0); + + arrays = { + parse(_object_pool, "[]", type_desc), + parse(_object_pool, "[[]]", type_desc), + parse(_object_pool, "[[[]]]", type_desc), + parse(_object_pool, "[[[null]], [[1], [2, 3]], [[4, 5, 6], null, null]]", type_desc), + }; + for (auto array : arrays) { + test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(), + tuple_desc, array); + } + test_direct_copy_array(field.get(), arrays); + test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>( + column_pb, field.get(), arrays); +} + +TEST_F(ArrayTest, TestSimpleStringArrays) { + auto column_pb = create_column_pb("ARRAY", "VARCHAR"); + auto type_info = get_type_info(column_pb); + auto field = create_field(column_pb); + auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get()); + ASSERT_EQ(tuple_desc->slots().size(), 1); + auto type_desc = tuple_desc->slots().front()->type(); + + std::vector<const CollectionValue*> arrays = { + parse(_object_pool, "[]", type_desc), + parse(_object_pool, "[null]", type_desc), + parse(_object_pool, "[\"a\", \"b\", \"c\"]", type_desc), + parse(_object_pool, "[null, \"b\", \"c\"]", type_desc), + parse(_object_pool, "[\"a\", null, \"c\"]", type_desc), + parse(_object_pool, "[\"a\", \"b\", null]", type_desc), + parse(_object_pool, "[null, \"b\", null]", type_desc), + parse(_object_pool, "[null, null, null]", type_desc), + }; + for (auto array : arrays) { + test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(), + tuple_desc, array); + } + test_direct_copy_array(field.get(), arrays); + test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>( + column_pb, field.get(), arrays); +} + +TEST_F(ArrayTest, TestNestedStringArrays) { + auto column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "VARCHAR"); + auto type_info = get_type_info(column_pb); + auto field = create_field(column_pb); + auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get()); + ASSERT_EQ(tuple_desc->slots().size(), 1); + auto type_desc = tuple_desc->slots().front()->type(); + + std::vector<const CollectionValue*> arrays = { + parse(_object_pool, "[]", type_desc), + parse(_object_pool, "[[]]", type_desc), + parse(_object_pool, "[[[]]]", type_desc), + parse(_object_pool, "[null, [null], [[null]]]", type_desc), + parse(_object_pool, "[[[\"a\", null, \"c\"], [\"d\", \"e\", \"f\"]], null, [[\"g\"]]]", + type_desc), + }; + for (auto array : arrays) { + test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(), + tuple_desc, array); + } + test_direct_copy_array(field.get(), arrays); + test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>( + column_pb, field.get(), arrays); +} + +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index eb45de6..2ab61c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -373,6 +373,12 @@ public class Column implements Writable { childrenTColumnType.setIndexLen(children.getOlapColumnIndexSize()); childrenTColumn.setColumnType(childrenTColumnType); childrenTColumn.setIsAllowNull(children.isAllowNull()); + // TODO: If we don't set the aggregate type for children, the type will be + // considered as TAggregationType::SUM after deserializing in BE. + // For now, we make children inherit the aggregate type from their parent. + if (tColumn.getAggregationType() != null) { + childrenTColumn.setAggregationType(tColumn.getAggregationType()); + } tColumn.setChildrenColumn(new ArrayList<>()); tColumn.children_column.add(childrenTColumn); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org