This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 19bc14cf8d [feature-wip](array-type) Add array type support for vectorized parquet-orc scanner (#9856) 19bc14cf8d is described below commit 19bc14cf8d96bd7990c0ad52d9356cf7e686a441 Author: yinzhijian <373141...@qq.com> AuthorDate: Thu Jun 9 12:11:47 2022 +0800 [feature-wip](array-type) Add array type support for vectorized parquet-orc scanner (#9856) Only one-level arrays are supported for now. For example: - nullable(array(nullable(tinyint))) is **supported**. - nullable(array(nullable(array(xx)))) is **not supported**. --- be/src/exec/arrow/orc_reader.h | 4 +- be/src/exec/base_scanner.cpp | 1 + be/src/vec/data_types/data_type.cpp | 4 + be/src/vec/data_types/data_type.h | 2 + be/src/vec/data_types/data_type_array.cpp | 91 ++++++++++++++- be/src/vec/data_types/data_type_array.h | 4 + be/src/vec/data_types/data_type_date.cpp | 12 ++ be/src/vec/data_types/data_type_date.h | 1 + be/src/vec/data_types/data_type_date_time.cpp | 12 ++ be/src/vec/data_types/data_type_date_time.h | 2 + be/src/vec/data_types/data_type_decimal.cpp | 12 ++ be/src/vec/data_types/data_type_decimal.h | 1 + be/src/vec/data_types/data_type_factory.cpp | 5 + be/src/vec/data_types/data_type_factory.hpp | 43 ++++--- be/src/vec/data_types/data_type_nullable.cpp | 30 +++++ be/src/vec/data_types/data_type_nullable.h | 2 + be/src/vec/data_types/data_type_number_base.cpp | 28 +++++ be/src/vec/data_types/data_type_number_base.h | 1 + be/src/vec/data_types/data_type_string.cpp | 6 + be/src/vec/data_types/data_type_string.h | 1 + be/src/vec/exec/varrow_scanner.cpp | 2 +- be/src/vec/functions/function_cast.h | 110 +++++++++++++++++- be/src/vec/utils/arrow_column_to_doris_column.cpp | 47 +++++++- be/src/vec/utils/arrow_column_to_doris_column.h | 2 +- .../utils/arrow_column_to_doris_column_test.cpp | 125 ++++++++++++++++++++- 25 files changed, 511 insertions(+), 37 deletions(-) diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h 
index 5213a18dcf..dd7853efe7 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -29,7 +29,7 @@ #include "exec/arrow/arrow_reader.h" namespace doris { -// Reader of orc file +// Reader of ORC file class ORCReaderWrap final : public ArrowReaderWrap { public: ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file); @@ -48,4 +48,4 @@ private: bool _cur_file_eof; // is read over? }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 005e64c703..11247a860c 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -329,6 +329,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { // PT1 => dest primitive type RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); auto column_ptr = _src_block.get_by_position(result_column_id).column; + DCHECK(column_ptr != nullptr); // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr // is likely to be nullable diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp index 0cffc102ff..414959a517 100644 --- a/be/src/vec/data_types/data_type.cpp +++ b/be/src/vec/data_types/data_type.cpp @@ -82,6 +82,10 @@ std::string IDataType::to_string(const IColumn& column, size_t row_num) const { LOG(FATAL) << fmt::format("Data type {} to_string not implement.", get_name()); return ""; } +Status IDataType::from_string(ReadBuffer& rb, IColumn* column) const { + LOG(FATAL) << fmt::format("Data type {} from_string not implement.", get_name()); + return Status::OK(); +} void IDataType::insert_default_into(IColumn& column) const { column.insert_default(); diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 5e49fa90c6..6a4dc16899 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -28,6 +28,7 @@ 
#include "vec/common/cow.h" #include "vec/common/string_buffer.hpp" #include "vec/core/types.h" +#include "vec/io/reader_buffer.h" namespace doris { class PBlock; @@ -70,6 +71,7 @@ public: virtual void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const; virtual std::string to_string(const IColumn& column, size_t row_num) const; + virtual Status from_string(ReadBuffer& rb, IColumn* column) const; protected: virtual String do_get_name() const; diff --git a/be/src/vec/data_types/data_type_array.cpp b/be/src/vec/data_types/data_type_array.cpp index 1f30b48aae..cc67eb7973 100644 --- a/be/src/vec/data_types/data_type_array.cpp +++ b/be/src/vec/data_types/data_type_array.cpp @@ -21,6 +21,7 @@ #include "vec/data_types/data_type_array.h" #include "gen_cpp/data.pb.h" +#include "vec/common/string_utils/string_utils.h" #include "vec/io/io_helper.h" namespace doris::vectorized { @@ -94,4 +95,92 @@ void DataTypeArray::to_pb_column_meta(PColumnMeta* col_meta) const { get_nested_type()->to_pb_column_meta(children); } -} // namespace doris::vectorized +void DataTypeArray::to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const { + auto& data_column = + assert_cast<const ColumnArray&>(*column.convert_to_full_column_if_const().get()); + auto& offsets = data_column.get_offsets(); + + size_t offset = offsets[row_num - 1]; + size_t next_offset = offsets[row_num]; + + const IColumn& nested_column = data_column.get_data(); + ostr.write("[", 1); + for (size_t i = offset; i < next_offset; ++i) { + if (i != offset) { + ostr.write(",", 1); + } + nested->to_string(nested_column, i, ostr); + } + ostr.write("]", 1); +} + +std::string DataTypeArray::to_string(const IColumn& column, size_t row_num) const { + auto& data_column = + assert_cast<const ColumnArray&>(*column.convert_to_full_column_if_const().get()); + auto& offsets = data_column.get_offsets(); + + size_t offset = offsets[row_num - 1]; + size_t next_offset = offsets[row_num]; + const 
IColumn& nested_column = data_column.get_data(); + std::stringstream ss; + ss << "["; + for (size_t i = offset; i < next_offset; ++i) { + if (i != offset) { + ss << ","; + } + ss << nested->to_string(nested_column, i); + } + ss << "]"; + return ss.str(); +} + +Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const { + // only support one level now + auto* array_column = assert_cast<ColumnArray*>(column); + auto& offsets = array_column->get_offsets(); + + IColumn& nested_column = array_column->get_data(); + if (*rb.position() != '[') { + return Status::InvalidArgument("Array does not start with '[' character, found '{}'", + *rb.position()); + } + ++rb.position(); + bool first = true; + size_t size = 0; + while (!rb.eof() && *rb.position() != ']') { + if (!first) { + if (*rb.position() == ',') { + ++rb.position(); + } else { + return Status::InvalidArgument(fmt::format( + "Cannot read array from text, expected comma or end of array, found '{}'", + *rb.position())); + } + } + first = false; + if (*rb.position() == ']') { + break; + } + size_t nested_str_len = 1; + char* temp_char = rb.position() + nested_str_len; + while (*(temp_char) != ']' && *(temp_char) != ',' && temp_char != rb.end()) { + ++nested_str_len; + temp_char = rb.position() + nested_str_len; + } + + ReadBuffer read_buffer(rb.position(), nested_str_len); + auto st = nested->from_string(read_buffer, &nested_column); + if (!st.ok()) { + // we should do revert if error + array_column->pop_back(size); + return st; + } + rb.position() += nested_str_len; + DCHECK_LE(rb.position(), rb.end()); + ++size; + } + offsets.push_back(offsets.back() + size); + return Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/data_types/data_type_array.h b/be/src/vec/data_types/data_type_array.h index 2cd9c45667..c87298b1f0 100644 --- a/be/src/vec/data_types/data_type_array.h +++ b/be/src/vec/data_types/data_type_array.h @@ -78,6 +78,10 @@ public: const 
char* deserialize(const char* buf, IColumn* column) const override; void to_pb_column_meta(PColumnMeta* col_meta) const override; + + std::string to_string(const IColumn& column, size_t row_num) const override; + void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_date.cpp b/be/src/vec/data_types/data_type_date.cpp index 2508570a53..3e261d0f99 100644 --- a/be/src/vec/data_types/data_type_date.cpp +++ b/be/src/vec/data_types/data_type_date.cpp @@ -23,6 +23,7 @@ #include "runtime/datetime_value.h" #include "util/binary_cast.hpp" #include "vec/columns/columns_number.h" +#include "vec/io/io_helper.h" #include "vec/runtime/vdatetime_value.h" namespace doris::vectorized { bool DataTypeDate::equals(const IDataType& rhs) const { @@ -59,6 +60,17 @@ void DataTypeDate::to_string(const IColumn& column, size_t row_num, BufferWritab ostr.write(buf, pos - buf - 1); } +Status DataTypeDate::from_string(ReadBuffer& rb, IColumn* column) const { + auto* column_data = static_cast<ColumnInt64*>(column); + Int64 val = 0; + if (!read_date_text_impl<Int64>(val, rb)) { + return Status::InvalidArgument(fmt::format("parse date fail, string: '{}'", + std::string(rb.position(), rb.count()).c_str())); + } + column_data->insert_value(val); + return Status::OK(); +} + void DataTypeDate::cast_to_date(Int64& x) { auto value = binary_cast<Int64, VecDateTimeValue>(x); value.cast_to_date(); diff --git a/be/src/vec/data_types/data_type_date.h b/be/src/vec/data_types/data_type_date.h index 06f8ff0d0e..98899db38b 100644 --- a/be/src/vec/data_types/data_type_date.h +++ b/be/src/vec/data_types/data_type_date.h @@ -36,6 +36,7 @@ public: bool equals(const IDataType& rhs) const override; std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, 
BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; static void cast_to_date(Int64& x); diff --git a/be/src/vec/data_types/data_type_date_time.cpp b/be/src/vec/data_types/data_type_date_time.cpp index 5271091f87..b758390576 100644 --- a/be/src/vec/data_types/data_type_date_time.cpp +++ b/be/src/vec/data_types/data_type_date_time.cpp @@ -23,6 +23,7 @@ #include "runtime/datetime_value.h" #include "util/binary_cast.hpp" #include "vec/columns/columns_number.h" +#include "vec/io/io_helper.h" #include "vec/runtime/vdatetime_value.h" namespace doris::vectorized { @@ -82,6 +83,17 @@ void DataTypeDateTime::to_string(const IColumn& column, size_t row_num, ostr.write(buf, pos - buf - 1); } +Status DataTypeDateTime::from_string(ReadBuffer& rb, IColumn* column) const { + auto* column_data = static_cast<ColumnInt64*>(column); + Int64 val = 0; + if (!read_datetime_text_impl<Int64>(val, rb)) { + return Status::InvalidArgument(fmt::format("parse datetime fail, string: '{}'", + std::string(rb.position(), rb.count()).c_str())); + } + column_data->insert_value(val); + return Status::OK(); +} + void DataTypeDateTime::cast_to_date_time(Int64& x) { auto value = binary_cast<Int64, doris::vectorized::VecDateTimeValue>(x); value.to_datetime(); diff --git a/be/src/vec/data_types/data_type_date_time.h b/be/src/vec/data_types/data_type_date_time.h index adf41bfc8b..8085407253 100644 --- a/be/src/vec/data_types/data_type_date_time.h +++ b/be/src/vec/data_types/data_type_date_time.h @@ -64,6 +64,8 @@ public: void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; + static void cast_to_date_time(Int64& x); MutableColumnPtr create_column() const override; diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp index 8cd97a38de..04b8713da9 100644 --- 
a/be/src/vec/data_types/data_type_decimal.cpp +++ b/be/src/vec/data_types/data_type_decimal.cpp @@ -62,6 +62,18 @@ void DataTypeDecimal<T>::to_string(const IColumn& column, size_t row_num, ostr.write(str.data(), str.size()); } +template <typename T> +Status DataTypeDecimal<T>::from_string(ReadBuffer& rb, IColumn* column) const { + auto& column_data = static_cast<ColumnType&>(*column).get_data(); + T val = 0; + if (!read_decimal_text_impl<T>(val, rb)) { + return Status::InvalidArgument(fmt::format("parse decimal fail, string: '{}'", + std::string(rb.position(), rb.count()).c_str())); + } + column_data.emplace_back(val); + return Status::OK(); +} + // binary: row_num | value1 | value2 | ... template <typename T> int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& column) const { diff --git a/be/src/vec/data_types/data_type_decimal.h b/be/src/vec/data_types/data_type_decimal.h index 0613653324..7a2d9541e6 100644 --- a/be/src/vec/data_types/data_type_decimal.h +++ b/be/src/vec/data_types/data_type_decimal.h @@ -145,6 +145,7 @@ public: bool can_be_inside_nullable() const override { return true; } std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; /// Decimal specific diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index 2071b8cedb..3658270ec7 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -312,6 +312,11 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool case ::arrow::Type::DECIMAL: nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(); break; + case ::arrow::Type::LIST: + DCHECK(type->num_fields() == 1); + nested = std::make_shared<vectorized::DataTypeArray>( + 
create_data_type(type->field(0)->type().get(), true)); + break; default: DCHECK(false) << "invalid arrow type:" << (int)(type->id()); break; diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp index 968617414c..e64bdcf3ac 100644 --- a/be/src/vec/data_types/data_type_factory.hpp +++ b/be/src/vec/data_types/data_type_factory.hpp @@ -49,22 +49,33 @@ public: static std::once_flag oc; static DataTypeFactory instance; std::call_once(oc, []() { - instance.register_data_type("UInt8", std::make_shared<DataTypeUInt8>()); - instance.register_data_type("UInt16", std::make_shared<DataTypeUInt16>()); - instance.register_data_type("UInt32", std::make_shared<DataTypeUInt32>()); - instance.register_data_type("UInt64", std::make_shared<DataTypeUInt64>()); - instance.register_data_type("Int8", std::make_shared<DataTypeInt8>()); - instance.register_data_type("Int16", std::make_shared<DataTypeInt16>()); - instance.register_data_type("Int32", std::make_shared<DataTypeInt32>()); - instance.register_data_type("Int64", std::make_shared<DataTypeInt64>()); - instance.register_data_type("Int128", std::make_shared<DataTypeInt128>()); - instance.register_data_type("Float32", std::make_shared<DataTypeFloat32>()); - instance.register_data_type("Float64", std::make_shared<DataTypeFloat64>()); - instance.register_data_type("Date", std::make_shared<DataTypeDate>()); - instance.register_data_type("DateTime", std::make_shared<DataTypeDateTime>()); - instance.register_data_type("String", std::make_shared<DataTypeString>()); - instance.register_data_type("Decimal", - std::make_shared<DataTypeDecimal<Decimal128>>(27, 9)); + std::unordered_map<std::string, DataTypePtr> base_type_map { + {"UInt8", std::make_shared<DataTypeUInt8>()}, + {"UInt16", std::make_shared<DataTypeUInt16>()}, + {"UInt32", std::make_shared<DataTypeUInt32>()}, + {"UInt64", std::make_shared<DataTypeUInt64>()}, + {"Int8", std::make_shared<DataTypeInt8>()}, + {"Int16", 
std::make_shared<DataTypeInt16>()}, + {"Int32", std::make_shared<DataTypeInt32>()}, + {"Int64", std::make_shared<DataTypeInt64>()}, + {"Int128", std::make_shared<DataTypeInt128>()}, + {"Float32", std::make_shared<DataTypeFloat32>()}, + {"Float64", std::make_shared<DataTypeFloat64>()}, + {"Date", std::make_shared<DataTypeDate>()}, + {"DateTime", std::make_shared<DataTypeDateTime>()}, + {"String", std::make_shared<DataTypeString>()}, + {"Decimal", std::make_shared<DataTypeDecimal<Decimal128>>(27, 9)}, + + }; + for (auto const& [key, val] : base_type_map) { + instance.register_data_type(key, val); + instance.register_data_type("Array(" + key + ")", + std::make_shared<vectorized::DataTypeArray>(val)); + instance.register_data_type( + "Array(Nullable(" + key + "))", + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeNullable>(val))); + } }); return instance; } diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index 9e63fa5e24..d8a3525bc3 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -53,6 +53,36 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c } } +void DataTypeNullable::to_string(const IColumn& column, size_t row_num, + BufferWritable& ostr) const { + const ColumnNullable& col = + assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get()); + + if (col.is_null_at(row_num)) { + ostr.write("NULL", 4); + } else { + nested_data_type->to_string(col.get_nested_column(), row_num, ostr); + } +} + +Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const { + auto* null_column = assert_cast<ColumnNullable*>(column); + if (rb.count() == 4 && *(rb.position()) == 'N' && *(rb.position() + 1) == 'U' && + *(rb.position() + 2) == 'L' && *(rb.position() + 3) == 'L') { + null_column->insert_data(nullptr, 0); + return Status::OK(); + } + auto st = 
nested_data_type->from_string(rb, &(null_column->get_nested_column())); + if (!st.ok()) { + // fill null if fail + null_column->insert_data(nullptr, 0); // 0 is meaningless here + return Status::OK(); + } + // fill not null if succ + null_column->get_null_map_data().push_back(0); + return Status::OK(); +} + // binary: row num | <null array> | <values array> // <null array>: is_null1 | is_null2 | ... // <values array>: value1 | value2 | ...> diff --git a/be/src/vec/data_types/data_type_nullable.h b/be/src/vec/data_types/data_type_nullable.h index 537d0bf449..1f51fd1789 100644 --- a/be/src/vec/data_types/data_type_nullable.h +++ b/be/src/vec/data_types/data_type_nullable.h @@ -81,6 +81,8 @@ public: return nested_data_type->can_be_inside_low_cardinality(); } std::string to_string(const IColumn& column, size_t row_num) const override; + void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; const DataTypePtr& get_nested_type() const { return nested_data_type; } diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp index 2acd319ee6..a6111412df 100644 --- a/be/src/vec/data_types/data_type_number_base.cpp +++ b/be/src/vec/data_types/data_type_number_base.cpp @@ -55,6 +55,34 @@ void DataTypeNumberBase<T>::to_string(const IColumn& column, size_t row_num, } } +template <typename T> +Status DataTypeNumberBase<T>::from_string(ReadBuffer& rb, IColumn* column) const { + auto* column_data = static_cast<ColumnVector<T>*>(column); + if constexpr (std::is_same<T, UInt128>::value) { + // TODO support for Uint128 + return Status::InvalidArgument("uint128 is not support"); + } else if constexpr (std::is_same_v<T, float> || std::is_same_v<T, double>) { + T val = 0; + if (!read_float_text_fast_impl(val, rb)) { + return Status::InvalidArgument( + fmt::format("parse number fail, string: '{}'", + std::string(rb.position(), 
rb.count()).c_str())); + } + column_data->insert_value(val); + } else if constexpr (std::is_integral<T>::value) { + T val = 0; + if (!read_int_text_impl(val, rb)) { + return Status::InvalidArgument( + fmt::format("parse number fail, string: '{}'", + std::string(rb.position(), rb.count()).c_str())); + } + column_data->insert_value(val); + } else { + DCHECK(false); + } + return Status::OK(); +} + template <typename T> Field DataTypeNumberBase<T>::get_default() const { return NearestFieldType<FieldType>(); diff --git a/be/src/vec/data_types/data_type_number_base.h b/be/src/vec/data_types/data_type_number_base.h index 7b6266e4c7..14fbfb3d07 100644 --- a/be/src/vec/data_types/data_type_number_base.h +++ b/be/src/vec/data_types/data_type_number_base.h @@ -67,6 +67,7 @@ public: void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; std::string to_string(const IColumn& column, size_t row_num) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp index fe0518ad38..220b418e4b 100644 --- a/be/src/vec/data_types/data_type_string.cpp +++ b/be/src/vec/data_types/data_type_string.cpp @@ -66,6 +66,12 @@ void DataTypeString::to_string(const class doris::vectorized::IColumn& column, s ostr.write(s.data, s.size); } +Status DataTypeString::from_string(ReadBuffer& rb, IColumn* column) const { + auto* column_data = static_cast<ColumnString*>(column); + column_data->insert_data(rb.position(), rb.count()); + return Status::OK(); +} + Field DataTypeString::get_default() const { return String(); } diff --git a/be/src/vec/data_types/data_type_string.h b/be/src/vec/data_types/data_type_string.h index 8cf10fd9d4..1ecf86f2fe 100644 --- a/be/src/vec/data_types/data_type_string.h +++ b/be/src/vec/data_types/data_type_string.h @@ -58,6 +58,7 @@ public: bool can_be_inside_low_cardinality() const 
override { return true; } std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; + Status from_string(ReadBuffer& rb, IColumn* column) const override; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index 971f1b45bc..14ae373988 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -255,7 +255,7 @@ Status VArrowScanner::_append_batch_to_src_block(Block* block) { auto* array = _batch->column(column_pos++).get(); auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx, - column_with_type_and_name, num_elements, + column_with_type_and_name.column, num_elements, _state->timezone())); } diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 84039b1cb3..5799e60d93 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -245,6 +245,40 @@ struct ConvertImplGenericToString { } }; +template <typename StringColumnType> +struct ConvertImplGenericFromString { + static Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + const size_t result, size_t input_rows_count) { + static_assert(std::is_same_v<StringColumnType, ColumnString>, + "Can be used only to parse from ColumnString"); + const auto& col_with_type_and_name = block.get_by_position(arguments[0]); + const IColumn& col_from = *col_with_type_and_name.column; + // result column must set type + DCHECK(block.get_by_position(result).type != nullptr); + auto data_type_to = block.get_by_position(result).type; + if (const StringColumnType* col_from_string = + check_and_get_column<StringColumnType>(&col_from)) { + auto col_to = data_type_to->create_column(); + + //IColumn & col_to = *res; + size_t size = 
col_from.size(); + col_to->reserve(size); + + for (size_t i = 0; i < size; ++i) { + const auto& val = col_from_string->get_data_at(i); + ReadBuffer read_buffer((char*)(val.data), val.size); + RETURN_IF_ERROR(data_type_to->from_string(read_buffer, col_to)); + } + block.replace_by_position(result, std::move(col_to)); + } else { + return Status::RuntimeError(fmt::format( + "Illegal column {} of first argument of conversion function from string", + col_from.get_name())); + } + return Status::OK(); + } +}; + template <typename ToDataType, typename Name> struct ConvertImpl<DataTypeString, ToDataType, Name> { template <typename Additions = void*> @@ -693,7 +727,6 @@ protected: if (arguments.size() > 2) new_arguments.insert(std::end(new_arguments), std::next(std::begin(arguments), 2), std::end(arguments)); - return wrapper_function(context, block, new_arguments, result, input_rows_count); } @@ -1040,6 +1073,70 @@ private: }; } + WrapperType create_array_wrapper(const DataTypePtr& from_type_untyped, + const DataTypeArray& to_type) const { + /// Conversion from String through parsing. 
+ if (check_and_get_data_type<DataTypeString>(from_type_untyped.get())) { + return &ConvertImplGenericFromString<ColumnString>::execute; + } + + const auto* from_type = check_and_get_data_type<DataTypeArray>(from_type_untyped.get()); + + if (!from_type) { + LOG(FATAL) << "CAST AS Array can only be performed between same-dimensional Array, " + "String types"; + } + + DataTypePtr from_nested_type = from_type->get_nested_type(); + + /// In query SELECT CAST([] AS Array(Array(String))) from type is Array(Nothing) + bool from_empty_array = is_nothing(from_nested_type); + + if (from_type->get_number_of_dimensions() != to_type.get_number_of_dimensions() && + !from_empty_array) { + LOG(FATAL) + << "CAST AS Array can only be performed between same-dimensional array types"; + } + + const DataTypePtr& to_nested_type = to_type.get_nested_type(); + + /// Prepare nested type conversion + const auto nested_function = prepare_unpack_dictionaries(from_nested_type, to_nested_type); + + return [nested_function, from_nested_type, to_nested_type]( + FunctionContext* context, Block& block, const ColumnNumbers& arguments, + const size_t result, size_t /*input_rows_count*/) -> Status { + auto& from_column = block.get_by_position(arguments.front()).column; + + const ColumnArray* from_col_array = + check_and_get_column<ColumnArray>(from_column.get()); + + if (from_col_array) { + /// create columns for converting nested column containing original and result columns + ColumnWithTypeAndName from_nested_column {from_col_array->get_data_ptr(), + from_nested_type, ""}; + + /// convert nested column + ColumnNumbers new_arguments {block.columns()}; + block.insert(from_nested_column); + + size_t nested_result = block.columns(); + block.insert({to_nested_type, ""}); + RETURN_IF_ERROR(nested_function(context, block, new_arguments, nested_result, + from_col_array->get_data_ptr()->size())); + auto nested_result_column = block.get_by_position(nested_result).column; + + /// set converted nested column to 
result + block.get_by_position(result).column = ColumnArray::create( + nested_result_column, from_col_array->get_offsets_ptr()); + } else { + return Status::RuntimeError(fmt::format( + "Illegal column {} for function CAST AS Array", from_column->get_name())); + } + return Status::OK(); + }; + } + WrapperType prepare_unpack_dictionaries(const DataTypePtr& from_type, const DataTypePtr& to_type) const { const auto& from_nested = from_type; @@ -1069,7 +1166,6 @@ private: WrapperType prepare_remove_nullable(const DataTypePtr& from_type, const DataTypePtr& to_type, bool skip_not_null_check) const { /// Determine whether pre-processing and/or post-processing must take place during conversion. - bool source_is_nullable = from_type->is_nullable(); bool result_is_nullable = to_type->is_nullable(); @@ -1096,7 +1192,8 @@ private: tmp_block.insert({nullptr, nested_type, ""}); /// Perform the requested conversion. - wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count); + RETURN_IF_ERROR( + wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count)); const auto& tmp_res = tmp_block.get_by_position(tmp_res_index); @@ -1135,7 +1232,7 @@ private: } } - wrapper(context, tmp_block, arguments, result, input_rows_count); + RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, result, input_rows_count)); block.get_by_position(result).column = tmp_block.get_by_position(result).column; return Status::OK(); }; @@ -1193,7 +1290,8 @@ private: switch (to_type->get_type_id()) { case TypeIndex::String: return create_string_wrapper(from_type); - + case TypeIndex::Array: + return create_array_wrapper(from_type, static_cast<const DataTypeArray&>(*to_type)); default: break; } @@ -1238,8 +1336,8 @@ protected: LOG(FATAL) << fmt::format( "Second argument to {} must be a constant string describing type", get_name()); } - auto type = DataTypeFactory::instance().get(type_col->get_value<String>()); + DCHECK(type != nullptr); bool need_to_be_nullable = false; // 1. 
from_type is nullable diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp index a02fc92b48..206c279e4c 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.cpp +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -28,6 +28,7 @@ #include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "gutil/casts.h" +#include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_decimal.h" #include "vec/runtime/vdatetime_value.h" @@ -236,19 +237,52 @@ static Status convert_column_with_decimal_data(const arrow::Array* array, size_t return Status::OK(); } +static Status convert_offset_from_list_column(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements, + size_t* start_idx_for_data, size_t* num_for_data) { + auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets(); + auto concrete_array = down_cast<const arrow::ListArray*>(array); + auto arrow_offsets_array = concrete_array->offsets(); + auto arrow_offsets = down_cast<arrow::Int32Array*>(arrow_offsets_array.get()); + for (int64_t i = array_idx + 1; i < array_idx + num_elements + 1; ++i) { + // convert to doris offset, start from 0 + offsets_data.emplace_back(arrow_offsets->Value(i) - arrow_offsets->Value(array_idx)); + } + *start_idx_for_data = arrow_offsets->Value(array_idx); + *num_for_data = offsets_data.back(); + + return Status::OK(); +} + +static Status convert_column_with_list_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements, + const std::string& timezone) { + size_t start_idx_of_data = 0; + size_t num_of_data = 0; + // get start idx and num of values from arrow offsets + RETURN_IF_ERROR(convert_offset_from_list_column(array, array_idx, data_column, num_elements, + &start_idx_of_data, &num_of_data)); + auto& data_column_ptr = 
static_cast<ColumnArray&>(*data_column).get_data_ptr(); + auto concrete_array = down_cast<const arrow::ListArray*>(array); + std::shared_ptr<arrow::Array> arrow_data = concrete_array->values(); + + return arrow_column_to_doris_column(arrow_data.get(), start_idx_of_data, data_column_ptr, + num_of_data, timezone); +} + Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx, - ColumnWithTypeAndName& doirs_column, size_t num_elements, + ColumnPtr& doirs_column, size_t num_elements, const std::string& timezone) { // src column always be nullable for simpify converting - assert(doirs_column.column->is_nullable()); + assert(doirs_column->is_nullable()); MutableColumnPtr data_column = nullptr; - if (doirs_column.column->is_nullable()) { + if (doirs_column->is_nullable()) { auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(doirs_column.column)).mutate().get()); + (*std::move(doirs_column)).mutate().get()); fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, num_elements); data_column = nullable_column->get_nested_column_ptr(); } else { - data_column = (*std::move(doirs_column.column)).mutate(); + data_column = (*std::move(doirs_column)).mutate(); } // process data switch (arrow_column->type()->id()) { @@ -280,6 +314,9 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr case arrow::Type::DECIMAL: return convert_column_with_decimal_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); + case arrow::Type::LIST: + return convert_column_with_list_data(arrow_column, arrow_batch_cur_idx, data_column, + num_elements, timezone); default: break; } diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h index 2e70fee11a..0c75f27cda 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.h +++ b/be/src/vec/utils/arrow_column_to_doris_column.h @@ -34,7 +34,7 @@ namespace doris::vectorized 
{ const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type); Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx, - ColumnWithTypeAndName& doirs_column, size_t num_elements, + ColumnPtr& doirs_column, size_t num_elements, const std::string& timezone); } // namespace doris::vectorized diff --git a/be/test/vec/utils/arrow_column_to_doris_column_test.cpp b/be/test/vec/utils/arrow_column_to_doris_column_test.cpp index 0252cf4883..73181a57de 100644 --- a/be/test/vec/utils/arrow_column_to_doris_column_test.cpp +++ b/be/test/vec/utils/arrow_column_to_doris_column_test.cpp @@ -123,7 +123,7 @@ void test_arrow_to_datetime_column(std::shared_ptr<ArrowType> type, ColumnWithTy if constexpr (std::is_same_v<ArrowType, arrow::TimestampType>) { time_zone = type->timezone(); } - auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, time_zone); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, time_zone); ASSERT_EQ(ret.ok(), true); ASSERT_EQ(column.column->size(), counter); MutableColumnPtr data_column = nullptr; @@ -216,7 +216,7 @@ void test_arrow_to_numeric_column(std::shared_ptr<ArrowType> type, ColumnWithTyp ASSERT_EQ(column.column->size(), counter); auto array = create_constant_numeric_array<ArrowType, is_nullable>(num_elements, arrow_numeric, type, counter); - auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC"); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC"); ASSERT_EQ(ret.ok(), true); ASSERT_EQ(column.column->size(), counter); MutableColumnPtr data_column = nullptr; @@ -351,7 +351,7 @@ void test_arrow_to_decimal_column(std::shared_ptr<arrow::Decimal128Type> type, int128_t arrow_value, int128_t expect_value, size_t& counter) { ASSERT_EQ(column.column->size(), counter); auto array = create_decimal_array<is_nullable>(num_elements, arrow_value, type, counter); - auto ret = 
arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC"); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC"); ASSERT_EQ(ret.ok(), true); ASSERT_EQ(column.column->size(), counter); MutableColumnPtr data_column = nullptr; @@ -452,7 +452,7 @@ void test_arrow_to_fixed_binary_column(ColumnWithTypeAndName& column, size_t num ASSERT_EQ(column.column->size(), counter); auto array = create_fixed_size_binary_array<bytes_width, is_nullable>(num_elements, value, counter); - auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC"); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC"); ASSERT_EQ(ret.ok(), true); ASSERT_EQ(column.column->size(), counter); MutableColumnPtr data_column = nullptr; @@ -554,7 +554,7 @@ void test_arrow_to_binary_column(ColumnWithTypeAndName& column, size_t num_eleme ArrowCppType value, size_t& counter) { ASSERT_EQ(column.column->size(), counter); auto array = create_binary_array<ArrowType, is_nullable>(num_elements, value, counter); - auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC"); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC"); ASSERT_EQ(ret.ok(), true); ASSERT_EQ(column.column->size(), counter); MutableColumnPtr data_column = nullptr; @@ -606,4 +606,119 @@ TEST(ArrowColumnToDorisColumnTest, test_binary) { test_binary<arrow::BinaryType, false>(test_cases, 64); test_binary<arrow::BinaryType, true>(test_cases, 64); } + +template <typename ArrowValueType, bool is_nullable = false> +static inline std::shared_ptr<arrow::Array> create_array_array( + std::vector<IColumn::Offset>& vec_offsets, std::vector<bool>& null_map, + std::shared_ptr<arrow::DataType> value_type, std::shared_ptr<arrow::Array> values, + size_t& counter) { + using offset_type = typename arrow::ListType::offset_type; + size_t num_rows = vec_offsets.size() - 1; + 
DCHECK(null_map.size() == num_rows); + size_t offsets_bytes = (vec_offsets.size()) * sizeof(offset_type); + auto offsets_buf_tmp = arrow::AllocateBuffer(offsets_bytes); + std::shared_ptr<arrow::Buffer> offsets_buf = std::move(offsets_buf_tmp.ValueOrDie()); + auto* offsets = (offset_type*)offsets_buf->mutable_data(); + offsets[0] = 0; + + auto null_bitmap_bytes = ((num_rows) + 7) / 8; + auto null_bitmap_tmp = arrow::AllocateBuffer(null_bitmap_bytes); + std::shared_ptr<arrow::Buffer> null_bitmap = std::move(null_bitmap_tmp.ValueOrDie()); + auto nulls = null_bitmap->mutable_data(); + for (auto i = 0; i < num_rows; ++i) { + if (is_nullable && null_map[i]) { + arrow::bit_util::ClearBit(nulls, i); + } else { + arrow::bit_util::SetBit(nulls, i); + } + offsets[i + 1] = vec_offsets[i + 1]; + } + + auto array = std::make_shared<arrow::ListArray>(value_type, num_rows, offsets_buf, values, + null_bitmap); + counter += num_rows; + return std::static_pointer_cast<arrow::Array>(array); +} + +template <typename ArrowType, bool is_nullable> +void test_arrow_to_array_column(ColumnWithTypeAndName& column, + std::vector<IColumn::Offset>& vec_offsets, + std::vector<bool>& null_map, + std::shared_ptr<arrow::DataType> value_type, + std::shared_ptr<arrow::Array> values, const std::string& value, + size_t& counter) { + ASSERT_EQ(column.column->size(), counter); + auto array = create_array_array<ArrowType, is_nullable>(vec_offsets, null_map, value_type, + values, counter); + auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, vec_offsets.size() - 1, + "UTC"); + ASSERT_EQ(ret.ok(), true); + ASSERT_EQ(column.column->size(), counter); + MutableColumnPtr data_column = nullptr; + vectorized::ColumnNullable* nullable_column = nullptr; + if (column.column->is_nullable()) { + nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(column.column)).mutate().get()); + data_column = nullable_column->get_nested_column_ptr(); + } else { + data_column = 
(*std::move(column.column)).mutate(); + } + auto& array_column = static_cast<ColumnArray&>(*data_column); + EXPECT_EQ(array_column.size(), vec_offsets.size() - 1); + for (size_t i = 0; i < array_column.size(); ++i) { + auto v = get<Array>(array_column[i]); + EXPECT_EQ(v.size(), vec_offsets[i + 1] - vec_offsets[i]); + if (is_nullable) { + ASSERT_NE(nullable_column, nullptr); + NullMap& map_data = nullable_column->get_null_map_data(); + ASSERT_EQ(map_data[i], null_map[i]); + if (!null_map[i]) { + // check value + for (size_t j = 0; j < v.size(); ++j) { + // in nested column, values like [null, xx, null, xx, ...] + if ((vec_offsets[i] + j) % 2 != 0) { + EXPECT_EQ(value, get<std::string>(v[j])); + } + } + } + } else { + // check value + for (size_t j = 0; j < v.size(); ++j) { + EXPECT_EQ(value, get<std::string>(v[j])); + } + } + } +} + +template <typename ArrowType, bool is_nullable> +void test_array(const std::vector<std::string>& test_cases, size_t num_elements, + std::vector<IColumn::Offset>& vec_offsets, std::vector<bool>& null_map, + std::shared_ptr<arrow::DataType> value_type) { + TypeDescriptor type(TYPE_ARRAY); + type.children.push_back(TYPE_VARCHAR); + DataTypePtr data_type = DataTypeFactory::instance().create_data_type(type, true); + for (auto& value : test_cases) { + MutableColumnPtr data_column = data_type->create_column(); + ColumnWithTypeAndName column(std::move(data_column), data_type, "test_array_column"); + // create nested column + size_t nested_counter = 0; + auto array = + create_binary_array<ArrowType, is_nullable>(num_elements, value, nested_counter); + ASSERT_EQ(nested_counter, num_elements); + size_t counter = 0; + test_arrow_to_array_column<ArrowType, is_nullable>(column, vec_offsets, null_map, + value_type, array, value, counter); + } +} + +TEST(ArrowColumnToDorisColumnTest, test_array) { + std::vector<std::string> test_cases = {"1.2345678", "-12.34567890", "99999999999.99999999", + "-99999999999.99999999"}; + std::vector<IColumn::Offset> 
vec_offsets = {0, 3, 3, 4, 6, 6, 64}; + std::vector<bool> null_map = {false, true, false, false, false, false}; + test_array<arrow::BinaryType, false>(test_cases, 64, vec_offsets, null_map, + arrow::list(arrow::binary())); + test_array<arrow::BinaryType, true>(test_cases, 64, vec_offsets, null_map, + arrow::list(arrow::binary())); +} } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org