This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fcd2ee74757 [fix](branch-3.0) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats (#45122)
fcd2ee74757 is described below

commit fcd2ee74757c1ae3520e6e70c870c05562fa5e33
Author: Tiewei Fang <fangtie...@selectdb.com>
AuthorDate: Sun Dec 8 16:28:03 2024 +0800

    [fix](branch-3.0) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats (#45122)

    picked from #44041
---
 be/src/util/arrow/row_batch.cpp | 9 +-
 .../data_types/serde/data_type_bitmap_serde.cpp | 59 +++++++++++-
 .../vec/data_types/serde/data_type_bitmap_serde.h | 13 +--
 .../data_types/serde/data_type_date64_serde.cpp | 20 +---
 .../vec/data_types/serde/data_type_hll_serde.cpp | 48 +++++-----
 .../vec/data_types/serde/data_type_ipv6_serde.cpp | 37 +++----
 .../vec/data_types/serde/data_type_jsonb_serde.cpp | 28 +++++-
 .../data_types/serde/data_type_number_serde.cpp | 36 ++-----
 .../data_types/serde/data_type_object_serde.cpp | 35 +++++++
 .../vec/data_types/serde/data_type_object_serde.h | 4 +-
 .../serde/data_type_quantilestate_serde.h | 56 ++++++++++-
 be/src/vec/data_types/serde/data_type_serde.h | 12 +++
 .../org/apache/doris/analysis/OutFileClause.java | 21 ++--
 .../outfile/test_outfile_complex_type.out | 25 +++++
 .../outfile/test_outfile_jsonb_and_variant.out | 25 +++++
 .../outfile/test_outfile_complex_type.groovy | 106 +++++++++++++++++++++
 .../outfile/test_outfile_jsonb_and_variant.groovy | 104 ++++++++++++++++++++
 17 files changed, 506 insertions(+), 132 deletions(-)

diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index dd11d5ae46f..a0cd77aee41 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -24,6 +24,7 @@ #include <arrow/result.h> #include <arrow/status.h> #include <arrow/type.h> +#include <arrow/type_fwd.h> #include <glog/logging.h> #include <stdint.h> @@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: case TYPE_LARGEINT: case TYPE_VARCHAR: case TYPE_CHAR: - case TYPE_HLL: case TYPE_DATE: case TYPE_DATETIME: case TYPE_STRING: case TYPE_JSONB: - case TYPE_OBJECT: *result = arrow::utf8(); break; case TYPE_DATEV2: @@ -150,6 +149,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: *result = arrow::utf8(); break; } + case TYPE_QUANTILE_STATE: + case TYPE_OBJECT: + case TYPE_HLL: { + *result = arrow::binary(); + break; + } default: return Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type", type.type);
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index f52a3d1e9b4..f60d054df31 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -17,6 +17,7 @@ #include "data_type_bitmap_serde.h" +#include <arrow/array/builder_binary.h> #include <gen_cpp/types.pb.h> #include <string> @@ -27,12 +28,36 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { namespace vectorized { class IColumn; +Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int start_idx, + int end_idx, BufferWritable& bw, + FormatOptions& options) const { + SERIALIZE_COLUMN_TO_JSON(); +} + +Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn&
column, int row_num, + BufferWritable& bw, + FormatOptions& options) const { + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); +} + Status DataTypeBitMapSerDe::deserialize_column_from_json_vector( IColumn& column, std::vector<Slice>& slices, int* num_deserialized, const FormatOptions& options) const { @@ -95,6 +120,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr result.writeEndBinary(); } +void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end, const cctz::time_zone& ctz) const { + const auto& col = assert_cast<const ColumnBitmap&>(column); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& bitmap_value = const_cast<BitmapValue&>(col.get_element(string_i)); + std::string memory_buffer(bitmap_value.getSizeInBytes(), '0'); + bitmap_value.write_to(memory_buffer.data()); + checkArrowStatus( + builder.Append(memory_buffer.data(), static_cast<int>(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } +} + void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { auto& col = reinterpret_cast<ColumnBitmap&>(column); auto blob = static_cast<const JsonbBlobVal*>(arg); @@ -147,11 +192,19 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = assert_cast<const ColumnBitmap&>(column); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + INIT_MEMORY_FOR_ORC_WRITER() + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast<char*>(ele.data); - cur_batch->length[row_id] = ele.size; + auto bitmap_value = const_cast<BitmapValue&>(col_data.get_element(row_id)); + size_t len = bitmap_value.getSizeInBytes(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + bitmap_value.write_to(const_cast<char*>(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index d4a643b3b16..ba7842e354c 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -36,14 +36,10 @@ public: DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, - FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); - } + FormatOptions& options) const override; Status serialize_column_to_json(const IColumn& column, int start_idx, 
int end_idx, - BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); - } + BufferWritable& bw, FormatOptions& options) const override; Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override; @@ -63,10 +59,7 @@ public: void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int start, int end, - const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); - } + const cctz::time_zone& ctz) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index e749b2fa2e7..4cdd6b90326 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -288,16 +288,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data(); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -309,18 +300,11 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con REALLOC_MEMORY_FOR_ORC_WRITER() + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; cur_batch->length[row_id] = len; offset += len; } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; - } - } - buffer_list.emplace_back(bufferRef); cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index 799e1c15d63..aba3a9d0619 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -21,6 +21,7 @@ #include <stddef.h> #include <stdint.h> +#include <memory> #include <string> #include "arrow/array/builder_binary.h" @@ -47,28 +48,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int sta Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, FormatOptions& options) const { - if (!options._output_object_data) { - /** - * For null values in ordinary types, we use \N to represent them; - * for null values in nested types, we use null to represent them, just like the json format. 
- */ - if (_nesting_level >= 2) { - bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), - strlen(NULL_IN_COMPLEX_TYPE.c_str())); - } else { - bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), - strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); - } - return Status::OK(); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); } - auto col_row = check_column_const_set_readability(column, row_num); - ColumnPtr ptr = col_row.first; - row_num = col_row.second; - auto& data = const_cast<HyperLogLog&>(assert_cast<const ColumnHLL&>(*ptr).get_element(row_num)); - std::unique_ptr<char[]> buf = - std::make_unique_for_overwrite<char[]>(data.max_serialized_size()); - size_t size = data.serialize((uint8*)buf.get()); - bw.write(buf.get(), size); return Status::OK(); } @@ -137,7 +127,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const NullMa arrow::ArrayBuilder* array_builder, int start, int end, const cctz::time_zone& ctz) const { const auto& col = assert_cast<const ColumnHLL&>(column); - auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); for (size_t string_i = start; string_i < end; ++string_i) { if (null_map && (*null_map)[string_i]) { checkArrowStatus(builder.AppendNull(), column.get_name(), @@ -195,11 +185,19 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const auto& col_data = assert_cast<const ColumnHLL&>(column); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + INIT_MEMORY_FOR_ORC_WRITER() + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast<char*>(ele.data); - cur_batch->length[row_id] = ele.size; + auto hll_value = const_cast<HyperLogLog&>(col_data.get_element(row_id)); + size_t len = hll_value.max_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + hll_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index 612c9ce4222..468565ad2d2 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -182,38 +182,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const int end, std::vector<StringRef>& buffer_list) const { const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data(); orc::StringVectorBatch* cur_batch = assert_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - - for (size_t row_id = start; row_id < end; 
row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); - size_t len = ipv6_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + INIT_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); + size_t len = ipv6_str.size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str()); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } - buffer_list.emplace_back(bufferRef); + cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 08514a6eea7..eb6c783cf28 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -21,6 +21,10 @@ #include <rapidjson/stringbuffer.h> #include <rapidjson/writer.h> +#include <cstddef> +#include <cstdint> +#include <memory> + #include "arrow/array/builder_binary.h" #include "common/exception.h" #include "common/status.h" @@ -136,7 +140,29 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int start, int end, std::vector<StringRef>& buffer_list) const { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + const auto& string_column = assert_cast<const ColumnString&>(column); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + std::string_view string_ref = string_column.get_data_at(row_id).to_string_view(); + auto serialized_value = std::make_unique<std::string>( + JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size())); + auto len = serialized_value->size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target, diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index f4fb6bbbb1f..972668e65fd 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -339,38 +339,22 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const std::string& timezone, if constexpr (std::is_same_v<T, Int128>) { // largeint orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - 
bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string value_str = fmt::format("{}", col_data[row_id]); - size_t len = value_str.size(); + if (cur_batch->notNull[row_id] == 1) { + std::string value_str = fmt::format("{}", col_data[row_id]); + size_t len = value_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + REALLOC_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str()); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } - buffer_list.emplace_back(bufferRef); + cur_batch->numElements = end - start; } else if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) { // tinyint/boolean WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch) diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 383add39354..1b4e1caf4ac 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -19,6 +19,9 @@ #include <rapidjson/stringbuffer.h> +#include <cstdint> +#include <string> + #include "common/exception.h" #include "common/status.h" #include "vec/columns/column.h" @@ -192,6 +195,38 @@ Status DataTypeObjectSerDe::write_one_cell_to_json(const IColumn& column, rapidj return Status::OK(); } +Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column, + const NullMap* null_map, + orc::ColumnVectorBatch* orc_col_batch, int start, + int end, + std::vector<StringRef>& buffer_list) const { + const auto* var = check_and_get_column<ColumnObject>(column); + orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto serialized_value = std::make_unique<std::string>(); + if (!var->serialize_one_row_to_string(row_id, serialized_value.get())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}", + var->dump_structure()); + } + auto len = serialized_value->length(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 314922f8694..41608dc3f85 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -89,9 +89,7 @@ public: Status write_column_to_orc(const std::string& timezone, const IColumn& column, const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch, int start, int end, - std::vector<StringRef>& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type " + column.get_name()); - } + std::vector<StringRef>& buffer_list) const override; Status write_one_cell_to_json(const IColumn& column, rapidjson::Value& result, rapidjson::Document::AllocatorType& allocator, Arena& mem_pool, diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index e91fe0166e1..e24a3a29543 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -17,6 +17,7 @@ #pragma once +#include <arrow/array/builder_binary.h> #include <gen_cpp/types.pb.h> #include <stddef.h> #include <stdint.h> @@ -32,6 +33,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/string_ref.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { @@ -43,12 +45,23 @@ public: Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); } Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx, BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); + SERIALIZE_COLUMN_TO_JSON(); } Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override { @@ -102,8 +115,21 @@ public: void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int start, int end, const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); + const auto& col = assert_cast<const ColumnQuantileState&>(column); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& quantile_state_value = const_cast<QuantileState&>(col.get_element(string_i)); + std::string memory_buffer(quantile_state_value.get_serialized_size(), '0'); + quantile_state_value.serialize((uint8_t*)memory_buffer.data()); + checkArrowStatus(builder.Append(memory_buffer.data(), + static_cast<int>(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } } void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { @@ -126,7 +152,27 @@ public: const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int start, int end, std::vector<StringRef>& buffer_list) const override { - return 
Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto& col_data = assert_cast<const ColumnQuantileState&>(column); + orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto quantilestate_value = const_cast<QuantileState&>(col_data.get_element(row_id)); + size_t len = quantilestate_value.get_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + quantilestate_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } private: diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index 6caa51d2663..105b1bbaedd 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -75,6 +75,18 @@ struct ColumnVectorBatch; ++*num_deserialized; \ } +#define INIT_MEMORY_FOR_ORC_WRITER() \ + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); \ + if (!ptr) { \ + return Status::InternalError( \ + "malloc memory error when write largeint column data to orc file."); \ + } \ + StringRef bufferRef; \ + bufferRef.data = ptr; \ + bufferRef.size = BUFFER_UNIT_SIZE; \ + size_t offset = 0; \ + buffer_list.emplace_back(bufferRef); + #define REALLOC_MEMORY_FOR_ORC_WRITER() \ while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) { \ char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE); \ diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index ef65b405853..026e4da29b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -36,7 +36,6 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TParquetCompressionType; @@ -302,11 +301,8 @@ public class OutFileClause { break; case HLL: case BITMAP: - if (!(ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary())) { - break; - } - orcType = "string"; + case QUANTILE_STATE: + orcType = "binary"; break; case DATEV2: orcType = "date"; @@ -327,6 +323,8 @@ public class OutFileClause { case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: orcType = "string"; break; case DECIMALV2: @@ -445,6 +443,8 @@ public class OutFileClause { case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); break; case DECIMAL32: @@ -455,13 +455,8 @@ public class OutFileClause { break; case HLL: case BITMAP: - if (ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary()) { - checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); - } else { - throw new AnalysisException("Orc format does not support column type: " - 
+ resultType.getPrimitiveType()); - } + case QUANTILE_STATE: + checkOrcType(schema.second, "binary", true, resultType.getPrimitiveType().toString()); break; case STRUCT: checkOrcType(schema.second, "struct", false, resultType.getPrimitiveType().toString()); diff --git a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out new file mode 100644 index 00000000000..cd6f000b6c5 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C + +-- !select_load_orc -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C + +-- !select_load_csv -- +20220201 0 \N \N \N +20220201 1 \N \N \N +20220201 2 \N \N \N +20220201 3 \N \N \N +20220201 4 \N \N \N +20220201 5 \N \N \N + diff --git a/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out new file mode 100644 index 00000000000..d2583093964 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_orc -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_orc -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + diff --git a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy new file mode 100644 index 00000000000..49f81732791 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_outfile_complex_type", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_complex_type_table" + def outFilePath = "${bucket}/outfile/complex_type/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... 
+ def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "", + `hll_t` hll hll_union, + `device_id` bitmap BITMAP_UNION + ) ENGINE=OLAP + AGGREGATE KEY(`dt`, `id`) + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, to_quantile_state(1, 2048), hll_hash(1), to_bitmap(243)), + (20220201,1, to_quantile_state(-1, 2048), hll_hash(2), bitmap_from_array([1,2,3,4,5,434543])), + (20220201,2, to_quantile_state(0, 2048), hll_hash(3), to_bitmap(1234566)), + (20220201,3, to_quantile_state(1, 2048), hll_hash(4), to_bitmap(8888888888888)), + (20220201,4, to_quantile_state(2, 2048), hll_hash(5), to_bitmap(98392819412234)), + (20220201,5, to_quantile_state(3, 2048), hll_hash(6), to_bitmap(253234234)); + """ + + // parquet file format + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") + qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file format + format = "orc" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // csv file format + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_csv """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ +} \ No newline at end of file
diff --git a/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy new file mode 100644 index 00000000000..ed3019436ae --- /dev/null +++ b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the + // specific language governing permissions and limitations + // under the License. + +suite("test_outfile_jsonb_and_variant", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_jsonb_and_variant_table" + def outFilePath = "${bucket}/outfile/jsonb_and_variant/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... + def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `json_col` JSON NULL COMMENT "", + `variant_col` variant NULL COMMENT "" + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, '{"k1": "100"}', '{"k1": "100"}'), + (20220201,1, '{"k1": "100", "k2": "123"}', '{"k1": "100", "k2": "123"}'), + (20220201,2, '{"k1": "100", "abc": "567"}', '{"k1": "100", "abc": "567"}'), + (20220201,3, '{"k1": "100", "k3": 123}', '{"k1": "100", "k3": 123}'), + (20220201,4, '{"k1": "100", "doris": "nereids"}', '{"k1": "100", "doris": "nereids"}'), + (20220201,5, '{"k1": "100", "doris": "pipeline"}', '{"k1": "100", "doris": "pipeline"}'); + """ + + // parquet file format + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") + qt_select_load_parquet """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file format + format = "orc" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // csv file format + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ +} \ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
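Note on the pattern this patch repeats in every write_column_to_orc: orc::StringVectorBatch records only raw char*/length pairs, and BITMAP/HLL/QUANTILE_STATE cells have no contiguous serialized bytes inside the column, so each value must now be serialized into memory the writer itself owns. That is what the new INIT_MEMORY_FOR_ORC_WRITER and REALLOC_MEMORY_FOR_ORC_WRITER macros do: they grow a malloc'd buffer and register it in buffer_list so the bytes outlive the batch. The sketch below illustrates only that ownership idea under simplified assumptions; it is not Doris code, and every name in it is made up.

#include <cstddef>
#include <deque>
#include <string>

// Stand-in for orc::StringVectorBatch: it keeps only raw pointers and
// lengths, so the pointed-to bytes must outlive the batch itself.
struct FakeStringBatch {
    std::deque<char*> data;
    std::deque<size_t> length;
};

// Owns every serialized cell. std::deque never relocates existing elements
// when it grows, so pointers handed out by append() stay valid.
class CellBuffer {
public:
    const char* append(std::string serialized, size_t* len) {
        cells_.push_back(std::move(serialized));
        *len = cells_.back().size();
        return cells_.back().data();
    }

private:
    std::deque<std::string> cells_;
};

int main() {
    CellBuffer owned;       // must outlive `batch`, like the buffer_list entries
    FakeStringBatch batch;
    // Pretend these bytes came from BitmapValue::write_to or
    // HyperLogLog::serialize; here they are placeholder strings.
    for (const char* cell : {"serialized-bitmap", "serialized-hll"}) {
        size_t len = 0;
        const char* ptr = owned.append(cell, &len);
        batch.data.push_back(const_cast<char*>(ptr));
        batch.length.push_back(len);
    }
    return 0;
}

A deque is used here because, unlike a vector, growing it never moves existing elements, so previously stored pointers remain valid; the real macros achieve the same guarantee by chaining fixed-size malloc'd buffers in buffer_list instead of reallocating in place.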