This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c2fae109c3 [Improvement](outfile) Support output null in parquet writer (#12970) c2fae109c3 is described below commit c2fae109c3f54dd28c6b8cf858200dadcf344949 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Sep 29 13:36:30 2022 +0800 [Improvement](outfile) Support output null in parquet writer (#12970) --- be/src/vec/olap/olap_data_convertor.cpp | 228 ++++++------------- be/src/vec/olap/olap_data_convertor.h | 150 ++----------- be/src/vec/runtime/vdatetime_value.h | 11 - be/src/vec/runtime/vparquet_writer.cpp | 243 +++++++++++---------- .../org/apache/doris/analysis/OutFileClause.java | 33 +-- .../java/org/apache/doris/analysis/SelectStmt.java | 2 +- .../apache/doris/analysis/SetOperationStmt.java | 2 +- .../org/apache/doris/analysis/SelectStmtTest.java | 4 +- .../data/export_p0/test_outfile_parquet.out | 25 +++ .../suites/export_p0/test_outfile_parquet.groovy | 158 ++++++++++++++ 10 files changed, 425 insertions(+), 431 deletions(-) diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 7a1f2cfbec..58ab5d6579 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -490,99 +490,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDate::set_source_column( const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) { OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint24_t>::set_source_column( typed_column, row_pos, num_rows); - if (is_date_v2(typed_column.type)) { - from_date_v2_ = true; - } else { - from_date_v2_ = false; - } } Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() { assert(_typed_column.column); - if (from_date_v2_) { - const vectorized::ColumnVector<vectorized::UInt32>* column_datetime = nullptr; - if (_nullmap) { - auto nullable_column = - assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>( - _typed_column.column.get()); - } + const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; + if (_nullmap) { + auto nullable_column = + assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); + column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( + nullable_column->get_nested_column_ptr().get()); + } else { + column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( + _typed_column.column.get()); + } - assert(column_datetime); - - const DateV2Value<DateV2ValueType>* datetime_cur = - (const DateV2Value<DateV2ValueType>*)(column_datetime->get_data().data()) + - _row_pos; - const DateV2Value<DateV2ValueType>* datetime_end = datetime_cur + _num_rows; - uint24_t* value = _values.data(); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_olap_date(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - assert(nullmap_cur == _nullmap + _row_pos + _num_rows && - value == _values.get_end_ptr()); - } else { - while (datetime_cur != datetime_end) { + assert(column_datetime); + + const VecDateTimeValue* datetime_cur = + (const 
VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; + const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; + uint24_t* value = _values.data(); + if (_nullmap) { + const UInt8* nullmap_cur = _nullmap + _row_pos; + while (datetime_cur != datetime_end) { + if (!*nullmap_cur) { *value = datetime_cur->to_olap_date(); - ++value; - ++datetime_cur; + } else { + // do nothing } - assert(value == _values.get_end_ptr()); + ++value; + ++datetime_cur; + ++nullmap_cur; } - return Status::OK(); + assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr()); } else { - const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; - if (_nullmap) { - auto nullable_column = - assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - _typed_column.column.get()); - } - - assert(column_datetime); - - const VecDateTimeValue* datetime_cur = - (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; - const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; - uint24_t* value = _values.data(); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_olap_date(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - assert(nullmap_cur == _nullmap + _row_pos + _num_rows && - value == _values.get_end_ptr()); - } else { - while (datetime_cur != datetime_end) { - *value = datetime_cur->to_olap_date(); - ++value; - ++datetime_cur; - } - assert(value == _values.get_end_ptr()); + while (datetime_cur != datetime_end) { + *value = datetime_cur->to_olap_date(); + ++value; + ++datetime_cur; } - return Status::OK(); + assert(value == _values.get_end_ptr()); } + return Status::OK(); } // class OlapBlockDataConvertor::OlapColumnDataConvertorJsonb @@ -660,99 +610,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::set_source_column( const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) { OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint64_t>::set_source_column( typed_column, row_pos, num_rows); - if (is_date_v2_or_datetime_v2(typed_column.type)) { - from_datetime_v2_ = true; - } else { - from_datetime_v2_ = false; - } } Status OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap() { assert(_typed_column.column); - if (from_datetime_v2_) { - const vectorized::ColumnVector<vectorized::UInt64>* column_datetimev2 = nullptr; - if (_nullmap) { - auto nullable_column = - assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); - column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>( - _typed_column.column.get()); - } + const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; + if (_nullmap) { + auto nullable_column = + assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); + column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( + nullable_column->get_nested_column_ptr().get()); + } else { + column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>( + _typed_column.column.get()); + } - assert(column_datetimev2); - - const DateV2Value<DateTimeV2ValueType>* datetime_cur = - (const DateV2Value<DateTimeV2ValueType>*)(column_datetimev2->get_data().data()) + - _row_pos; - const DateV2Value<DateTimeV2ValueType>* datetime_end = datetime_cur + _num_rows; - uint64_t* value = _values.data(); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_olap_datetime(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - assert(nullmap_cur == _nullmap + _row_pos + _num_rows && - value == _values.get_end_ptr()); - } else { - while (datetime_cur != datetime_end) { + assert(column_datetime); + + const VecDateTimeValue* datetime_cur = + (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; + const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; + uint64_t* value = _values.data(); + if (_nullmap) { + const UInt8* nullmap_cur = _nullmap + _row_pos; + while (datetime_cur != datetime_end) { + if (!*nullmap_cur) { *value = datetime_cur->to_olap_datetime(); - ++value; - ++datetime_cur; + } else { + // do nothing } - assert(value == _values.get_end_ptr()); + ++value; + ++datetime_cur; + ++nullmap_cur; } - return Status::OK(); + assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr()); } else { - const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; - if (_nullmap) { - auto nullable_column = - assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - _typed_column.column.get()); - } - - assert(column_datetime); - - const VecDateTimeValue* datetime_cur = - (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; - const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; - uint64_t* value = _values.data(); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_olap_datetime(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - assert(nullmap_cur == _nullmap + _row_pos + _num_rows && - value == _values.get_end_ptr()); - } else { - while (datetime_cur != datetime_end) { - *value = datetime_cur->to_olap_datetime(); - ++value; - ++datetime_cur; - } - assert(value == _values.get_end_ptr()); + while (datetime_cur != datetime_end) { + *value = datetime_cur->to_olap_datetime(); + ++value; + ++datetime_cur; } - return Status::OK(); + assert(value == _values.get_end_ptr()); } + return Status::OK(); } Status OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap() { diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h index 961f60f4c5..9eb63c9154 100644 --- a/be/src/vec/olap/olap_data_convertor.h +++ b/be/src/vec/olap/olap_data_convertor.h @@ -209,9 +209,6 @@ private: void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) override; Status convert_to_olap() override; - - private: - bool from_date_v2_; }; class OlapColumnDataConvertorDateTime : public OlapColumnDataConvertorPaddedPODArray<uint64_t> { @@ -219,9 
+216,6 @@ private: void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) override; Status convert_to_olap() override; - - private: - bool from_datetime_v2_; }; class OlapColumnDataConvertorDecimal @@ -277,11 +271,6 @@ private: void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) override { OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows); - if (is_date(typed_column.type)) { - from_date_to_date_v2_ = true; - } else { - from_date_to_date_v2_ = false; - } } const void* get_data() const override { return values_; } @@ -296,67 +285,24 @@ private: } Status convert_to_olap() override { - if (UNLIKELY(from_date_to_date_v2_)) { - const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; - if (_nullmap) { - auto nullable_column = assert_cast<const vectorized::ColumnNullable*>( - _typed_column.column.get()); - column_datetime = - assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetime = - assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - _typed_column.column.get()); - } - - assert(column_datetime); - - const VecDateTimeValue* datetime_cur = - (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; - const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; - uint32_t* value = const_cast<uint32_t*>(values_); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_date_v2(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - } else { - while (datetime_cur != datetime_end) { - *value = datetime_cur->to_date_v2(); - ++value; - ++datetime_cur; - } - } - return Status::OK(); + const vectorized::ColumnVector<uint32>* column_data = nullptr; + if (_nullmap) { + auto nullable_column = + assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); + column_data = assert_cast<const vectorized::ColumnVector<uint32>*>( + nullable_column->get_nested_column_ptr().get()); } else { - const vectorized::ColumnVector<uint32>* column_data = nullptr; - if (_nullmap) { - auto nullable_column = assert_cast<const vectorized::ColumnNullable*>( - _typed_column.column.get()); - column_data = assert_cast<const vectorized::ColumnVector<uint32>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_data = assert_cast<const vectorized::ColumnVector<uint32>*>( - _typed_column.column.get()); - } - - assert(column_data); - values_ = (const uint32*)(column_data->get_data().data()) + _row_pos; - return Status::OK(); + column_data = assert_cast<const vectorized::ColumnVector<uint32>*>( + _typed_column.column.get()); } + + assert(column_data); + values_ = (const uint32*)(column_data->get_data().data()) + _row_pos; + return Status::OK(); } private: const uint32_t* values_ = nullptr; - bool from_date_to_date_v2_; }; class OlapColumnDataConvertorDateTimeV2 : public OlapColumnDataConvertorBase { @@ -367,11 +313,6 @@ private: void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) override { OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows); - if (is_date_or_datetime(typed_column.type)) { - from_datetime_to_datetime_v2_ = true; - } else { - from_datetime_to_datetime_v2_ = false; - } } const void* get_data() const override { return 
values_; } @@ -386,67 +327,24 @@ private: } Status convert_to_olap() override { - if (UNLIKELY(from_datetime_to_datetime_v2_)) { - const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr; - if (_nullmap) { - auto nullable_column = assert_cast<const vectorized::ColumnNullable*>( - _typed_column.column.get()); - column_datetime = - assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_datetime = - assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>( - _typed_column.column.get()); - } - - assert(column_datetime); - - const VecDateTimeValue* datetime_cur = - (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos; - const VecDateTimeValue* datetime_end = datetime_cur + _num_rows; - uint64_t* value = const_cast<uint64_t*>(values_); - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (datetime_cur != datetime_end) { - if (!*nullmap_cur) { - *value = datetime_cur->to_datetime_v2(); - } else { - // do nothing - } - ++value; - ++datetime_cur; - ++nullmap_cur; - } - } else { - while (datetime_cur != datetime_end) { - *value = datetime_cur->to_datetime_v2(); - ++value; - ++datetime_cur; - } - } - return Status::OK(); + const vectorized::ColumnVector<uint64_t>* column_data = nullptr; + if (_nullmap) { + auto nullable_column = + assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get()); + column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>( + nullable_column->get_nested_column_ptr().get()); } else { - const vectorized::ColumnVector<uint64_t>* column_data = nullptr; - if (_nullmap) { - auto nullable_column = assert_cast<const vectorized::ColumnNullable*>( - _typed_column.column.get()); - column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>( - nullable_column->get_nested_column_ptr().get()); - } else { - column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>( - _typed_column.column.get()); - } - - assert(column_data); - values_ = (const uint64_t*)(column_data->get_data().data()) + _row_pos; - return Status::OK(); + column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>( + _typed_column.column.get()); } + + assert(column_data); + values_ = (const uint64_t*)(column_data->get_data().data()) + _row_pos; + return Status::OK(); } private: const uint64_t* values_ = nullptr; - bool from_datetime_to_datetime_v2_; }; // decimalv3 don't need to do any convert diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h index 42ce3a173c..7af0c273b1 100644 --- a/be/src/vec/runtime/vdatetime_value.h +++ b/be/src/vec/runtime/vdatetime_value.h @@ -781,17 +781,6 @@ public: return val; } - uint64_t to_olap_datetime() const { - uint64_t date_val = - date_v2_value_.year_ * 10000 + date_v2_value_.month_ * 100 + date_v2_value_.day_; - uint64_t time_val = 0; - if constexpr (is_datetime) { - time_val = date_v2_value_.hour_ * 10000 + date_v2_value_.minute_ * 100 + - date_v2_value_.second_; - } - return date_val * 1000000 + time_val; - } - bool to_format_string(const char* format, int len, char* to) const; bool from_date_format_str(const char* format, int format_len, const char* value, diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp index 14f90fc63e..9737fba854 100644 --- a/be/src/vec/runtime/vparquet_writer.cpp +++ b/be/src/vec/runtime/vparquet_writer.cpp @@ -89,20 +89,17 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq #define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE) \ parquet::RowGroupWriter* rgWriter = get_rg_writer(); \ parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i)); \ - __int128 default_value = 0; \ if (null_map != nullptr) { \ + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \ for (size_t row_id = 0; row_id < sz; row_id++) { \ - col_writer->WriteBatch(1, nullptr, nullptr, \ - (*null_map)[row_id] != 0 \ - ? reinterpret_cast<const NATIVE_TYPE*>(&default_value) \ - : reinterpret_cast<const NATIVE_TYPE*>( \ - assert_cast<const COLUMN_TYPE&>(*col) \ - .get_data_at(row_id) \ - .data)); \ + def_level[row_id] = null_data[row_id] == 0; \ } \ + col_writer->WriteBatch(sz, def_level.data(), nullptr, \ + reinterpret_cast<const NATIVE_TYPE*>( \ + assert_cast<const COLUMN_TYPE&>(*col).get_data().data())); \ } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \ col_writer->WriteBatch( \ - sz, nullptr, nullptr, \ + sz, nullable ? def_level.data() : nullptr, nullptr, \ reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data())); \ } else { \ RETURN_WRONG_TYPE \ @@ -117,14 +114,17 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \ DCHECK(decimal_type); \ if (null_map != nullptr) { \ + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \ for (size_t row_id = 0; row_id < sz; row_id++) { \ - if ((*null_map)[row_id] != 0) { \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + if (null_data[row_id] != 0) { \ + single_def_level = 0; \ + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \ + single_def_level = 1; \ } else { \ auto s = decimal_type->to_string(*col, row_id); \ value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \ value.len = s.size(); \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \ } \ } \ } else { \ @@ -132,7 +132,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq auto s = decimal_type->to_string(*col, row_id); \ value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \ value.len = s.size(); \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + col_writer->WriteBatch(1, nullable ? 
def_level.data() : nullptr, nullptr, &value); \ } \ } @@ -141,16 +141,19 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq parquet::ByteArrayWriter* col_writer = \ static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \ if (null_map != nullptr) { \ + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \ for (size_t row_id = 0; row_id < sz; row_id++) { \ - if ((*null_map)[row_id] != 0) { \ + if (null_data[row_id] != 0) { \ + single_def_level = 0; \ parquet::ByteArray value; \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \ + single_def_level = 1; \ } else { \ const auto& tmp = col->get_data_at(row_id); \ parquet::ByteArray value; \ value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \ value.len = tmp.size; \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \ } \ } \ } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \ @@ -159,7 +162,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq parquet::ByteArray value; \ value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \ value.len = tmp.size; \ - col_writer->WriteBatch(1, nullptr, nullptr, &value); \ + col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, &value); \ } \ } else { \ RETURN_WRONG_TYPE \ @@ -173,22 +176,25 @@ Status VParquetWriterWrapper::write(const Block& block) { try { for (size_t i = 0; i < block.columns(); i++) { auto& raw_column = block.get_by_position(i).column; - const auto col = raw_column->is_nullable() - ? reinterpret_cast<const ColumnNullable*>( - block.get_by_position(i).column.get()) - ->get_nested_column_ptr() - .get() - : block.get_by_position(i).column.get(); - auto null_map = - raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>( - block.get_by_position(i).column.get()) - ->get_null_map_column_ptr() - ->has_null() - ? reinterpret_cast<const ColumnNullable*>( - block.get_by_position(i).column.get()) - ->get_null_map_column_ptr() - : nullptr; + auto nullable = raw_column->is_nullable(); + const auto col = nullable ? reinterpret_cast<const ColumnNullable*>( + block.get_by_position(i).column.get()) + ->get_nested_column_ptr() + .get() + : block.get_by_position(i).column.get(); + auto null_map = nullable && reinterpret_cast<const ColumnNullable*>( + block.get_by_position(i).column.get()) + ->has_null() + ? reinterpret_cast<const ColumnNullable*>( + block.get_by_position(i).column.get()) + ->get_null_map_column_ptr() + : nullptr; auto& type = block.get_by_position(i).type; + + std::vector<int16_t> def_level(sz); + // For scalar type, definition level == 1 means this value is not NULL. 
+ std::fill(def_level.begin(), def_level.end(), 1); + int16_t single_def_level = 1; switch (_output_vexpr_ctxs[i]->root()->type().type) { case TYPE_BOOLEAN: { DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool) @@ -210,63 +216,49 @@ Status VParquetWriterWrapper::write(const Block& block) { break; } case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: { + case TYPE_SMALLINT: { parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::Int32Writer* col_writer = static_cast<parquet::Int32Writer*>(rgWriter->column(i)); - int32_t default_int32 = 0; if (null_map != nullptr) { - if (const auto* nested_column = - check_and_get_column<const ColumnVector<Int32>>(col)) { - for (size_t row_id = 0; row_id < sz; row_id++) { - col_writer->WriteBatch( - 1, nullptr, nullptr, - (*null_map)[row_id] != 0 - ? &default_int32 - : reinterpret_cast<const int32_t*>( - nested_column->get_data_at(row_id).data)); - } - } else if (const auto* int16_column = - check_and_get_column<const ColumnVector<Int16>>(col)) { + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); + if (const auto* int16_column = + check_and_get_column<const ColumnVector<Int16>>(col)) { for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + single_def_level = 0; + } const int32_t tmp = int16_column->get_data()[row_id]; - col_writer->WriteBatch( - 1, nullptr, nullptr, - (*null_map)[row_id] != 0 - ? &default_int32 - : reinterpret_cast<const int32_t*>(&tmp)); + col_writer->WriteBatch(1, &single_def_level, nullptr, + reinterpret_cast<const int32_t*>(&tmp)); + single_def_level = 1; } } else if (const auto* int8_column = check_and_get_column<const ColumnVector<Int8>>(col)) { for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + single_def_level = 0; + } const int32_t tmp = int8_column->get_data()[row_id]; - col_writer->WriteBatch( - 1, nullptr, nullptr, - (*null_map)[row_id] != 0 - ? &default_int32 - : reinterpret_cast<const int32_t*>(&tmp)); + col_writer->WriteBatch(1, &single_def_level, nullptr, + reinterpret_cast<const int32_t*>(&tmp)); + single_def_level = 1; } } else { RETURN_WRONG_TYPE } - } else if (const auto* not_nullable_column = - check_and_get_column<const ColumnVector<Int32>>(col)) { - col_writer->WriteBatch(sz, nullptr, nullptr, - reinterpret_cast<const int32_t*>( - not_nullable_column->get_data().data())); } else if (const auto& int16_column = check_and_get_column<const ColumnVector<Int16>>(col)) { for (size_t row_id = 0; row_id < sz; row_id++) { const int32_t tmp = int16_column->get_data()[row_id]; - col_writer->WriteBatch(1, nullptr, nullptr, + col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, reinterpret_cast<const int32_t*>(&tmp)); } } else if (const auto& int8_column = check_and_get_column<const ColumnVector<Int8>>(col)) { for (size_t row_id = 0; row_id < sz; row_id++) { const int32_t tmp = int8_column->get_data()[row_id]; - col_writer->WriteBatch(1, nullptr, nullptr, + col_writer->WriteBatch(1, nullable ? 
def_level.data() : nullptr, nullptr, reinterpret_cast<const int32_t*>(&tmp)); } } else { @@ -274,25 +266,34 @@ Status VParquetWriterWrapper::write(const Block& block) { } break; } + case TYPE_INT: { + DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, ColumnVector<Int32>, Int32) + break; + } case TYPE_DATETIME: case TYPE_DATE: { parquet::RowGroupWriter* rgWriter = get_rg_writer(); parquet::Int64Writer* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(i)); - int64_t default_int64 = 0; + uint64_t default_int64 = 0; if (null_map != nullptr) { + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); for (size_t row_id = 0; row_id < sz; row_id++) { - if ((*null_map)[row_id] != 0) { - col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + def_level[row_id] = null_data[row_id] == 0; + } + uint64_t tmp_data[sz]; + for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + tmp_data[row_id] = default_int64; } else { - const auto tmp = binary_cast<Int64, VecDateTimeValue>( - assert_cast<const ColumnVector<Int64>&>(*col) - .get_data()[row_id]) - .to_olap_datetime(); - col_writer->WriteBatch(1, nullptr, nullptr, - reinterpret_cast<const int64_t*>(&tmp)); + tmp_data[row_id] = binary_cast<Int64, VecDateTimeValue>( + assert_cast<const ColumnVector<Int64>&>(*col) + .get_data()[row_id]) + .to_olap_datetime(); } } + col_writer->WriteBatch(sz, def_level.data(), nullptr, + reinterpret_cast<const int64_t*>(tmp_data)); } else if (const auto* not_nullable_column = check_and_get_column<const ColumnVector<Int64>>(col)) { std::vector<uint64_t> res(sz); @@ -301,7 +302,7 @@ Status VParquetWriterWrapper::write(const Block& block) { not_nullable_column->get_data()[row_id]) .to_olap_datetime(); } - col_writer->WriteBatch(sz, nullptr, nullptr, + col_writer->WriteBatch(sz, nullable ? 
def_level.data() : nullptr, nullptr, reinterpret_cast<const int64_t*>(res.data())); } else { RETURN_WRONG_TYPE @@ -310,32 +311,39 @@ Status VParquetWriterWrapper::write(const Block& block) { } case TYPE_DATEV2: { parquet::RowGroupWriter* rgWriter = get_rg_writer(); - parquet::Int64Writer* col_writer = - static_cast<parquet::Int64Writer*>(rgWriter->column(i)); - int64_t default_int64 = 0; + parquet::ByteArrayWriter* col_writer = + static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); + parquet::ByteArray value; if (null_map != nullptr) { + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); for (size_t row_id = 0; row_id < sz; row_id++) { - if ((*null_map)[row_id] != 0) { - col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + if (null_data[row_id] != 0) { + single_def_level = 0; + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); + single_def_level = 1; } else { - uint64_t tmp = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( - assert_cast<const ColumnVector<UInt32>&>(*col) - .get_data()[row_id]) - .to_olap_datetime(); - col_writer->WriteBatch(1, nullptr, nullptr, - reinterpret_cast<const int64_t*>(&tmp)); + char buffer[30]; + int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + value.ptr = reinterpret_cast<const uint8_t*>(buffer); + value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( + assert_cast<const ColumnVector<UInt32>&>(*col) + .get_data()[row_id]) + .to_buffer(buffer, output_scale); + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); } } } else if (const auto* not_nullable_column = check_and_get_column<const ColumnVector<UInt32>>(col)) { - std::vector<uint64_t> res(sz); for (size_t row_id = 0; row_id < sz; row_id++) { - res[row_id] = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( - not_nullable_column->get_data()[row_id]) - .to_olap_datetime(); + char buffer[30]; + int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + value.ptr = reinterpret_cast<const uint8_t*>(buffer); + value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( + not_nullable_column->get_data()[row_id]) + .to_buffer(buffer, output_scale); + col_writer->WriteBatch(1, nullable ? 
&single_def_level : nullptr, nullptr, + &value); } - col_writer->WriteBatch(sz, nullptr, nullptr, - reinterpret_cast<const int64_t*>(res.data())); } else { RETURN_WRONG_TYPE } @@ -343,32 +351,39 @@ Status VParquetWriterWrapper::write(const Block& block) { } case TYPE_DATETIMEV2: { parquet::RowGroupWriter* rgWriter = get_rg_writer(); - parquet::Int64Writer* col_writer = - static_cast<parquet::Int64Writer*>(rgWriter->column(i)); - int64_t default_int64 = 0; + parquet::ByteArrayWriter* col_writer = + static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); + parquet::ByteArray value; if (null_map != nullptr) { + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); for (size_t row_id = 0; row_id < sz; row_id++) { - if ((*null_map)[row_id] != 0) { - col_writer->WriteBatch(1, nullptr, nullptr, &default_int64); + if (null_data[row_id] != 0) { + single_def_level = 0; + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); + single_def_level = 1; } else { - uint64_t tmp = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( - assert_cast<const ColumnVector<UInt64>&>(*col) - .get_data()[row_id]) - .to_olap_datetime(); - col_writer->WriteBatch(1, nullptr, nullptr, - reinterpret_cast<const int64_t*>(&tmp)); + char buffer[30]; + int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + value.ptr = reinterpret_cast<const uint8_t*>(buffer); + value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( + assert_cast<const ColumnVector<UInt64>&>(*col) + .get_data()[row_id]) + .to_buffer(buffer, output_scale); + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); } } } else if (const auto* not_nullable_column = check_and_get_column<const ColumnVector<UInt64>>(col)) { - std::vector<uint64_t> res(sz); for (size_t row_id = 0; row_id < sz; row_id++) { - res[row_id] = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( - not_nullable_column->get_data()[row_id]) - .to_olap_datetime(); + char buffer[30]; + int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + value.ptr = reinterpret_cast<const uint8_t*>(buffer); + value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( + not_nullable_column->get_data()[row_id]) + .to_buffer(buffer, output_scale); + col_writer->WriteBatch(1, nullable ? 
&single_def_level : nullptr, nullptr, + &value); } - col_writer->WriteBatch(sz, nullptr, nullptr, - reinterpret_cast<const int64_t*>(res.data())); } else { RETURN_WRONG_TYPE } @@ -402,9 +417,12 @@ Status VParquetWriterWrapper::write(const Block& block) { static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); parquet::ByteArray value; if (null_map != nullptr) { + auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); for (size_t row_id = 0; row_id < sz; row_id++) { - if ((*null_map)[row_id] != 0) { - col_writer->WriteBatch(1, nullptr, nullptr, &value); + if (null_data[row_id] != 0) { + single_def_level = 0; + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); + single_def_level = 1; } else { const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>( col->get_data_at(row_id).data) @@ -413,7 +431,7 @@ Status VParquetWriterWrapper::write(const Block& block) { int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer); value.len = decimal_val.to_buffer(decimal_buffer, output_scale); - col_writer->WriteBatch(1, nullptr, nullptr, &value); + col_writer->WriteBatch(1, &single_def_level, nullptr, &value); } } } else if (const auto* not_nullable_column = @@ -427,7 +445,8 @@ Status VParquetWriterWrapper::write(const Block& block) { int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer); value.len = decimal_val.to_buffer(decimal_buffer, output_scale); - col_writer->WriteBatch(1, nullptr, nullptr, &value); + col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, + &value); } } else { RETURN_WRONG_TYPE diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 64f7ec6737..fdb88b1ed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -190,7 +190,7 @@ public class OutFileClause { return parquetSchemas; } - public void analyze(Analyzer analyzer, List<Expr> resultExprs) throws UserException { + public void analyze(Analyzer analyzer, List<Expr> resultExprs, List<String> colLabels) throws UserException { if (isAnalyzed) { // If the query stmt is rewritten, the whole stmt will be analyzed again. 
// But some of fields in this OutfileClause has been changed, @@ -229,13 +229,13 @@ public class OutFileClause { isAnalyzed = true; if (isParquetFormat()) { - analyzeForParquetFormat(resultExprs); + analyzeForParquetFormat(resultExprs, colLabels); } } - private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException { + private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException { if (this.parquetSchemas.isEmpty()) { - genParquetSchema(resultExprs); + genParquetSchema(resultExprs, colLabels); } // check schema number @@ -265,10 +265,8 @@ public class OutFileClause { case BIGINT: case DATE: case DATETIME: - case DATETIMEV2: - case DATEV2: if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) { - throw new AnalysisException("project field type is BIGINT/DATE/DATETIME/DATEV2/DATETIMEV2," + throw new AnalysisException("project field type is BIGINT/DATE/DATETIME," + "should use int64, but the definition type of column " + i + " is " + type); } break; @@ -291,9 +289,12 @@ public class OutFileClause { case DECIMAL64: case DECIMAL128: case DECIMALV2: + case DATETIMEV2: + case DATEV2: if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) { - throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL," - + " should use byte_array, but the definition type of column " + i + " is " + type); + throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL/DATEV2" + + "/DATETIMEV2, should use byte_array, but the definition type of column " + + i + " is " + type); } break; case HLL: @@ -316,12 +317,16 @@ public class OutFileClause { } } - private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException { + private void genParquetSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException { Preconditions.checkState(this.parquetSchemas.isEmpty()); for (int i = 0; i < resultExprs.size(); ++i) { Expr expr = resultExprs.get(i); TParquetSchema parquetSchema = new TParquetSchema(); - parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required"); + if (resultExprs.get(i).isNullable()) { + parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("optional"); + } else { + parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required"); + } switch (expr.getType().getPrimitiveType()) { case BOOLEAN: parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("boolean"); @@ -334,8 +339,6 @@ public class OutFileClause { case BIGINT: case DATE: case DATETIME: - case DATETIMEV2: - case DATEV2: parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int64"); break; case FLOAT: @@ -351,6 +354,8 @@ public class OutFileClause { case DECIMAL32: case DECIMAL64: case DECIMAL128: + case DATETIMEV2: + case DATEV2: parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array"); break; case HLL: @@ -364,7 +369,7 @@ public class OutFileClause { throw new AnalysisException("currently parquet do not support column type: " + expr.getType().getPrimitiveType()); } - parquetSchema.schema_column_name = "col" + i; + parquetSchema.schema_column_name = colLabels.get(i); parquetSchemas.add(parquetSchema); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index e29a68d375..5645219af7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java 
@@ -559,7 +559,7 @@ public class SelectStmt extends QueryStmt {
             }
         }
         if (hasOutFileClause()) {
-            outFileClause.analyze(analyzer, resultExprs);
+            outFileClause.analyze(analyzer, resultExprs, colLabels);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
index b3ef8269e6..c8cb2a8ccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
@@ -303,7 +303,7 @@ public class SetOperationStmt extends QueryStmt {
         baseTblResultExprs = resultExprs;
 
         if (hasOutFileClause()) {
-            outFileClause.analyze(analyzer, resultExprs);
+            outFileClause.analyze(analyzer, resultExprs, getColLabels());
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 6932bbcaab..4945a59690 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -640,11 +640,11 @@ public class SelectStmtTest {
         try {
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
             Assert.assertEquals(1, stmt.getOutFileClause().getParquetSchemas().size());
-            Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("required"),
+            Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("optional"),
                     stmt.getOutFileClause().getParquetSchemas().get(0).schema_repetition_type);
             Assert.assertEquals(stmt.getOutFileClause().PARQUET_DATA_TYPE_MAP.get("byte_array"),
                     stmt.getOutFileClause().getParquetSchemas().get(0).schema_data_type);
-            Assert.assertEquals("col0", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
+            Assert.assertEquals("k1", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
diff --git a/regression-test/data/export_p0/test_outfile_parquet.out b/regression-test/data/export_p0/test_outfile_parquet.out
new file mode 100644
index 0000000000..cb6eab3268
--- /dev/null
+++ b/regression-test/data/export_p0/test_outfile_parquet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
+2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
+3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
+4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
+5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
+6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
+7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
+8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
+9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
+10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
+
+-- !select_default --
+1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
+2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
+3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
+4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
+5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
+6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
+7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
+8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
+9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
+10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
+
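The \N cells in row 10 of the expected output above are NULL markers; they survive the export/import round trip because the writer now emits Parquet definition levels. As a point of reference, a minimal, self-contained sketch of the parquet-cpp write pattern the BE relies on: an OPTIONAL field has max definition level 1, def_level 1 marks a present value, 0 marks a NULL, and WriteBatch consumes one value from the densely packed buffer per non-null slot. The file path and data below are invented for illustration; this is not Doris code.

    // Hedged sketch (not Doris code): writing one nullable INT64 column
    // with Apache Arrow's parquet-cpp, the API used by VParquetWriterWrapper.
    #include <arrow/io/file.h>
    #include <parquet/api/writer.h>
    #include <parquet/exception.h>

    #include <memory>
    #include <vector>

    int main() {
        using parquet::Repetition;
        using parquet::Type;
        namespace schema = parquet::schema;

        // OPTIONAL is what the FE now emits for nullable result exprs;
        // it gives the column a max definition level of 1.
        schema::NodeVector fields;
        fields.push_back(schema::PrimitiveNode::Make("c0", Repetition::OPTIONAL, Type::INT64));
        auto root = std::static_pointer_cast<schema::GroupNode>(
                schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

        std::shared_ptr<arrow::io::FileOutputStream> sink;
        PARQUET_ASSIGN_OR_THROW(sink, arrow::io::FileOutputStream::Open("/tmp/null_demo.parquet"));
        auto file_writer = parquet::ParquetFileWriter::Open(sink, root);

        auto* rg_writer = file_writer->AppendRowGroup();
        auto* col_writer = static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());

        // Rows: 10, NULL, 30. def_level 1 = value present, 0 = NULL.
        // WriteBatch reads one value per def_level that reaches the max
        // level, so the values buffer is densely packed and the NULL slot
        // supplies no value.
        std::vector<int16_t> def_levels = {1, 0, 1};
        std::vector<int64_t> values = {10, 30};
        col_writer->WriteBatch(static_cast<int64_t>(def_levels.size()), def_levels.data(),
                               /*rep_levels=*/nullptr, values.data());

        file_writer->Close();
        return 0;
    }

The same Repetition::OPTIONAL choice is what genParquetSchema() in OutFileClause now emits for nullable result expressions, where it previously hard-coded "required".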
diff --git a/regression-test/suites/export_p0/test_outfile_parquet.groovy b/regression-test/suites/export_p0/test_outfile_parquet.groovy
new file mode 100644
index 0000000000..2804b1e0e5
--- /dev/null
+++ b/regression-test/suites/export_p0/test_outfile_parquet.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_parquet") {
+    def dbName = "test_query_db"
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "USE $dbName"
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    boolean enableOutfileToLocal = false
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
+            enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableOutfileToLocal) {
+        logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
+        return
+    }
+    def tableName = "outfile_parquet_test"
+    def tableName2 = "outfile_parquet_test2"
+    def outFilePath = """${context.file.parent}/tmp"""
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `date_1` DATEV2 NOT NULL COMMENT "",
+            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` int COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT ""
+        )
+        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10; i ++) {
+            sb.append("""
+                (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
+            """)
+        }
+        sb.append("""
+            (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
+        """)
+        sql """ INSERT INTO ${tableName} VALUES
+                ${sb.toString()}
+            """
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
+
+        // check outfile
+        File path = new File(outFilePath)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${outFilePath} already exists! """)
+        }
+        sql """
+            SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/" FORMAT AS PARQUET;
+        """
+
+        File[] files = path.listFiles()
+        assert files.length == 1
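Before the suite stream-loads the exported file back into the second table, note how a consumer sees the NULL row: when an OPTIONAL column is read back with parquet-cpp, NULL slots come back as definition level 0 and contribute no entry to the values buffer. A hedged companion sketch to the writer example above; the path and column index are assumptions for illustration, not taken from the suite.

    // Hedged sketch (not part of the suite): reading the nullable INT64
    // column back and distinguishing NULLs via definition levels.
    #include <parquet/api/reader.h>

    #include <iostream>
    #include <vector>

    int main() {
        auto file_reader = parquet::ParquetFileReader::OpenFile("/tmp/null_demo.parquet");
        auto rg_reader = file_reader->RowGroup(0);
        auto col = rg_reader->Column(0);
        auto* col_reader = static_cast<parquet::Int64Reader*>(col.get());

        std::vector<int16_t> def_levels(16);
        std::vector<int64_t> values(16);
        int64_t values_read = 0;
        // levels_read counts rows; values_read counts non-null values only.
        int64_t levels_read = col_reader->ReadBatch(16, def_levels.data(), nullptr,
                                                    values.data(), &values_read);

        int64_t v = 0;  // cursor into the densely packed values buffer
        for (int64_t i = 0; i < levels_read; ++i) {
            if (def_levels[i] == 0) {
                std::cout << "row " << i << ": NULL\n";
            } else {
                std::cout << "row " << i << ": " << values[v++] << "\n";
            }
        }
        return 0;
    }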
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName2} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `date_1` DATEV2 NOT NULL COMMENT "",
+            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` int COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT ""
+        )
+        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+
+        StringBuilder commandBuilder = new StringBuilder()
+        commandBuilder.append("""curl -v --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}""")
+        commandBuilder.append(""" -H format:parquet -T """ + files[0].getAbsolutePath() + """ http://${context.config.feHttpAddress}/api/""" + dbName + "/" + tableName2 + "/_stream_load")
+        command = commandBuilder.toString()
+        process = command.execute()
+        code = process.waitFor()
+        err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+        out = process.getText()
+        logger.info("Run command: command=" + command + ",code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        qt_select_default """ SELECT * FROM ${tableName2} t ORDER BY user_id; """
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+        try_sql("DROP TABLE IF EXISTS ${tableName2}")
+        File path = new File(outFilePath)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org