This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 455c800405 [feature](parquet-reader) add rle bool and delta decoder to read AWS Glue (#17112) 455c800405 is described below commit 455c8004058ce2b27e67dbd28f5fb444e357dc6f Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Sun Mar 12 20:09:58 2023 +0800 [feature](parquet-reader) add rle bool and delta decoder to read AWS Glue (#17112) Support delta encoding and rle(bool) to read Glue data add delta bit pack decoder, add delta length byte array decoder, add delta byte array decoder. add rle bool decoder. We find some data type is read with delta encoding on AWS Glue, so it should be supported. The definition of delta encoding can refer to the delta encoding in parquet. --- be/src/util/bit_stream_utils.h | 16 +- be/src/util/bit_stream_utils.inline.h | 76 ++++- be/src/util/rle_encoding.h | 2 +- be/src/vec/CMakeLists.txt | 2 + .../vec/exec/format/parquet/bool_rle_decoder.cpp | 85 ++++++ be/src/vec/exec/format/parquet/bool_rle_decoder.h | 43 +++ be/src/vec/exec/format/parquet/decoder.cpp | 44 +++ .../exec/format/parquet/delta_bit_pack_decoder.cpp | 319 +++++++++++++++++++++ .../exec/format/parquet/delta_bit_pack_decoder.h | 280 ++++++++++++++++++ docs/en/docs/lakehouse/multi-catalog/hive.md | 19 +- docs/en/docs/lakehouse/multi-catalog/iceberg.md | 6 +- docs/zh-CN/docs/lakehouse/multi-catalog/hive.md | 17 ++ docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 6 +- .../hive/test_external_catalog_glue_table.out | 131 +++++++++ .../hive/test_external_catalog_glue_table.groovy | 54 ++++ 15 files changed, 1080 insertions(+), 20 deletions(-) diff --git a/be/src/util/bit_stream_utils.h b/be/src/util/bit_stream_utils.h index d8efe163bb..d5abbc4d94 100644 --- a/be/src/util/bit_stream_utils.h +++ b/be/src/util/bit_stream_utils.h @@ -111,7 +111,15 @@ public: // Reads a vlq encoded int from the stream. The encoded int must start at the // beginning of a byte. Return false if there were not enough bytes in the buffer. - bool GetVlqInt(int32_t* v); + bool GetVlqInt(uint32_t* v); + // Reads a zigzag encoded int `into` v. + bool GetZigZagVlqInt(int32_t* v); + + // Reads a vlq encoded int from the stream. The encoded int must start at the + // beginning of a byte. Return false if there were not enough bytes in the buffer. + bool GetVlqInt(uint64_t* v); + // Reads a zigzag encoded int `into` v. + bool GetZigZagVlqInt(int64_t* v); // Returns the number of bytes left in the stream, not including the current byte (i.e., // there may be an additional fraction of a byte). @@ -123,12 +131,18 @@ public: // Rewind the stream by 'num_bits' bits void Rewind(int num_bits); + // Advance the stream by 'num_bits' bits + bool Advance(int64_t num_bits); + // Seek to a specific bit in the buffer void SeekToBit(unsigned int stream_position); // Maximum byte length of a vlq encoded int static const int MAX_VLQ_BYTE_LEN = 5; + // Maximum byte length of a vlq encoded int64 + static const int MAX_VLQ_BYTE_LEN_FOR_INT64 = 10; + bool is_initialized() const { return buffer_ != nullptr; } private: diff --git a/be/src/util/bit_stream_utils.inline.h b/be/src/util/bit_stream_utils.inline.h index 88a059758c..fb62e9e3ae 100644 --- a/be/src/util/bit_stream_utils.inline.h +++ b/be/src/util/bit_stream_utils.inline.h @@ -26,6 +26,7 @@ #include "util/alignment.h" #include "util/bit_packing.inline.h" #include "util/bit_stream_utils.h" +#include "util/bit_util.h" using doris::BitUtil; @@ -150,6 +151,18 @@ inline void BitReader::Rewind(int num_bits) { memcpy(&buffered_values_, buffer_ + byte_offset_, 8); } +inline bool BitReader::Advance(int64_t num_bits) { + int64_t bits_required = bit_offset_ + num_bits; + int64_t bytes_required = (bits_required >> 3) + ((bits_required & 7) != 0); + if (bytes_required > max_bytes_ - byte_offset_) { + return false; + } + byte_offset_ += static_cast<int>(bits_required >> 3); + bit_offset_ = static_cast<int>(bits_required & 7); + BufferValues(); + return true; +} + inline void BitReader::SeekToBit(unsigned int stream_position) { DCHECK_LE(stream_position, max_bytes_ * 8); @@ -195,17 +208,52 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { return true; } -inline bool BitReader::GetVlqInt(int32_t* v) { - *v = 0; - int shift = 0; - int num_bytes = 0; - uint8_t byte = 0; - do { +inline bool BitReader::GetVlqInt(uint32_t* v) { + uint32_t tmp = 0; + for (int num_bytes = 0; num_bytes < MAX_VLQ_BYTE_LEN; num_bytes++) { + uint8_t byte = 0; if (!GetAligned<uint8_t>(1, &byte)) return false; - *v |= (byte & 0x7F) << shift; - shift += 7; - DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN); - } while ((byte & 0x80) != 0); + tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * num_bytes); + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + return false; +} + +inline bool BitReader::GetZigZagVlqInt(int32_t* v) { + uint32_t u; + if (!GetVlqInt(&u)) { + return false; + } + u = (u >> 1) ^ (~(u & 1) + 1); + // copy uint32_t to int32_t + std::memcpy(v, &u, sizeof(uint32_t)); + return true; +} + +inline bool BitReader::GetVlqInt(uint64_t* v) { + uint64_t tmp = 0; + for (int num_bytes = 0; num_bytes < MAX_VLQ_BYTE_LEN_FOR_INT64; num_bytes++) { + uint8_t byte = 0; + if (!GetAligned<uint8_t>(1, &byte)) return false; + tmp |= static_cast<uint64_t>(byte & 0x7F) << (7 * num_bytes); + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + return false; +} + +inline bool BitReader::GetZigZagVlqInt(int64_t* v) { + uint64_t u; + if (!GetVlqInt(&u)) { + return false; + } + u = (u >> 1) ^ (~(u & 1) + 1); + std::memcpy(v, &u, sizeof(uint64_t)); return true; } @@ -227,12 +275,14 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) { inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) { DCHECK(buffer_pos_ != nullptr); - DCHECK_GT(bit_width, 0); + DCHECK_GE(bit_width, 0); DCHECK_LE(bit_width, MAX_BITWIDTH); - DCHECK_GT(num_values_to_skip, 0); + DCHECK_GE(num_values_to_skip, 0); int skip_bytes = BitUtil::RoundUpNumBytes(bit_width * num_values_to_skip); - if (skip_bytes > buffer_end_ - buffer_pos_) return false; + if (skip_bytes > buffer_end_ - buffer_pos_) { + return false; + } buffer_pos_ += skip_bytes; return true; } diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h index 1409473a09..1e83599265 100644 --- a/be/src/util/rle_encoding.h +++ b/be/src/util/rle_encoding.h @@ -229,7 +229,7 @@ inline bool RleDecoder<T>::ReadHeader() { if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { // Read the next run's indicator int, it could be a literal or repeated run // The int is encoded as a vlq-encoded value. - int32_t indicator_value = 0; + uint32_t indicator_value = 0; bool result = bit_reader_.GetVlqInt(&indicator_value); if (PREDICT_FALSE(!result)) { return false; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index ab364f56ea..29bea35d47 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -313,6 +313,8 @@ set(VEC_FILES exec/format/table/iceberg_reader.cpp exec/format/file_reader/new_plain_text_line_reader.cpp exec/format/file_reader/new_plain_binary_line_reader.cpp + exec/format/parquet/delta_bit_pack_decoder.cpp + exec/format/parquet/bool_rle_decoder.cpp ) if (WITH_MYSQL) diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp new file mode 100644 index 0000000000..563b6c68df --- /dev/null +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/parquet/bool_rle_decoder.h" + +#include "util/bit_util.h" + +namespace doris::vectorized { +void BoolRLEDecoder::set_data(Slice* slice) { + _data = slice; + _num_bytes = slice->size; + _offset = 0; + if (_num_bytes < 4) { + LOG(FATAL) << "Received invalid length : " + std::to_string(_num_bytes) + + " (corrupt data page?)"; + } + // Load the first 4 bytes in little-endian, which indicates the length + const uint8_t* data = reinterpret_cast<const uint8_t*>(_data->data); + uint32_t num_bytes = decode_fixed32_le(data); + if (num_bytes > static_cast<uint32_t>(_num_bytes - 4)) { + LOG(FATAL) << ("Received invalid number of bytes : " + std::to_string(num_bytes) + + " (corrupt data page?)"); + } + _num_bytes = num_bytes; + auto decoder_data = data + 4; + _decoder = RleDecoder<uint8_t>(decoder_data, num_bytes, 1); +} + +Status BoolRLEDecoder::skip_values(size_t num_values) { + _current_value_idx += num_values; + return Status::OK(); +} + +Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t max_values = column_data.size(); + _values.resize(max_values); + if (!_decoder.get_values(_values.data(), max_values)) { + return Status::IOError("Can't read enough booleans in rle decoder"); + } + // _num_bytes -= max_values; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + bool value; // Can't use uint8_t directly, we should correct it. + for (size_t i = 0; i < run_length; ++i) { + value = _values[_current_value_idx++]; + column_data[data_index++] = (UInt8)value; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _current_value_idx += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + break; + } + } + } + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.h b/be/src/vec/exec/format/parquet/bool_rle_decoder.h new file mode 100644 index 0000000000..0b3ba6e05d --- /dev/null +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "util/rle_encoding.h" +#include "vec/exec/format/parquet/bool_plain_decoder.h" +#include "vec/exec/format/parquet/decoder.h" + +namespace doris::vectorized { +class BoolRLEDecoder final : public Decoder { +public: + BoolRLEDecoder() = default; + ~BoolRLEDecoder() override = default; + + void set_data(Slice* slice) override; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override; + + Status skip_values(size_t num_values) override; + +private: + RleDecoder<uint8_t> _decoder; + std::vector<uint8_t> _values; + size_t _num_bytes; + size_t _current_value_idx = 0; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/decoder.cpp b/be/src/vec/exec/format/parquet/decoder.cpp index e35a0ff58c..e625994a21 100644 --- a/be/src/vec/exec/format/parquet/decoder.cpp +++ b/be/src/vec/exec/format/parquet/decoder.cpp @@ -20,8 +20,10 @@ #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exec/format/parquet/bool_plain_decoder.h" +#include "vec/exec/format/parquet/bool_rle_decoder.h" #include "vec/exec/format/parquet/byte_array_dict_decoder.h" #include "vec/exec/format/parquet/byte_array_plain_decoder.h" +#include "vec/exec/format/parquet/delta_bit_pack_decoder.h" #include "vec/exec/format/parquet/fix_length_dict_decoder.hpp" #include "vec/exec/format/parquet/fix_length_plain_decoder.h" @@ -88,6 +90,48 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type tparquet::to_string(type), tparquet::to_string(encoding)); } break; + case tparquet::Encoding::RLE: + switch (type) { + case tparquet::Type::BOOLEAN: + decoder.reset(new BoolRLEDecoder()); + break; + default: + return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder", + tparquet::to_string(type), tparquet::to_string(encoding)); + } + break; + case tparquet::Encoding::DELTA_BINARY_PACKED: + // Supports only INT32 and INT64. + switch (type) { + case tparquet::Type::INT32: + decoder.reset(new DeltaBitPackDecoder<Int32>(type)); + break; + case tparquet::Type::INT64: + decoder.reset(new DeltaBitPackDecoder<Int64>(type)); + break; + default: + return Status::InternalError("DELTA_BINARY_PACKED only supports INT32 and INT64"); + } + break; + case tparquet::Encoding::DELTA_BYTE_ARRAY: + switch (type) { + case tparquet::Type::BYTE_ARRAY: + decoder.reset(new DeltaByteArrayDecoder(type)); + break; + default: + return Status::InternalError("DELTA_BYTE_ARRAY only supports BYTE_ARRAY."); + } + break; + case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: + switch (type) { + case tparquet::Type::FIXED_LEN_BYTE_ARRAY: + decoder.reset(new DeltaLengthByteArrayDecoder(type)); + break; + default: + return Status::InternalError( + "DELTA_LENGTH_BYTE_ARRAY only supports FIXED_LEN_BYTE_ARRAY."); + } + break; default: return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder", tparquet::to_string(encoding), tparquet::to_string(type)); diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp new file mode 100644 index 0000000000..c550e7262d --- /dev/null +++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "delta_bit_pack_decoder.h" + +namespace doris::vectorized { + +Status DeltaDecoder::decode_byte_array(const std::vector<Slice>& decoded_vals, + MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { + case TypeIndex::String: + [[fallthrough]]; + case TypeIndex::FixedString: { + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + std::vector<StringRef> string_values; + string_values.reserve(run_length); + for (size_t i = 0; i < run_length; ++i) { + size_t length = decoded_vals[_current_value_idx].size; + string_values.emplace_back(decoded_vals[_current_value_idx].data, length); + _current_value_idx++; + } + doris_column->insert_many_strings(&string_values[0], run_length); + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _current_value_idx += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + _current_value_idx = 0; + return Status::OK(); + } + default: + break; + } + return Status::InvalidArgument( + "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}", + getTypeName(logical_type)); +} + +template <typename T> +Status DeltaBitPackDecoder<T>::_init_header() { + if (!_bit_reader->GetVlqInt(&_values_per_block) || + !_bit_reader->GetVlqInt(&_mini_blocks_per_block) || + !_bit_reader->GetVlqInt(&_total_value_count) || + !_bit_reader->GetZigZagVlqInt(&_last_value)) { + return Status::IOError("Init header eof"); + } + if (_values_per_block == 0) { + return Status::InvalidArgument("Cannot have zero value per block"); + } + if (_values_per_block % 128 != 0) { + return Status::InvalidArgument( + "the number of values in a block must be multiple of 128, but it's " + + std::to_string(_values_per_block)); + } + if (_mini_blocks_per_block == 0) { + return Status::InvalidArgument("Cannot have zero miniblock per block"); + } + _values_per_mini_block = _values_per_block / _mini_blocks_per_block; + if (_values_per_mini_block == 0) { + return Status::InvalidArgument("Cannot have zero value per miniblock"); + } + if (_values_per_mini_block % 32 != 0) { + return Status::InvalidArgument( + "The number of values in a miniblock must be multiple of 32, but it's " + + std::to_string(_values_per_mini_block)); + } + _total_values_remaining = _total_value_count; + _delta_bit_widths.resize(_mini_blocks_per_block); + // init as empty property + _block_initialized = false; + _values_remaining_current_mini_block = 0; + return Status::OK(); +} + +template <typename T> +Status DeltaBitPackDecoder<T>::_init_block() { + DCHECK_GT(_total_values_remaining, 0) << "InitBlock called at EOF"; + if (!_bit_reader->GetZigZagVlqInt(&_min_delta)) { + return Status::IOError("Init block eof"); + } + + // read the bitwidth of each miniblock + uint8_t* bit_width_data = _delta_bit_widths.data(); + for (uint32_t i = 0; i < _mini_blocks_per_block; ++i) { + if (!_bit_reader->GetAligned<uint8_t>(1, bit_width_data + i)) { + return Status::IOError("Decode bit-width EOF"); + } + // Note that non-conformant bitwidth entries are allowed by the Parquet spec + // for extraneous miniblocks in the last block (GH-14923), so we check + // the bitwidths when actually using them (see InitMiniBlock()). + } + _mini_block_idx = 0; + _block_initialized = true; + RETURN_IF_ERROR(_init_mini_block(bit_width_data[0])); + return Status::OK(); +} + +template <typename T> +Status DeltaBitPackDecoder<T>::_init_mini_block(int bit_width) { + if (PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) { + return Status::InvalidArgument("delta bit width larger than integer bit width"); + } + _delta_bit_width = bit_width; + _values_remaining_current_mini_block = _values_per_mini_block; + return Status::OK(); +} + +template <typename T> +Status DeltaBitPackDecoder<T>::_get_internal(T* buffer, int num_values, int* out_num_values) { + num_values = static_cast<int>(std::min<int64_t>(num_values, _total_values_remaining)); + if (num_values == 0) { + *out_num_values = 0; + return Status::OK(); + } + int i = 0; + while (i < num_values) { + if (PREDICT_FALSE(_values_remaining_current_mini_block == 0)) { + if (PREDICT_FALSE(!_block_initialized)) { + buffer[i++] = _last_value; + DCHECK_EQ(i, 1); // we're at the beginning of the page + if (i == num_values) { + // When block is uninitialized and i reaches num_values we have two + // different possibilities: + // 1. _total_value_count == 1, which means that the page may have only + // one value (encoded in the header), and we should not initialize + // any block. + // 2. _total_value_count != 1, which means we should initialize the + // incoming block for subsequent reads. + if (_total_value_count != 1) { + RETURN_IF_ERROR(_init_block()); + } + break; + } + RETURN_IF_ERROR(_init_block()); + } else { + ++_mini_block_idx; + if (_mini_block_idx < _mini_blocks_per_block) { + RETURN_IF_ERROR(_init_mini_block(_delta_bit_widths.data()[_mini_block_idx])); + } else { + RETURN_IF_ERROR(_init_block()); + } + } + } + + int values_decode = std::min(_values_remaining_current_mini_block, + static_cast<uint32_t>(num_values - i)); + for (int j = 0; j < values_decode; ++j) { + if (!_bit_reader->GetValue(_delta_bit_width, buffer + i + j)) { + return Status::IOError("Get batch EOF"); + } + } + for (int j = 0; j < values_decode; ++j) { + // Addition between min_delta, packed int and last_value should be treated as + // unsigned addition. Overflow is as expected. + buffer[i + j] = static_cast<UT>(_min_delta) + static_cast<UT>(buffer[i + j]) + + static_cast<UT>(_last_value); + _last_value = buffer[i + j]; + } + _values_remaining_current_mini_block -= values_decode; + i += values_decode; + } + _total_values_remaining -= num_values; + + if (PREDICT_FALSE(_total_values_remaining == 0)) { + if (!_bit_reader->Advance(_delta_bit_width * _values_remaining_current_mini_block)) { + return Status::IOError("Skip padding EOF"); + } + _values_remaining_current_mini_block = 0; + } + *out_num_values = num_values; + return Status::OK(); +} + +void DeltaLengthByteArrayDecoder::_decode_lengths() { + _len_decoder.set_bit_reader(_bit_reader); + // get the number of encoded lengths + int num_length = _len_decoder.valid_values_count(); + _buffered_length.resize(num_length); + + // decode all the lengths. all the lengths are buffered in buffered_length_. + int ret; + Status st = _len_decoder.decode(_buffered_length.data(), num_length, &ret); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to decode delta length, status: " << st; + } + DCHECK_EQ(ret, num_length); + _length_idx = 0; + _num_valid_values = num_length; +} + +Status DeltaLengthByteArrayDecoder::_get_internal(Slice* buffer, int max_values, + int* out_num_values) { + // Decode up to `max_values` strings into an internal buffer + // and reference them into `buffer`. + max_values = std::min(max_values, _num_valid_values); + if (max_values == 0) { + *out_num_values = 0; + return Status::OK(); + } + + int32_t data_size = 0; + const int32_t* length_ptr = _buffered_length.data() + _length_idx; + for (int i = 0; i < max_values; ++i) { + int32_t len = length_ptr[i]; + if (PREDICT_FALSE(len < 0)) { + return Status::InvalidArgument("Negative string delta length"); + } + buffer[i].size = len; + if (common::add_overflow(data_size, len, data_size)) { + return Status::InvalidArgument("Excess expansion in DELTA_(LENGTH_)BYTE_ARRAY"); + } + } + _length_idx += max_values; + + _buffered_data.resize(data_size); + char* data_ptr = _buffered_data.data(); + for (int j = 0; j < data_size; j++) { + if (!_bit_reader->GetValue(8, data_ptr + j)) { + return Status::IOError("Get length bytes EOF"); + } + } + + for (int i = 0; i < max_values; ++i) { + buffer[i].data = data_ptr; + data_ptr += buffer[i].size; + } + // this->num_values_ -= max_values; + _num_valid_values -= max_values; + *out_num_values = max_values; + return Status::OK(); +} + +Status DeltaByteArrayDecoder::_get_internal(Slice* buffer, int max_values, int* out_num_values) { + // Decode up to `max_values` strings into an internal buffer + // and reference them into `buffer`. + max_values = std::min(max_values, _num_valid_values); + if (max_values == 0) { + *out_num_values = max_values; + return Status::OK(); + } + + int suffix_read; + RETURN_IF_ERROR(_suffix_decoder.decode(buffer, max_values, &suffix_read)); + if (PREDICT_FALSE(suffix_read != max_values)) { + return Status::IOError("Read {}, expecting {} from suffix decoder", + std::to_string(suffix_read), std::to_string(max_values)); + } + + int64_t data_size = 0; + const int32_t* prefix_len_ptr = _buffered_prefix_length.data() + _prefix_len_offset; + for (int i = 0; i < max_values; ++i) { + if (PREDICT_FALSE(prefix_len_ptr[i] < 0)) { + return Status::InvalidArgument("negative prefix length in DELTA_BYTE_ARRAY"); + } + if (PREDICT_FALSE(common::add_overflow(data_size, static_cast<int64_t>(prefix_len_ptr[i]), + data_size) || + common::add_overflow(data_size, static_cast<int64_t>(buffer[i].size), + data_size))) { + return Status::InvalidArgument("excess expansion in DELTA_BYTE_ARRAY"); + } + } + _buffered_data.resize(data_size); + + std::string_view prefix {_last_value}; + + char* data_ptr = _buffered_data.data(); + for (int i = 0; i < max_values; ++i) { + if (PREDICT_FALSE(static_cast<size_t>(prefix_len_ptr[i]) > prefix.length())) { + return Status::InvalidArgument("prefix length too large in DELTA_BYTE_ARRAY"); + } + memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]); + // buffer[i] currently points to the string suffix + memcpy(data_ptr + prefix_len_ptr[i], buffer[i].data, buffer[i].size); + buffer[i].data = data_ptr; + buffer[i].size += prefix_len_ptr[i]; + data_ptr += buffer[i].size; + prefix = std::string_view {buffer[i].data, buffer[i].size}; + } + _prefix_len_offset += max_values; + _num_valid_values -= max_values; + _last_value = std::string {prefix}; + + if (_num_valid_values == 0) { + _last_value_in_previous_page = _last_value; + } + *out_num_values = max_values; + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h new file mode 100644 index 0000000000..bdebd4d82f --- /dev/null +++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "util/bit_stream_utils.h" +#include "vec/exec/format/parquet/byte_array_plain_decoder.h" +#include "vec/exec/format/parquet/fix_length_plain_decoder.h" + +namespace doris::vectorized { + +class DeltaDecoder : public Decoder { +public: + DeltaDecoder(Decoder* decoder) { _type_converted_decoder.reset(decoder); } + + ~DeltaDecoder() override = default; + + Status skip_values(size_t num_values) override { + return _type_converted_decoder->skip_values(num_values); + } + + Status decode_byte_array(const std::vector<Slice>& decoded_vals, MutableColumnPtr& doris_column, + DataTypePtr& data_type, ColumnSelectVector& select_vector); + +protected: + void init_values_converter() { + _type_converted_decoder->set_data(_data); + _type_converted_decoder->set_type_length(_type_length); + _type_converted_decoder->init(_field_schema, _decode_params->ctz); + } + // Convert decoded value to doris type value. + std::unique_ptr<Decoder> _type_converted_decoder; + size_t _current_value_idx = 0; +}; + +/** + * Format + * [header] [block 1] [block 2] ... [block N] + * Header + * [block size] [_mini_blocks_per_block] [_total_value_count] [first value] + * Block + * [min delta] [list of bitwidths of the mini blocks] [miniblocks] + */ +template <typename T> +class DeltaBitPackDecoder final : public DeltaDecoder { +public: + using UT = std::make_unsigned_t<T>; + + DeltaBitPackDecoder(const tparquet::Type::type& physical_type) + : DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {} + ~DeltaBitPackDecoder() override = default; + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + // decode values + _values.resize(non_null_size); + int decoded_count = 0; + RETURN_IF_ERROR(_get_internal(_values.data(), non_null_size, &decoded_count)); + _data->data = reinterpret_cast<char*>(_values.data()); + _type_length = sizeof(T); + _data->size = _values.size() * _type_length; + // set decoded value with fix plain decoder + init_values_converter(); + return _type_converted_decoder->decode_values(doris_column, data_type, select_vector); + } + + Status decode(T* buffer, int num_values, int* out_num_values) { + return _get_internal(buffer, num_values, out_num_values); + } + + int valid_values_count() { + // _total_value_count in header ignores of null values + return static_cast<int>(_total_values_remaining); + } + + void set_data(Slice* slice) override { + _bit_reader.reset(new BitReader((const uint8_t*)slice->data, slice->size)); + Status st = _init_header(); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string(); + } + _data = slice; + _offset = 0; + } + + // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or + // DeltaByteArrayDecoder + void set_bit_reader(std::shared_ptr<BitReader> bit_reader) { + _bit_reader = std::move(bit_reader); + Status st = _init_header(); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string(); + } + } + +private: + static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8); + Status _init_header(); + Status _init_block(); + Status _init_mini_block(int bit_width); + Status _get_internal(T* buffer, int max_values, int* out_num_values); + + std::vector<T> _values; + + std::shared_ptr<BitReader> _bit_reader; + uint32_t _values_per_block; + uint32_t _mini_blocks_per_block; + uint32_t _values_per_mini_block; + uint32_t _total_value_count; + + T _min_delta; + T _last_value; + + uint32_t _mini_block_idx; + std::vector<uint8_t> _delta_bit_widths; + int _delta_bit_width; + // If the page doesn't contain any block, `_block_initialized` will + // always be false. Otherwise, it will be true when first block initialized. + bool _block_initialized; + + uint32_t _total_values_remaining; + // Remaining values in current mini block. If the current block is the last mini block, + // _values_remaining_current_mini_block may greater than _total_values_remaining. + uint32_t _values_remaining_current_mini_block; +}; +template class DeltaBitPackDecoder<int32_t>; +template class DeltaBitPackDecoder<int64_t>; + +class DeltaLengthByteArrayDecoder final : public DeltaDecoder { +public: + explicit DeltaLengthByteArrayDecoder(const tparquet::Type::type& physical_type) + : DeltaDecoder(nullptr), + _len_decoder(physical_type), + _buffered_length(0), + _buffered_data(0) {} + + Status skip_values(size_t num_values) override { + _current_value_idx += num_values; + RETURN_IF_ERROR(_len_decoder.skip_values(num_values)); + return Status::OK(); + } + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t num_values = select_vector.num_values(); + size_t null_count = select_vector.num_nulls(); + // init read buffer + _values.resize(num_values - null_count); + int num_valid_values; + RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values)); + + if (PREDICT_FALSE(num_values - null_count != num_valid_values)) { + return Status::IOError("Expected to decode {} values, but decoded {} values.", + num_values - null_count, num_valid_values); + } + return decode_byte_array(_values, doris_column, data_type, select_vector); + } + + Status decode(Slice* buffer, int num_values, int* out_num_values) { + return _get_internal(buffer, num_values, out_num_values); + } + + void set_data(Slice* slice) override { + if (slice->size == 0) { + return; + } + _bit_reader = std::make_shared<BitReader>((const uint8_t*)slice->data, slice->size); + _data = slice; + _offset = 0; + _decode_lengths(); + } + + void set_bit_reader(std::shared_ptr<BitReader> bit_reader) { + _bit_reader = std::move(bit_reader); + _decode_lengths(); + } + +private: + // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data + // after that. + void _decode_lengths(); + Status _get_internal(Slice* buffer, int max_values, int* out_num_values); + + std::vector<Slice> _values; + std::shared_ptr<BitReader> _bit_reader; + DeltaBitPackDecoder<int32_t> _len_decoder; + + int _num_valid_values; + uint32_t _length_idx; + std::vector<int32_t> _buffered_length; + std::vector<char> _buffered_data; +}; + +class DeltaByteArrayDecoder : public DeltaDecoder { +public: + explicit DeltaByteArrayDecoder(const tparquet::Type::type& physical_type) + : DeltaDecoder(nullptr), + _prefix_len_decoder(physical_type), + _suffix_decoder(physical_type), + _buffered_prefix_length(0), + _buffered_data(0) {} + + Status skip_values(size_t num_values) override { + _current_value_idx += num_values; + RETURN_IF_ERROR(_prefix_len_decoder.skip_values(num_values)); + RETURN_IF_ERROR(_suffix_decoder.skip_values(num_values)); + return Status::OK(); + } + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t num_values = select_vector.num_values(); + size_t null_count = select_vector.num_nulls(); + _values.resize(num_values - null_count); + int num_valid_values; + RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values)); + DCHECK_EQ(num_values - null_count, num_valid_values); + return decode_byte_array(_values, doris_column, data_type, select_vector); + } + + void set_data(Slice* slice) override { + _bit_reader = std::make_shared<BitReader>((const uint8_t*)slice->data, slice->size); + _prefix_len_decoder.set_bit_reader(_bit_reader); + + // get the number of encoded prefix lengths + int num_prefix = _prefix_len_decoder.valid_values_count(); + // call _prefix_len_decoder.Decode to decode all the prefix lengths. + // all the prefix lengths are buffered in _buffered_prefix_length. + _buffered_prefix_length.resize(num_prefix); + int ret; + Status st = _prefix_len_decoder.decode(_buffered_prefix_length.data(), num_prefix, &ret); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to decode delta prefix, status: " << st; + } + DCHECK_EQ(ret, num_prefix); + _prefix_len_offset = 0; + _num_valid_values = num_prefix; + + // at this time, the decoder_ will be at the start of the encoded suffix data. + _suffix_decoder.set_bit_reader(_bit_reader); + + // TODO: read corrupted files written with bug(PARQUET-246). _last_value should be set + // to _last_value_in_previous_page when decoding a new page(except the first page) + _last_value = ""; + } + + Status decode(Slice* buffer, int num_values, int* out_num_values) { + return _get_internal(buffer, num_values, out_num_values); + } + +private: + Status _get_internal(Slice* buffer, int max_values, int* out_num_values); + + std::vector<Slice> _values; + std::shared_ptr<BitReader> _bit_reader; + DeltaBitPackDecoder<int32_t> _prefix_len_decoder; + DeltaLengthByteArrayDecoder _suffix_decoder; + std::string _last_value; + // string buffer for last value in previous page + std::string _last_value_in_previous_page; + int _num_valid_values; + uint32_t _prefix_len_offset; + std::vector<int32_t> _buffered_prefix_length; + std::vector<char> _buffered_data; +}; +} // namespace doris::vectorized diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md b/docs/en/docs/lakehouse/multi-catalog/hive.md index 8b83e2e0be..c6b59bb802 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hive.md +++ b/docs/en/docs/lakehouse/multi-catalog/hive.md @@ -104,7 +104,7 @@ CREATE CATALOG hive PROPERTIES ( ); ``` -Or to connect to Hive data stored in JuiceFS: +Or to connect to Hive data stored on JuiceFS: ```sql CREATE CATALOG hive PROPERTIES ( @@ -117,6 +117,23 @@ CREATE CATALOG hive PROPERTIES ( ); ``` +Or to connect to Glue and data stored on S3: + +```sql +CREATE CATALOG hive PROPERTIES ( + "type"="hms", + "hive.metastore.type" = "glue", + "aws.region" = "us-east-1", + "aws.glue.access-key" = "ak", + "aws.glue.secret-key" = "sk", + "AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com", + "AWS_REGION" = "us-east-1", + "AWS_ACCESS_KEY" = "ak", + "AWS_SECRET_KEY" = "sk", + "use_path_style" = "true" +); +``` + <version since="dev"> when connecting to Hive Metastore which is authorized by Ranger, need some properties and update FE runtime environment. diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md index f73f1464b2..4b870dfea8 100644 --- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md @@ -61,7 +61,7 @@ Access metadata with the iceberg API. The Hive, REST, Glue and other services ca </version> -- Using Iceberg Hive Catalog +#### Using Iceberg Hive Catalog ```sql CREATE CATALOG iceberg PROPERTIES ( @@ -77,7 +77,7 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` -- Using Iceberg Glue Catalog +#### Using Iceberg Glue Catalog ```sql CREATE CATALOG glue PROPERTIES ( @@ -97,6 +97,8 @@ CREATE CATALOG glue PROPERTIES ( `warehouse`: Glue Warehouse Location. To determine the root path of the data warehouse in storage. +The other properties can refer to [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog) + - Using Iceberg REST Catalog RESTful service as the server side. Implementing RESTCatalog interface of iceberg to obtain metadata. diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md index 8c7ddcbc58..89334a1272 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md @@ -113,6 +113,23 @@ CREATE CATALOG hive PROPERTIES ( ); ``` +hive元数据存储在Glue,数据存储在S3,示例如下: + +```sql +CREATE CATALOG hive PROPERTIES ( + "type"="hms", + "hive.metastore.type" = "glue", + "aws.region" = "us-east-1", + "aws.glue.access-key" = "ak", + "aws.glue.secret-key" = "sk", + "AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com", + "AWS_REGION" = "us-east-1", + "AWS_ACCESS_KEY" = "ak", + "AWS_SECRET_KEY" = "sk", + "use_path_style" = "true" +); +``` + <version since="dev"> 连接开启 Ranger 权限校验的 Hive Metastore 需要增加配置 & 配置环境: diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index 9363767160..10e12af675 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -59,7 +59,7 @@ CREATE CATALOG iceberg PROPERTIES ( </version> -- Hive Metastore作为元数据服务 +#### Hive Metastore作为元数据服务 ```sql CREATE CATALOG iceberg PROPERTIES ( @@ -75,7 +75,7 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` -- Glue Catalog作为元数据服务 +#### Glue Catalog作为元数据服务 ```sql CREATE CATALOG glue PROPERTIES ( @@ -95,6 +95,8 @@ CREATE CATALOG glue PROPERTIES ( `warehouse`: Glue Warehouse Location. Glue Catalog的根路径,用于指定数据存放位置。 +属性详情参见 [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog) + - REST Catalog作为元数据服务 该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。 diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out new file mode 100644 index 0000000000..23cb814ab1 --- /dev/null +++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out @@ -0,0 +1,131 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q01 -- +2967 +3158 +15505 +20726 +21843 + +-- !q02 -- +1809714008 +1979816070 +2147483647 +2147483647 +2147483647 + +-- !q03 -- +\N +15821 +\N +\N +\N + +-- !q04 -- +1604.7639 +1583.2013 +1031.6219 +1295.7802 +182.5588 + +-- !q05 -- +1937.7762425702406 +992.21123681735253 +56.682069922520562 +940.70481552186243 +1876.4831949153224 + +-- !q06 -- +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 + +-- !q07 -- +6f77a7baae184d +88 +fbbf69fc81374 +f14889 +33d471ce + +-- !q08 -- +c2b69a82f074e4f +81b1152fa774b8 +73df8eaccf +2ed59df3c824dc78b +5e3e98e07e + +-- !q09 -- +dfaac2a43 +28c5f21b8 +a20faeee91e34ce +b5e6bf2b5 +8bc56e + +-- !q10 -- +true +true +false +false +false + +-- !q11 -- +54078 8184 +122067 9731 +140902 170 +143594 5714 +170289 4567 +175294 1959 +202483 857 +222664 6449 +230156 2480 +266339 6845 + +-- !q12 -- +\N +\N +\N + +-- !q13 -- +27 +27 +34 +50 +59 +97 +99 +101 +107 +114 + +-- !q14 -- +dfaac2a43 +28c5f21b8 +a20faeee91e34ce +b5e6bf2b5 +8bc56e + +-- !q15 -- +5000 + +-- !q16 -- +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy new file mode 100644 index 0000000000..d1f014d28c --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_external_catalog_glue_table", "p2") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + + sql """drop catalog if exists test_external_catalog_glue;""" + sql """ + create catalog if not exists test_external_catalog_glue properties ( + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + + sql """switch test_external_catalog_glue;""" + def q01 = { + qt_q01 """ select glue_int from iceberg_glue_types order by glue_int limit 5 """ + qt_q02 """ select glue_bigint from iceberg_glue_types order by glue_int limit 5 """ + qt_q03 """ select glue_smallint from iceberg_glue_types order by glue_int limit 5 """ + qt_q04 """ select glue_decimal from iceberg_glue_types order by glue_int limit 5 """ + qt_q05 """ select glue_double from iceberg_glue_types order by glue_int limit 5 """ + qt_q06 """ select glue_timstamp from iceberg_glue_types order by glue_timstamp limit 20 """ + qt_q07 """ select glue_char from iceberg_glue_types order by glue_int limit 5 """ + qt_q08 """ select glue_varchar from iceberg_glue_types order by glue_int limit 5 """ + qt_q09 """ select glue_string from iceberg_glue_types order by glue_int limit 5 """ + qt_q10 """ select glue_bool from iceberg_glue_types order by glue_int limit 5 """ + qt_q11 """ select glue_int,glue_smallint from iceberg_glue_types where glue_int > 2000 and glue_smallint < 10000 order by glue_int limit 10 """ + qt_q12 """ select glue_smallint from iceberg_glue_types where glue_smallint is null order by glue_smallint limit 3 """ + qt_q13 """ select glue_smallint from iceberg_glue_types where glue_smallint is not null order by glue_smallint limit 10 """ + qt_q14 """ select glue_string from iceberg_glue_types where glue_string>'040abff1da4748e4b' order by glue_int limit 5 """ + qt_q15 """ select count(1) from iceberg_glue_types """ + qt_q16 """ select glue_timstamp from iceberg_glue_types where glue_timstamp > '2023-03-07 20:35:59' order by glue_timstamp limit 5 """ + } + sql """ use `iceberg_catalog`; """ + q01() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org