This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6d925054de [feature-wip](parquet-reader) decode parquet time & datetime & decimal (#11845) 6d925054de is described below commit 6d925054de3be67d27893a47939e88a30e60e735 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Aug 22 10:15:35 2022 +0800 [feature-wip](parquet-reader) decode parquet time & datetime & decimal (#11845) 1. Spark can set the timestamp precision with the configuration spark.sql.parquet.outputTimestampType = INT96 (nanoseconds), TIMESTAMP_MICROS, or TIMESTAMP_MILLIS. DATETIME V1 keeps only second precision; DATETIME V2 keeps microsecond precision. 2. With DECIMAL V2, the BE stores the value as decimal128 with a fixed (precision=27, scale=9). DECIMAL V3 preserves the precision and scale declared in the schema. --- be/src/exec/schema_scanner.cpp | 7 +- be/src/exec/schema_scanner.h | 3 + be/src/vec/exec/file_hdfs_scanner.cpp | 6 +- be/src/vec/exec/format/parquet/parquet_common.cpp | 166 ++++++++++++-- be/src/vec/exec/format/parquet/parquet_common.h | 248 +++++++++++++++++++++ .../parquet/vparquet_column_chunk_reader.cpp | 36 ++- .../format/parquet/vparquet_column_chunk_reader.h | 11 +- .../exec/format/parquet/vparquet_column_reader.cpp | 6 +- .../exec/format/parquet/vparquet_column_reader.h | 9 +- .../exec/format/parquet/vparquet_group_reader.cpp | 8 +- .../exec/format/parquet/vparquet_group_reader.h | 3 +- .../exec/format/parquet/vparquet_page_reader.cpp | 2 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 8 +- be/src/vec/exec/format/parquet/vparquet_reader.h | 3 +- be/src/vec/runtime/vdatetime_value.cpp | 10 +- be/src/vec/runtime/vdatetime_value.h | 2 + be/src/vec/utils/arrow_column_to_doris_column.cpp | 3 +- .../test_data/parquet_scanner/type-decoder.parquet | Bin 3405 -> 4391 bytes be/test/vec/exec/parquet/parquet_thrift_test.cpp | 146 ++++++++++-- 19 files changed, 609 insertions(+), 68 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index da5edec413..9071293ff1 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -138,7 +138,12 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) { if (_columns[i].type == TYPE_DECIMALV2) { t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift()); } else { - t_slot_desc.__set_slotType(TypeDescriptor(_columns[i].type).to_thrift()); + TypeDescriptor descriptor(_columns[i].type); + if (_columns[i].precision >= 0 && _columns[i].scale >= 0) { + descriptor.precision = _columns[i].precision; + descriptor.scale = _columns[i].scale; + } + t_slot_desc.__set_slotType(descriptor.to_thrift()); } t_slot_desc.__set_colName(_columns[i].name); t_slot_desc.__set_columnPos(i); diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index ea2bc28748..2450da24d4 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -63,6 +63,9 @@ public: PrimitiveType type; int size; bool is_null; + /// Only set if type == TYPE_DECIMAL or DATETIMEV2 + int precision = -1; + int scale = -1; }; SchemaScanner(ColumnDesc* columns, int column_num); virtual ~SchemaScanner(); diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp index ab6401de8b..6459da6fff 100644 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ b/be/src/vec/exec/file_hdfs_scanner.cpp @@ -59,9 +59,9 @@ Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) { std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range, file_reader)); - _reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(), - _state->query_options().batch_size, range.start_offset, - range.size)); + _reader.reset(new ParquetReader( + file_reader.release(), _file_slot_descs.size(), _state->query_options().batch_size, + range.start_offset, range.size, const_cast<cctz::time_zone*>(&_state->timezone_obj()))); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); Status status = _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone()); diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 082d0fc57d..be4ec35223 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -22,6 +22,16 @@ namespace doris::vectorized { +const cctz::time_zone DecodeParams::utc0 = cctz::utc_time_zone(); + +const uint32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588; +const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000; +const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000; + +inline uint64_t ParquetInt96::to_timestamp_micros() const { + return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / NANOS_PER_MICROSECOND; +} + #define FOR_LOGICAL_NUMERIC_TYPES(M) \ M(TypeIndex::Int32, Int32) \ M(TypeIndex::UInt32, UInt32) \ @@ -62,6 +72,44 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type return Status::OK(); } +void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) { + _field_schema = field_schema; + if (_decode_params == nullptr) { + _decode_params.reset(new DecodeParams()); + } + if (ctz != nullptr) { + _decode_params->ctz = ctz; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + _decode_params->ctz = const_cast<cctz::time_zone*>(&_decode_params->utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + _decode_params->second_mask = 1000; + _decode_params->scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + _decode_params->second_mask = 1000000; + _decode_params->scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + _decode_params->second_mask = 1000000000; + _decode_params->scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + _decode_params->second_mask = 1000; + _decode_params->scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + _decode_params->second_mask = 1000000; + _decode_params->scale_to_nano_factor = 1000; + } + } +} + Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) { CHECK(doris_column->is_nullable()); auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( @@ -72,9 +120,6 @@ Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, s Status PlainDecoder::decode_values(Slice& slice, size_t num_values) { size_t to_read_bytes = _type_length * num_values; - if (UNLIKELY(_offset + to_read_bytes > _data->size)) { - return 
Status::IOError("Out-of-bounds access in parquet data decoder"); - } if (UNLIKELY(to_read_bytes > slice.size)) { return Status::IOError("Slice does not have enough space to write out the decoding data"); } @@ -93,8 +138,8 @@ Status PlainDecoder::skip_values(size_t num_values) { Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values, size_t real_length) { - if (UNLIKELY(_offset + _type_length * num_values > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); + if (UNLIKELY(_physical_type != tparquet::Type::INT32)) { + return Status::InternalError("Short int can only be decoded from INT32"); } for (int i = 0; i < num_values; ++i) { doris_column->insert_data(_data->data + _offset, real_length); @@ -105,6 +150,9 @@ Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t nu Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) { + if (UNLIKELY(_offset + _type_length * num_values > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { case TypeIndex::Int8: @@ -118,6 +166,74 @@ Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, num_values); FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) #undef DISPATCH + case TypeIndex::Date: + if (_physical_type == tparquet::Type::INT32) { + return _decode_date<VecDateTimeValue, Int64>(doris_column, logical_type, num_values); + } + break; + case TypeIndex::DateV2: + if (_physical_type == tparquet::Type::INT32) { + return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, logical_type, + num_values); + } + break; + case TypeIndex::DateTime: + if (_physical_type == tparquet::Type::INT96) { + return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, logical_type, + num_values); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, logical_type, + num_values); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if (_physical_type == tparquet::Type::INT96) { + return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>( + doris_column, logical_type, num_values); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>( + doris_column, logical_type, num_values); + } + break; + case TypeIndex::Decimal32: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal<Int32>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, num_values); + } + break; + case TypeIndex::Decimal64: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal<Int64>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT64) { 
+ return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, num_values); + } + break; + case TypeIndex::Decimal128: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal<Int128>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, num_values); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, num_values); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + for (int i = 0; i < num_values; ++i) { + doris_column->insert_data(_data->data + _offset, _type_length); + _offset += _type_length; + } + return Status::OK(); + } + break; default: break; } @@ -165,20 +281,36 @@ Status ByteArrayPlainDecoder::skip_values(size_t num_values) { Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) { - for (int i = 0; i < num_values; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); - } - uint32_t length = - decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); - _offset += 4; - if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't read enough bytes in plain decoder"); + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { + case TypeIndex::String: + case TypeIndex::FixedString: + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + if (UNLIKELY(_offset + length > _data->size)) { + return Status::IOError("Can't read enough bytes in plain decoder"); + } + doris_column->insert_data(_data->data + _offset, length); + _offset += length; } - doris_column->insert_data(_data->data + _offset, length); - _offset += length; + return Status::OK(); + case TypeIndex::Decimal32: + return _decode_binary_decimal<Int32>(doris_column, data_type, num_values); + case TypeIndex::Decimal64: + return _decode_binary_decimal<Int64>(doris_column, data_type, num_values); + case TypeIndex::Decimal128: + return _decode_binary_decimal<Int128>(doris_column, data_type, num_values); + default: + break; } - return Status::OK(); + return Status::InvalidArgument( + "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}", + getTypeName(data_type->get_type_id())); } Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) { diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index f0aed43288..3620721ddd 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -21,16 +21,67 @@ #include "common/status.h" #include "gen_cpp/parquet_types.h" +#include "gutil/endian.h" +#include "schema_desc.h" #include "util/bit_stream_utils.inline.h" +#include "util/coding.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" +#include "vec/common/int_exp.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_decimal.h" +#include
"vec/data_types/data_type_nullable.h" namespace doris::vectorized { using level_t = int16_t; +struct ParquetInt96 { + uint64_t lo; // time of nanoseconds in a day + uint32_t hi; // days from julian epoch + + inline uint64_t to_timestamp_micros() const; + + static const uint32_t JULIAN_EPOCH_OFFSET_DAYS; + static const uint64_t MICROS_IN_DAY; + static const uint64_t NANOS_PER_MICROSECOND; +}; + +struct DecimalScaleParams { + enum ScaleType { + NOT_INIT, + NO_SCALE, + SCALE_UP, + SCALE_DOWN, + }; + ScaleType scale_type = ScaleType::NOT_INIT; + int32_t scale_factor = 1; + + template <typename DecimalPrimitiveType> + static inline constexpr DecimalPrimitiveType get_scale_factor(int32_t n) { + if constexpr (std::is_same_v<DecimalPrimitiveType, Int32>) { + return common::exp10_i32(n); + } else if constexpr (std::is_same_v<DecimalPrimitiveType, Int64>) { + return common::exp10_i64(n); + } else if constexpr (std::is_same_v<DecimalPrimitiveType, Int128>) { + return common::exp10_i128(n); + } else { + return DecimalPrimitiveType(1); + } + } +}; + +struct DecodeParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; +}; + class Decoder { public: Decoder() = default; @@ -48,6 +99,11 @@ public: _offset = 0; } + void init(FieldSchema* field_schema, cctz::time_zone* ctz); + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type); + // Write the decoded values batch to doris's column Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values); @@ -62,8 +118,34 @@ protected: int32_t _type_length; Slice* _data = nullptr; uint32_t _offset = 0; + FieldSchema* _field_schema = nullptr; + std::unique_ptr<DecodeParams> _decode_params = nullptr; }; +template <typename DecimalPrimitiveType> +void Decoder::init_decimal_converter(DataTypePtr& data_type) { + if (_decode_params == nullptr || _field_schema == nullptr || + _decode_params->decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = _field_schema->parquet_schema.scale; + auto* decimal_type = reinterpret_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + _decode_params->decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + _decode_params->decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + _decode_params->decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + _decode_params->decimal_scale.scale_factor = 1; + } +} + class PlainDecoder final : public Decoder { public: PlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}; @@ -92,9 +174,138 @@ protected: return Status::OK(); } + template <typename CppType, typename ColumnType> + Status _decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, size_t num_values); + + template <typename CppType, typename ColumnType> + Status 
_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values); + + template <typename CppType, typename ColumnType> + Status _decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values); + + template <typename DecimalPrimitiveType> + Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values); + + template <typename DecimalPrimitiveType, typename DecimalPhysicalType> + Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values); + tparquet::Type::type _physical_type; }; +template <typename CppType, typename ColumnType> +Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + for (int i = 0; i < num_values; ++i) { + int64_t date_value = + static_cast<int64_t>(*reinterpret_cast<int32_t*>(_data->data + _offset)); + CppType v; + v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); + column_data.emplace_back(cast_value); + _offset += _type_length; + } + return Status::OK(); +} + +template <typename CppType, typename ColumnType> +Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + for (int i = 0; i < num_values; i++) { + int64_t& date_value = *reinterpret_cast<int64_t*>(_data->data + _offset); + CppType v; + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // nanoseconds will be ignored. + v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + } + ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); + column_data.emplace_back(cast_value); + _offset += _type_length; + } + return Status::OK(); +} + +template <typename CppType, typename ColumnType> +Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { + auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); + for (int i = 0; i < num_values; ++i) { + ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(_data->data + _offset); + CppType v; + int64_t micros = datetime96.to_timestamp_micros(); + v.from_unixtime(micros / 1000000, *_decode_params->ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lose precision. + // only keep microseconds.
+ v.set_microsecond(micros % 1000000); + } + ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); + column_data.emplace_back(cast_value); + _offset += _type_length; + } + return Status::OK(); +} + +template <typename DecimalPrimitiveType> +Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values) { + init_decimal_converter<DecimalPrimitiveType>(data_type); + auto& column_data = + static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + for (int i = 0; i < num_values; ++i) { + char* buf_start = _data->data + _offset; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + Int128 value = buf_start[0] & 0x80 ? -1 : 0; + memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - _type_length, buf_start, + _type_length); + value = BigEndian::ToHost128(value); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + DecimalPrimitiveType cast_value(value); + column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); + _offset += _type_length; + } + return Status::OK(); +} + +template <typename DecimalPrimitiveType, typename DecimalPhysicalType> +Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, size_t num_values) { + init_decimal_converter<DecimalPrimitiveType>(data_type); + auto& column_data = + static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + for (int i = 0; i < num_values; ++i) { + // we should use decimal128 to scale up/down + Int128 value = *reinterpret_cast<DecimalPhysicalType*>(_data->data + _offset); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + DecimalPrimitiveType cast_value(value); + column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); + _offset += _type_length; + } + return Status::OK(); +} + class ByteArrayPlainDecoder final : public Decoder { public: ByteArrayPlainDecoder() = default; @@ -106,8 +317,45 @@ public: Status decode_values(Slice& slice, size_t num_values) override; Status skip_values(size_t num_values) override; + +protected: + template <typename DecimalPrimitiveType> + Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values); }; +template <typename DecimalPrimitiveType> +Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, size_t num_values) { + init_decimal_converter<DecimalPrimitiveType>(data_type); + auto& column_data = + static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); +
_offset += 4; + char* buf_start = _data->data + _offset; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + Int128 value = buf_start[0] & 0x80 ? -1 : 0; + memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - length, buf_start, length); + value = BigEndian::ToHost128(value); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + DecimalPrimitiveType cast_value(value); + column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); + _offset += length; + } + return Status::OK(); +} + /// Decodes bit-packed boolean-encoded values. /// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h class BoolPlainDecoder final : public Decoder { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 0cb4e8229c..778e7b1a66 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -20,12 +20,14 @@ namespace doris::vectorized { ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader, - tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema) + tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, + cctz::time_zone* ctz) : _field_schema(field_schema), _max_rep_level(field_schema->repetition_level), _max_def_level(field_schema->definition_level), _stream_reader(reader), - _metadata(column_chunk->meta_data) {} + _metadata(column_chunk->meta_data), + _ctz(ctz) {} Status ColumnChunkReader::init() { size_t start_offset = _metadata.__isset.dictionary_page_offset ? _metadata.dictionary_page_offset : _metadata.data_page_offset; size_t chunk_size = _metadata.total_compressed_size; _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size); if (_metadata.__isset.dictionary_page_offset) { + // seek to the dictionary page + _page_reader->seek_to_page(_metadata.dictionary_page_offset); + RETURN_IF_ERROR(_page_reader->next_page_header()); RETURN_IF_ERROR(_decode_dict_page()); + } else { + // seek to the first data page + _page_reader->seek_to_page(_metadata.data_page_offset); } - // seek to the first data page - _page_reader->seek_to_page(_metadata.data_page_offset); // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec)); return Status::OK(); } Status ColumnChunkReader::next_page() { RETURN_IF_ERROR(_page_reader->next_page_header()); - _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; - return Status::OK(); + if (_page_reader->get_page_header()->type == tparquet::PageType::DICTIONARY_PAGE) { + // the first page may be a dictionary page even if _metadata.__isset.dictionary_page_offset == false, + // so we should parse the dictionary page in next_page() + RETURN_IF_ERROR(_decode_dict_page()); + // parse the real first data page + return next_page(); + } else { + _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; + return Status::OK(); + } } Status ColumnChunkReader::load_page_data() { @@ -89,20 +103,20 @@ Status ColumnChunkReader::load_page_data() { } else {
std::unique_ptr<Decoder> page_decoder; Decoder::get_decoder(_metadata.type, encoding, page_decoder); + // Set type length + page_decoder->set_type_length(_get_type_length()); + // Initialize the time convert context + page_decoder->init(_field_schema, _ctz); _decoders[static_cast<int>(encoding)] = std::move(page_decoder); _page_decoder = _decoders[static_cast<int>(encoding)].get(); } + // Reset page data for each page _page_decoder->set_data(&_page_data); - // Set type length - _page_decoder->set_type_length(_get_type_length()); return Status::OK(); } Status ColumnChunkReader::_decode_dict_page() { - int64_t dict_offset = _metadata.dictionary_page_offset; - _page_reader->seek_to_page(dict_offset); - _page_reader->next_page_header(); const tparquet::PageHeader& header = *_page_reader->get_page_header(); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type); // TODO(gaoxin): decode dictionary page diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index b248ba0a51..bc3fcedbe1 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -42,8 +42,7 @@ namespace doris::vectorized { * ColumnChunkReader chunk_reader(BufferedStreamReader* reader, * tparquet::ColumnChunk* column_chunk, * FieldSchema* fieldSchema); - * // Initialize chunk reader, we can set the type length if the length of column type is fixed. - * // If not set, default value = -1, then the decoder will infer the type length. + * // Initialize chunk reader * chunk_reader.init(); * while (chunk_reader.has_next_page()) { * // Seek to next page header. Only read and parse the page header, not page data. @@ -59,7 +58,7 @@ namespace doris::vectorized { class ColumnChunkReader { public: ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk, - FieldSchema* field_schema); + FieldSchema* field_schema, cctz::time_zone* ctz); ~ColumnChunkReader() = default; // Initialize chunk reader, will generate the decoder and codec. @@ -86,7 +85,7 @@ public: Status load_page_data(); // The remaining number of values in current page(including null values). Decreased when reading or skipping. uint32_t remaining_num_values() const { return _remaining_num_values; }; - // null values are not analyzing from definition levels + // null values are generated from definition levels // the caller should maintain the consistency after analyzing null values from definition levels. void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; }; // Get the raw data of current page. @@ -108,6 +107,9 @@ public: // Get the definition level decoder of current page. 
LevelDecoder& def_level_decoder() { return _def_level_decoder; } + // Get page decoder + Decoder* get_page_decoder() { return _page_decoder; } + private: Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); @@ -122,6 +124,7 @@ private: // tparquet::ColumnChunk* _column_chunk; tparquet::ColumnMetaData& _metadata; // FieldSchema* _field_schema; + cctz::time_zone* _ctz; std::unique_ptr<PageReader> _page_reader = nullptr; std::unique_ptr<BlockCompressionCodec> _block_compress_codec = nullptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 3daf80e7c8..f15a7d5623 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -29,7 +29,7 @@ namespace doris::vectorized { Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, const tparquet::RowGroup& row_group, - std::vector<RowRange>& row_ranges, + std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader) { if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) { return Status::Corruption("not supported type"); @@ -38,7 +38,7 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, return Status::Corruption("not supported array type yet"); } else { tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index]; - ScalarColumnReader* scalar_reader = new ScalarColumnReader(column); + ScalarColumnReader* scalar_reader = new ScalarColumnReader(column, ctz); scalar_reader->init_column_metadata(chunk); RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges)); reader.reset(scalar_reader); @@ -62,7 +62,7 @@ Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet:: _stream_reader = new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size()); _row_ranges = &row_ranges; - _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field)); + _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, _ctz)); RETURN_IF_ERROR(_chunk_reader->init()); RETURN_IF_ERROR(_chunk_reader->next_page()); if (_row_ranges->size() != 0) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 6c6a0e4013..7014b92430 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -49,7 +49,8 @@ private: class ParquetColumnReader { public: - ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {}; + ParquetColumnReader(const ParquetReadColumn& column, cctz::time_zone* ctz) + : _column(column), _ctz(ctz) {}; virtual ~ParquetColumnReader() { if (_stream_reader != nullptr) { delete _stream_reader; @@ -60,7 +61,7 @@ public: size_t* read_rows, bool* eof) = 0; static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges, - std::unique_ptr<ParquetColumnReader>& reader); + cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader); void init_column_metadata(const tparquet::ColumnChunk& chunk); virtual void close() = 0; @@ -72,11 +73,13 @@ protected: BufferedFileStreamReader* _stream_reader; std::unique_ptr<ParquetColumnMetadata> _metadata; std::vector<RowRange>* _row_ranges; + cctz::time_zone* _ctz; }; class 
ScalarColumnReader : public ParquetColumnReader { public: - ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}; + ScalarColumnReader(const ParquetReadColumn& column, cctz::time_zone* ctz) + : ParquetColumnReader(column, ctz) {}; ~ScalarColumnReader() override { close(); }; Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector<RowRange>& row_ranges); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 0ac58ce6ed..7443434cfb 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -25,12 +25,14 @@ namespace doris::vectorized { RowGroupReader::RowGroupReader(doris::FileReader* file_reader, const std::vector<ParquetReadColumn>& read_columns, - const int32_t row_group_id, tparquet::RowGroup& row_group) + const int32_t row_group_id, tparquet::RowGroup& row_group, + cctz::time_zone* ctz) : _file_reader(file_reader), _read_columns(read_columns), _row_group_id(row_group_id), _row_group_meta(row_group), - _total_rows(row_group.num_rows) {} + _total_rows(row_group.num_rows), + _ctz(ctz) {} RowGroupReader::~RowGroupReader() { _column_readers.clear(); @@ -50,7 +52,7 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema, auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name())); std::unique_ptr<ParquetColumnReader> reader; RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta, - row_ranges, reader)); + row_ranges, _ctz, reader)); if (reader == nullptr) { VLOG_DEBUG << "Init row group reader failed"; return Status::Corruption("Init row group reader failed"); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index ea9eeed342..5ed99cd4e3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -33,7 +33,7 @@ class RowGroupReader { public: RowGroupReader(doris::FileReader* file_reader, const std::vector<ParquetReadColumn>& read_columns, const int32_t _row_group_id, - tparquet::RowGroup& row_group); + tparquet::RowGroup& row_group, cctz::time_zone* ctz); ~RowGroupReader(); Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges); Status next_batch(Block* block, size_t batch_size, bool* _batch_eof); @@ -49,5 +49,6 @@ private: tparquet::RowGroup& _row_group_meta; int64_t _read_rows = 0; int64_t _total_rows; + cctz::time_zone* _ctz; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index 94a291f40e..fa2631e287 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -23,7 +23,7 @@ namespace doris::vectorized { -static constexpr size_t initPageHeaderSize = 1024; +static constexpr size_t initPageHeaderSize = 128; PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length) : _reader(reader), _start_offset(offset), _end_offset(offset + length) {} diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 13fb9e8cad..0127aae1d2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -21,11 +21,13 
@@ namespace doris::vectorized { ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, - size_t batch_size, int64_t range_start_offset, int64_t range_size) + size_t batch_size, int64_t range_start_offset, int64_t range_size, + cctz::time_zone* ctz) : _num_of_columns_from_file(num_of_columns_from_file), _batch_size(batch_size), _range_start_offset(range_start_offset), - _range_size(range_size) { + _range_size(range_size), + _ctz(ctz) { _file_reader = file_reader; _total_groups = 0; _current_row_group_id = 0; @@ -129,7 +131,7 @@ Status ParquetReader::_init_row_group_readers(const TupleDescriptor* tuple_desc, } std::shared_ptr<RowGroupReader> row_group_reader; row_group_reader.reset( - new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group)); + new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz)); // todo: can filter row with candidate ranges rather than skipped ranges RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), skipped_row_ranges)); _row_group_readers.emplace_back(row_group_reader); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index a979daf692..213ed4dc82 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -65,7 +65,7 @@ private: class ParquetReader { public: ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, size_t batch_size, - int64_t range_start_offset, int64_t range_size); + int64_t range_start_offset, int64_t range_size, cctz::time_zone* ctz); ~ParquetReader(); @@ -130,6 +130,7 @@ private: size_t _batch_size; int64_t _range_start_offset; int64_t _range_size; + cctz::time_zone* _ctz; const TupleDescriptor* _tuple_desc; // get all slot info }; diff --git a/be/src/vec/runtime/vdatetime_value.cpp b/be/src/vec/runtime/vdatetime_value.cpp index 31dd559777..2a7cbe737c 100644 --- a/be/src/vec/runtime/vdatetime_value.cpp +++ b/be/src/vec/runtime/vdatetime_value.cpp @@ -2596,7 +2596,6 @@ bool DateV2Value<T>::from_unixtime(int64_t timestamp, const std::string& timezon template <typename T> bool DateV2Value<T>::from_unixtime(int64_t timestamp, const cctz::time_zone& ctz) { - DCHECK(is_datetime); static const cctz::time_point<cctz::sys_seconds> epoch = std::chrono::time_point_cast<cctz::sys_seconds>( std::chrono::system_clock::from_time_t(0)); @@ -2651,6 +2650,15 @@ void DateV2Value<T>::set_time(uint8_t hour, uint8_t minute, uint8_t second, uint } } +template <typename T> +void DateV2Value<T>::set_microsecond(uint32_t microsecond) { + if constexpr (is_datetime) { + date_v2_value_.microsecond_ = microsecond; + } else { + LOG(FATAL) << "Invalid operation 'set_microsecond' for date!"; + } +} + template <typename T> bool DateV2Value<T>::to_format_string(const char* format, int len, char* to) const { char buf[64]; diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h index 0ac9ccf074..e472d14e81 100644 --- a/be/src/vec/runtime/vdatetime_value.h +++ b/be/src/vec/runtime/vdatetime_value.h @@ -740,6 +740,8 @@ public: void set_time(uint8_t hour, uint8_t minute, uint8_t second, uint32_t microsecond); + void set_microsecond(uint32_t microsecond); + bool from_olap_date(uint64_t date) { auto [year, month, day] = std::tuple {0, 0, 0}; diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp index eec727ebbc..4d9716b55c 100644 --- 
a/be/src/vec/utils/arrow_column_to_doris_column.cpp +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -32,6 +32,7 @@ #include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" #include "vec/runtime/vdatetime_value.h" #define FOR_ARROW_TYPES(M) \ @@ -354,7 +355,7 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr (*std::move(doris_column)).mutate().get()); fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, num_elements); data_column = nullable_column->get_nested_column_ptr(); - WhichDataType which_type(type); + WhichDataType which_type(remove_nullable(type)); // process data switch (arrow_column->type()->id()) { case arrow::Type::STRING: diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet index 5f679c0005..504d004841 100644 Binary files a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet and b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 7aa0f8cbd3..c0f62067d3 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -28,6 +28,7 @@ #include "io/local_file_reader.h" #include "runtime/string_value.h" #include "util/runtime_profile.h" +#include "util/timezone_utils.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type_factory.hpp" @@ -140,7 +141,9 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* size_t chunk_size = chunk_meta.total_compressed_size; BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size); - ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema); + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); + ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, &ctz); // initialize chunk reader chunk_reader.init(); // seek to next page header @@ -161,7 +164,17 @@ static void create_block(std::unique_ptr<vectorized::Block>& block) { {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true}, {"float_col", TYPE_FLOAT, sizeof(float_t), true}, {"double_col", TYPE_DOUBLE, sizeof(double_t), true}, - {"string_col", TYPE_STRING, sizeof(StringValue), true}}; + {"string_col", TYPE_STRING, sizeof(StringValue), true}, + // binary is not supported, use string instead + {"binary_col", TYPE_STRING, sizeof(StringValue), true}, + // 64-bit-length, see doris::get_slot_size in primitive_type.cpp + {"timestamp_col", TYPE_DATETIME, sizeof(DateTimeValue), true}, + {"decimal_col", TYPE_DECIMALV2, sizeof(DecimalV2Value), true}, + {"char_col", TYPE_CHAR, sizeof(StringValue), true}, + {"varchar_col", TYPE_VARCHAR, sizeof(StringValue), true}, + {"date_col", TYPE_DATE, sizeof(DateTimeValue), true}, + {"date_v2_col", TYPE_DATEV2, sizeof(uint32_t), true}, + {"timestamp_v2_col", TYPE_DATETIMEV2, sizeof(DateTimeValue), true, 18, 0}}; SchemaScanner schema_scanner(column_descs, sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc)); ObjectPool object_pool; @@ -203,16 +216,16 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); /* * Data in type-decoder.parquet: - * 
-1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0 2022-08-01 00:00:00 -1.14 c-row0 vc-row0 2022-08-01 ["as-0","as-1"] - * 2 2 2 2 true 2.14 2.14 NULL b-row1 2022-08-02 00:00:00 2.14 c-row1 vc-row1 2022-08-02 [null,"as-3"] - * -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2 2022-08-03 00:00:00 -3.14 c-row2 vc-row2 2022-08-03 [] - * 4 4 4 4 true 4.14 4.14 NULL b-row3 2022-08-04 00:00:00 4.14 c-row3 vc-row3 2022-08-04 ["as-4"] - * -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4 2022-08-05 00:00:00 -5.14 c-row4 vc-row4 2022-08-05 ["as-5",null] - * 6 6 6 6 false 6.14 6.14 s-row5 b-row5 2022-08-06 00:00:00 6.14 c-row5 vc-row5 2022-08-06 [null,null] - * -7 -7 -7 -7 true -7.14 -7.14 s-row6 b-row6 2022-08-07 00:00:00 -7.14 c-row6 vc-row6 2022-08-07 ["as-6","as-7"] - * 8 8 8 8 false 8.14 8.14 NULL b-row7 2022-08-08 00:00:00 8.14 c-row7 vc-row7 2022-08-08 ["as-0","as-8"] - * -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8 2022-08-09 00:00:00 -9.14 c-row8 vc-row8 2022-08-09 ["as-9","as-10"] - * 10 10 10 10 false 10.14 10.14 s-row9 b-row9 2022-08-10 00:00:00 10.14 c-row9 vc-row9 2022-08-10 ["as-11","as-12"] + * -1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0 2022-08-01 07:23:17 -1.14 c-row0 vc-row0 2022-08-01 ["as-0","as-1"] + * 2 2 2 2 true 2.14 2.14 NULL b-row1 2022-08-02 07:23:18 2.14 c-row1 vc-row1 2022-08-02 [null,"as-3"] + * -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2 2022-08-03 07:23:19 -3.14 c-row2 vc-row2 2022-08-03 [] + * 4 4 4 4 true 4.14 4.14 NULL b-row3 2022-08-04 07:24:17 4.14 c-row3 vc-row3 2022-08-04 ["as-4"] + * -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4 2022-08-05 07:25:17 -5.14 c-row4 vc-row4 2022-08-05 ["as-5",null] + * 6 6 6 6 false 6.14 6.14 s-row5 b-row5 2022-08-06 07:26:17 6.14 c-row5 vc-row5 2022-08-06 [null,null] + * -7 -7 -7 -7 true -7.14 -7.14 s-row6 b-row6 2022-08-07 07:27:17 -7.14 c-row6 vc-row6 2022-08-07 ["as-6","as-7"] + * 8 8 8 8 false 8.14 8.14 NULL b-row7 2022-08-08 07:28:17 8.14 c-row7 vc-row7 2022-08-08 ["as-0","as-8"] + * -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8 2022-08-09 07:29:17 -9.14 c-row8 vc-row8 2022-08-09 ["as-9","as-10"] + * 10 10 10 10 false 10.14 10.14 s-row9 b-row9 2022-08-10 07:21:17 10.14 c-row9 vc-row9 2022-08-10 ["as-11","as-12"] */ auto st = reader.open(); EXPECT_TRUE(st.ok()); @@ -327,8 +340,11 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { : chunk_meta.data_page_offset; size_t chunk_size = chunk_meta.total_compressed_size; BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size); + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, - const_cast<FieldSchema*>(schema_descriptor.get_column(7))); + const_cast<FieldSchema*>(schema_descriptor.get_column(7)), + &ctz); // initialize chunk reader chunk_reader.init(); // seek to next page header @@ -352,6 +368,102 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { ASSERT_STREQ("s-row0", row0); ASSERT_STREQ("s-row2", row2); } + // `timestamp_col` timestamp, // 9, DATETIME + { + auto& column_name_with_type = block->get_by_position(9); + auto& data_column = column_name_with_type.column; + auto& data_type = column_name_with_type.type; + get_column_values(&reader, &t_metadata.row_groups[0].columns[9], + const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column, + data_type); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(data_column)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + int64_t date_value 
= (int64_t)nested_column->get64(0); + VecDateTimeInt64Union conv = {.i64 = date_value}; + auto dt = conv.dt; + ASSERT_EQ(dt.hour(), 7); + ASSERT_EQ(dt.minute(), 23); + ASSERT_EQ(dt.second(), 17); + } + // `decimal_col` decimal, // 10 + { + auto& column_name_with_type = block->get_by_position(10); + auto& data_column = column_name_with_type.column; + auto& data_type = column_name_with_type.type; + get_column_values(&reader, &t_metadata.row_groups[0].columns[10], + const_cast<FieldSchema*>(schema_descriptor.get_column(10)), data_column, + data_type); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(data_column)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + int neg = 1; + for (int i = 0; i < rows; ++i) { + neg *= -1; + auto decimal_field = nested_column->operator[](i) + .get<vectorized::DecimalField<vectorized::Decimal128>>(); + EXPECT_EQ(DecimalV2Value(decimal_field.get_value()), + DecimalV2Value(std::to_string(neg * (1.14 + i)))); + } + } + // `date_col` date, // 13, DATE + { + auto& column_name_with_type = block->get_by_position(13); + auto& data_column = column_name_with_type.column; + auto& data_type = column_name_with_type.type; + get_column_values(&reader, &t_metadata.row_groups[0].columns[13], + const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column, + data_type); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(data_column)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + for (int i = 0; i < rows; ++i) { + int64_t date_value = (int64_t)nested_column->get64(i); + VecDateTimeInt64Union conv = {.i64 = date_value}; + auto dt = conv.dt; + ASSERT_EQ(dt.year(), 2022); + ASSERT_EQ(dt.month(), 8); + ASSERT_EQ(dt.day(), i + 1); + } + } + // `date_v2_col` date, // 14 - 13, DATEV2 + { + auto& column_name_with_type = block->get_by_position(14); + auto& data_column = column_name_with_type.column; + auto& data_type = column_name_with_type.type; + get_column_values(&reader, &t_metadata.row_groups[0].columns[13], + const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column, + data_type); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(data_column)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + for (int i = 0; i < rows; ++i) { + uint32_t date_value = (uint32_t)nested_column->get64(i); + DateV2UInt32Union conv = {.ui32 = date_value}; + auto dt = conv.dt; + ASSERT_EQ(dt.year(), 2022); + ASSERT_EQ(dt.month(), 8); + ASSERT_EQ(dt.day(), i + 1); + } + } + // `timestamp_v2_col` timestamp, // 15 - 9, DATETIMEV2 + { + auto& column_name_with_type = block->get_by_position(15); + auto& data_column = column_name_with_type.column; + auto& data_type = column_name_with_type.type; + get_column_values(&reader, &t_metadata.row_groups[0].columns[9], + const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column, + data_type); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(data_column)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + uint64_t date_value = nested_column->get64(0); + DateTimeV2UInt64Union conv = {.ui64 = date_value}; + auto dt = conv.dt; + ASSERT_EQ(dt.hour(), 7); + ASSERT_EQ(dt.minute(), 23); + ASSERT_EQ(dt.second(), 17); + } } TEST_F(ParquetThriftReaderTest, column_reader) { @@ -374,6 +486,8 @@ 
TEST_F(ParquetThriftReaderTest, column_reader) { TDescriptorTable t_desc_table; // table descriptors TTableDescriptor t_table_desc; + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); t_table_desc.id = 0; t_table_desc.tableType = TTableType::OLAP_TABLE; @@ -422,7 +536,7 @@ TEST_F(ParquetThriftReaderTest, column_reader) { ParquetReadColumn column(slot_desc); std::vector<RowRange> row_ranges = std::vector<RowRange>(); ParquetColumnReader::create(&file_reader, field, column, t_metadata.row_groups[0], row_ranges, - reader); + &ctz, reader); std::unique_ptr<vectorized::Block> block; create_block(block); auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); @@ -516,9 +630,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) { parse_thrift_footer(&file_reader, meta_data); tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata(); + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto row_group = t_metadata.row_groups[0]; std::shared_ptr<RowGroupReader> row_group_reader; - row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group)); + row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz)); std::vector<RowRange> row_ranges = std::vector<RowRange>(); auto stg = row_group_reader->init(meta_data->schema(), row_ranges); EXPECT_TRUE(stg.ok());
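For reference, the INT96 decoding above relies on the legacy parquet timestamp layout: the low 8 bytes hold the nanoseconds within the day and the high 4 bytes hold the Julian day number. A minimal standalone sketch of the same arithmetic as ParquetInt96::to_timestamp_micros(), with a hypothetical sample value:

    #include <cstdint>
    #include <cstdio>

    // Mirrors ParquetInt96: 12 bytes, nanoseconds-of-day then Julian day.
    struct Int96 {
        uint64_t lo; // nanoseconds within the day
        uint32_t hi; // days since the Julian epoch
    };

    static const uint32_t kJulianEpochOffsetDays = 2440588; // Julian day of 1970-01-01
    static const uint64_t kMicrosInDay = 86400000000ULL;
    static const uint64_t kNanosPerMicro = 1000ULL;

    // Same arithmetic as ParquetInt96::to_timestamp_micros().
    uint64_t to_timestamp_micros(const Int96& v) {
        return (v.hi - kJulianEpochOffsetDays) * kMicrosInDay + v.lo / kNanosPerMicro;
    }

    int main() {
        Int96 v = {0, 2459793}; // hypothetical: Julian day 2459793 = 2022-08-01 UTC
        printf("%llu\n", (unsigned long long)to_timestamp_micros(v)); // 1659312000000000
        return 0;
    }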
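The INT64 timestamp path depends only on the two factors Decoder::init() derives from the schema: second_mask (ticks per second for the declared unit) and scale_to_nano_factor (nanoseconds per tick). A minimal standalone sketch of the seconds/microseconds split performed by _decode_datetime64, assuming a TIMESTAMP_MICROS column (second_mask = 1000000, scale_to_nano_factor = 1000); the input value is hypothetical:

    #include <cstdint>
    #include <cstdio>

    // Splits a raw INT64 parquet timestamp into unix seconds plus the
    // sub-second remainder in microseconds; sub-microsecond ticks are dropped.
    static void split_timestamp(int64_t raw, int64_t second_mask,
                                int64_t scale_to_nano_factor,
                                int64_t* seconds, int64_t* micros) {
        *seconds = raw / second_mask;
        *micros = (raw % second_mask) * scale_to_nano_factor / 1000;
    }

    int main() {
        // Hypothetical value: 2022-08-01 07:23:17.654321 UTC as TIMESTAMP_MICROS.
        int64_t raw = 1659338597654321LL;
        int64_t s = 0, us = 0;
        split_timestamp(raw, 1000000, 1000, &s, &us);
        printf("%lld s + %lld us\n", (long long)s, (long long)us); // 1659338597 s + 654321 us
        return 0;
    }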
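The binary decimal path sign-extends the big-endian two's-complement unscaled value into a 128-bit integer and then rescales it from the file's scale to the column's scale (the SCALE_UP/SCALE_DOWN branches). A minimal standalone sketch under the same assumptions; the byte-by-byte loop is illustrative and stands in for the memcpy plus BigEndian::ToHost128 used in the patch:

    #include <cstddef>
    #include <cstdio>

    // Sign-extend a big-endian two's-complement unscaled decimal into __int128.
    static __int128 decode_be_decimal(const unsigned char* buf, size_t len) {
        __int128 value = (buf[0] & 0x80) ? -1 : 0; // prefill with sign bits
        for (size_t i = 0; i < len; ++i) {
            value = (value << 8) | buf[i]; // fold in big-endian bytes
        }
        return value;
    }

    // Rescale from the parquet scale to the destination column's scale.
    static __int128 rescale(__int128 v, int file_scale, int dest_scale) {
        for (; file_scale < dest_scale; ++file_scale) v *= 10; // SCALE_UP
        for (; file_scale > dest_scale; --file_scale) v /= 10; // SCALE_DOWN
        return v;
    }

    int main() {
        // Hypothetical: -1.14 stored at scale 2 in two bytes: -114 = 0xFF8E.
        const unsigned char buf[2] = {0xFF, 0x8E};
        __int128 v = rescale(decode_be_decimal(buf, 2), 2, 9);
        printf("%lld\n", (long long)v); // -1140000000, i.e. -1.14 at scale 9
        return 0;
    }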