This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c2fae109c3 [Improvement](outfile) Support output null in parquet 
writer (#12970)
c2fae109c3 is described below

commit c2fae109c3f54dd28c6b8cf858200dadcf344949
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Sep 29 13:36:30 2022 +0800

    [Improvement](outfile) Support output null in parquet writer (#12970)
---
 be/src/vec/olap/olap_data_convertor.cpp            | 228 ++++++-------------
 be/src/vec/olap/olap_data_convertor.h              | 150 ++-----------
 be/src/vec/runtime/vdatetime_value.h               |  11 -
 be/src/vec/runtime/vparquet_writer.cpp             | 243 +++++++++++----------
 .../org/apache/doris/analysis/OutFileClause.java   |  33 +--
 .../java/org/apache/doris/analysis/SelectStmt.java |   2 +-
 .../apache/doris/analysis/SetOperationStmt.java    |   2 +-
 .../org/apache/doris/analysis/SelectStmtTest.java  |   4 +-
 .../data/export_p0/test_outfile_parquet.out        |  25 +++
 .../suites/export_p0/test_outfile_parquet.groovy   | 158 ++++++++++++++
 10 files changed, 425 insertions(+), 431 deletions(-)

diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index 7a1f2cfbec..58ab5d6579 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -490,99 +490,49 @@ void 
OlapBlockDataConvertor::OlapColumnDataConvertorDate::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t 
num_rows) {
     
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint24_t>::set_source_column(
             typed_column, row_pos, num_rows);
-    if (is_date_v2(typed_column.type)) {
-        from_date_v2_ = true;
-    } else {
-        from_date_v2_ = false;
-    }
 }
 
 Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() {
     assert(_typed_column.column);
-    if (from_date_v2_) {
-        const vectorized::ColumnVector<vectorized::UInt32>* column_datetime = 
nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::UInt32>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::UInt32>*>(
-                    _typed_column.column.get());
-        }
+    const vectorized::ColumnVector<vectorized::Int64>* column_datetime = 
nullptr;
+    if (_nullmap) {
+        auto nullable_column =
+                assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
+        column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
+                nullable_column->get_nested_column_ptr().get());
+    } else {
+        column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
+                _typed_column.column.get());
+    }
 
-        assert(column_datetime);
-
-        const DateV2Value<DateV2ValueType>* datetime_cur =
-                (const 
DateV2Value<DateV2ValueType>*)(column_datetime->get_data().data()) +
-                _row_pos;
-        const DateV2Value<DateV2ValueType>* datetime_end = datetime_cur + 
_num_rows;
-        uint24_t* value = _values.data();
-        if (_nullmap) {
-            const UInt8* nullmap_cur = _nullmap + _row_pos;
-            while (datetime_cur != datetime_end) {
-                if (!*nullmap_cur) {
-                    *value = datetime_cur->to_olap_date();
-                } else {
-                    // do nothing
-                }
-                ++value;
-                ++datetime_cur;
-                ++nullmap_cur;
-            }
-            assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
-                   value == _values.get_end_ptr());
-        } else {
-            while (datetime_cur != datetime_end) {
+    assert(column_datetime);
+
+    const VecDateTimeValue* datetime_cur =
+            (const VecDateTimeValue*)(column_datetime->get_data().data()) + 
_row_pos;
+    const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
+    uint24_t* value = _values.data();
+    if (_nullmap) {
+        const UInt8* nullmap_cur = _nullmap + _row_pos;
+        while (datetime_cur != datetime_end) {
+            if (!*nullmap_cur) {
                 *value = datetime_cur->to_olap_date();
-                ++value;
-                ++datetime_cur;
+            } else {
+                // do nothing
             }
-            assert(value == _values.get_end_ptr());
+            ++value;
+            ++datetime_cur;
+            ++nullmap_cur;
         }
-        return Status::OK();
+        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == 
_values.get_end_ptr());
     } else {
-        const vectorized::ColumnVector<vectorized::Int64>* column_datetime = 
nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                    _typed_column.column.get());
-        }
-
-        assert(column_datetime);
-
-        const VecDateTimeValue* datetime_cur =
-                (const VecDateTimeValue*)(column_datetime->get_data().data()) 
+ _row_pos;
-        const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
-        uint24_t* value = _values.data();
-        if (_nullmap) {
-            const UInt8* nullmap_cur = _nullmap + _row_pos;
-            while (datetime_cur != datetime_end) {
-                if (!*nullmap_cur) {
-                    *value = datetime_cur->to_olap_date();
-                } else {
-                    // do nothing
-                }
-                ++value;
-                ++datetime_cur;
-                ++nullmap_cur;
-            }
-            assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
-                   value == _values.get_end_ptr());
-        } else {
-            while (datetime_cur != datetime_end) {
-                *value = datetime_cur->to_olap_date();
-                ++value;
-                ++datetime_cur;
-            }
-            assert(value == _values.get_end_ptr());
+        while (datetime_cur != datetime_end) {
+            *value = datetime_cur->to_olap_date();
+            ++value;
+            ++datetime_cur;
         }
-        return Status::OK();
+        assert(value == _values.get_end_ptr());
     }
+    return Status::OK();
 }
 
 // class OlapBlockDataConvertor::OlapColumnDataConvertorJsonb
@@ -660,99 +610,49 @@ void 
OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t 
num_rows) {
     
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint64_t>::set_source_column(
             typed_column, row_pos, num_rows);
-    if (is_date_v2_or_datetime_v2(typed_column.type)) {
-        from_datetime_v2_ = true;
-    } else {
-        from_datetime_v2_ = false;
-    }
 }
 
 Status 
OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap() {
     assert(_typed_column.column);
-    if (from_datetime_v2_) {
-        const vectorized::ColumnVector<vectorized::UInt64>* column_datetimev2 
= nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetimev2 = assert_cast<const 
vectorized::ColumnVector<vectorized::UInt64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetimev2 = assert_cast<const 
vectorized::ColumnVector<vectorized::UInt64>*>(
-                    _typed_column.column.get());
-        }
+    const vectorized::ColumnVector<vectorized::Int64>* column_datetime = 
nullptr;
+    if (_nullmap) {
+        auto nullable_column =
+                assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
+        column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
+                nullable_column->get_nested_column_ptr().get());
+    } else {
+        column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
+                _typed_column.column.get());
+    }
 
-        assert(column_datetimev2);
-
-        const DateV2Value<DateTimeV2ValueType>* datetime_cur =
-                (const 
DateV2Value<DateTimeV2ValueType>*)(column_datetimev2->get_data().data()) +
-                _row_pos;
-        const DateV2Value<DateTimeV2ValueType>* datetime_end = datetime_cur + 
_num_rows;
-        uint64_t* value = _values.data();
-        if (_nullmap) {
-            const UInt8* nullmap_cur = _nullmap + _row_pos;
-            while (datetime_cur != datetime_end) {
-                if (!*nullmap_cur) {
-                    *value = datetime_cur->to_olap_datetime();
-                } else {
-                    // do nothing
-                }
-                ++value;
-                ++datetime_cur;
-                ++nullmap_cur;
-            }
-            assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
-                   value == _values.get_end_ptr());
-        } else {
-            while (datetime_cur != datetime_end) {
+    assert(column_datetime);
+
+    const VecDateTimeValue* datetime_cur =
+            (const VecDateTimeValue*)(column_datetime->get_data().data()) + 
_row_pos;
+    const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
+    uint64_t* value = _values.data();
+    if (_nullmap) {
+        const UInt8* nullmap_cur = _nullmap + _row_pos;
+        while (datetime_cur != datetime_end) {
+            if (!*nullmap_cur) {
                 *value = datetime_cur->to_olap_datetime();
-                ++value;
-                ++datetime_cur;
+            } else {
+                // do nothing
             }
-            assert(value == _values.get_end_ptr());
+            ++value;
+            ++datetime_cur;
+            ++nullmap_cur;
         }
-        return Status::OK();
+        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == 
_values.get_end_ptr());
     } else {
-        const vectorized::ColumnVector<vectorized::Int64>* column_datetime = 
nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                    _typed_column.column.get());
-        }
-
-        assert(column_datetime);
-
-        const VecDateTimeValue* datetime_cur =
-                (const VecDateTimeValue*)(column_datetime->get_data().data()) 
+ _row_pos;
-        const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
-        uint64_t* value = _values.data();
-        if (_nullmap) {
-            const UInt8* nullmap_cur = _nullmap + _row_pos;
-            while (datetime_cur != datetime_end) {
-                if (!*nullmap_cur) {
-                    *value = datetime_cur->to_olap_datetime();
-                } else {
-                    // do nothing
-                }
-                ++value;
-                ++datetime_cur;
-                ++nullmap_cur;
-            }
-            assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
-                   value == _values.get_end_ptr());
-        } else {
-            while (datetime_cur != datetime_end) {
-                *value = datetime_cur->to_olap_datetime();
-                ++value;
-                ++datetime_cur;
-            }
-            assert(value == _values.get_end_ptr());
+        while (datetime_cur != datetime_end) {
+            *value = datetime_cur->to_olap_datetime();
+            ++value;
+            ++datetime_cur;
         }
-        return Status::OK();
+        assert(value == _values.get_end_ptr());
     }
+    return Status::OK();
 }
 
 Status 
OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap() {
diff --git a/be/src/vec/olap/olap_data_convertor.h 
b/be/src/vec/olap/olap_data_convertor.h
index 961f60f4c5..9eb63c9154 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -209,9 +209,6 @@ private:
         void set_source_column(const ColumnWithTypeAndName& typed_column, 
size_t row_pos,
                                size_t num_rows) override;
         Status convert_to_olap() override;
-
-    private:
-        bool from_date_v2_;
     };
 
     class OlapColumnDataConvertorDateTime : public 
OlapColumnDataConvertorPaddedPODArray<uint64_t> {
@@ -219,9 +216,6 @@ private:
         void set_source_column(const ColumnWithTypeAndName& typed_column, 
size_t row_pos,
                                size_t num_rows) override;
         Status convert_to_olap() override;
-
-    private:
-        bool from_datetime_v2_;
     };
 
     class OlapColumnDataConvertorDecimal
@@ -277,11 +271,6 @@ private:
         void set_source_column(const ColumnWithTypeAndName& typed_column, 
size_t row_pos,
                                size_t num_rows) override {
             OlapColumnDataConvertorBase::set_source_column(typed_column, 
row_pos, num_rows);
-            if (is_date(typed_column.type)) {
-                from_date_to_date_v2_ = true;
-            } else {
-                from_date_to_date_v2_ = false;
-            }
         }
 
         const void* get_data() const override { return values_; }
@@ -296,67 +285,24 @@ private:
         }
 
         Status convert_to_olap() override {
-            if (UNLIKELY(from_date_to_date_v2_)) {
-                const vectorized::ColumnVector<vectorized::Int64>* 
column_datetime = nullptr;
-                if (_nullmap) {
-                    auto nullable_column = assert_cast<const 
vectorized::ColumnNullable*>(
-                            _typed_column.column.get());
-                    column_datetime =
-                            assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                                    
nullable_column->get_nested_column_ptr().get());
-                } else {
-                    column_datetime =
-                            assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                                    _typed_column.column.get());
-                }
-
-                assert(column_datetime);
-
-                const VecDateTimeValue* datetime_cur =
-                        (const 
VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
-                const VecDateTimeValue* datetime_end = datetime_cur + 
_num_rows;
-                uint32_t* value = const_cast<uint32_t*>(values_);
-                if (_nullmap) {
-                    const UInt8* nullmap_cur = _nullmap + _row_pos;
-                    while (datetime_cur != datetime_end) {
-                        if (!*nullmap_cur) {
-                            *value = datetime_cur->to_date_v2();
-                        } else {
-                            // do nothing
-                        }
-                        ++value;
-                        ++datetime_cur;
-                        ++nullmap_cur;
-                    }
-                } else {
-                    while (datetime_cur != datetime_end) {
-                        *value = datetime_cur->to_date_v2();
-                        ++value;
-                        ++datetime_cur;
-                    }
-                }
-                return Status::OK();
+            const vectorized::ColumnVector<uint32>* column_data = nullptr;
+            if (_nullmap) {
+                auto nullable_column =
+                        assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
+                column_data = assert_cast<const 
vectorized::ColumnVector<uint32>*>(
+                        nullable_column->get_nested_column_ptr().get());
             } else {
-                const vectorized::ColumnVector<uint32>* column_data = nullptr;
-                if (_nullmap) {
-                    auto nullable_column = assert_cast<const 
vectorized::ColumnNullable*>(
-                            _typed_column.column.get());
-                    column_data = assert_cast<const 
vectorized::ColumnVector<uint32>*>(
-                            nullable_column->get_nested_column_ptr().get());
-                } else {
-                    column_data = assert_cast<const 
vectorized::ColumnVector<uint32>*>(
-                            _typed_column.column.get());
-                }
-
-                assert(column_data);
-                values_ = (const uint32*)(column_data->get_data().data()) + 
_row_pos;
-                return Status::OK();
+                column_data = assert_cast<const 
vectorized::ColumnVector<uint32>*>(
+                        _typed_column.column.get());
             }
+
+            assert(column_data);
+            values_ = (const uint32*)(column_data->get_data().data()) + 
_row_pos;
+            return Status::OK();
         }
 
     private:
         const uint32_t* values_ = nullptr;
-        bool from_date_to_date_v2_;
     };
 
     class OlapColumnDataConvertorDateTimeV2 : public 
OlapColumnDataConvertorBase {
@@ -367,11 +313,6 @@ private:
         void set_source_column(const ColumnWithTypeAndName& typed_column, 
size_t row_pos,
                                size_t num_rows) override {
             OlapColumnDataConvertorBase::set_source_column(typed_column, 
row_pos, num_rows);
-            if (is_date_or_datetime(typed_column.type)) {
-                from_datetime_to_datetime_v2_ = true;
-            } else {
-                from_datetime_to_datetime_v2_ = false;
-            }
         }
 
         const void* get_data() const override { return values_; }
@@ -386,67 +327,24 @@ private:
         }
 
         Status convert_to_olap() override {
-            if (UNLIKELY(from_datetime_to_datetime_v2_)) {
-                const vectorized::ColumnVector<vectorized::Int64>* 
column_datetime = nullptr;
-                if (_nullmap) {
-                    auto nullable_column = assert_cast<const 
vectorized::ColumnNullable*>(
-                            _typed_column.column.get());
-                    column_datetime =
-                            assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                                    
nullable_column->get_nested_column_ptr().get());
-                } else {
-                    column_datetime =
-                            assert_cast<const 
vectorized::ColumnVector<vectorized::Int64>*>(
-                                    _typed_column.column.get());
-                }
-
-                assert(column_datetime);
-
-                const VecDateTimeValue* datetime_cur =
-                        (const 
VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
-                const VecDateTimeValue* datetime_end = datetime_cur + 
_num_rows;
-                uint64_t* value = const_cast<uint64_t*>(values_);
-                if (_nullmap) {
-                    const UInt8* nullmap_cur = _nullmap + _row_pos;
-                    while (datetime_cur != datetime_end) {
-                        if (!*nullmap_cur) {
-                            *value = datetime_cur->to_datetime_v2();
-                        } else {
-                            // do nothing
-                        }
-                        ++value;
-                        ++datetime_cur;
-                        ++nullmap_cur;
-                    }
-                } else {
-                    while (datetime_cur != datetime_end) {
-                        *value = datetime_cur->to_datetime_v2();
-                        ++value;
-                        ++datetime_cur;
-                    }
-                }
-                return Status::OK();
+            const vectorized::ColumnVector<uint64_t>* column_data = nullptr;
+            if (_nullmap) {
+                auto nullable_column =
+                        assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
+                column_data = assert_cast<const 
vectorized::ColumnVector<uint64_t>*>(
+                        nullable_column->get_nested_column_ptr().get());
             } else {
-                const vectorized::ColumnVector<uint64_t>* column_data = 
nullptr;
-                if (_nullmap) {
-                    auto nullable_column = assert_cast<const 
vectorized::ColumnNullable*>(
-                            _typed_column.column.get());
-                    column_data = assert_cast<const 
vectorized::ColumnVector<uint64_t>*>(
-                            nullable_column->get_nested_column_ptr().get());
-                } else {
-                    column_data = assert_cast<const 
vectorized::ColumnVector<uint64_t>*>(
-                            _typed_column.column.get());
-                }
-
-                assert(column_data);
-                values_ = (const uint64_t*)(column_data->get_data().data()) + 
_row_pos;
-                return Status::OK();
+                column_data = assert_cast<const 
vectorized::ColumnVector<uint64_t>*>(
+                        _typed_column.column.get());
             }
+
+            assert(column_data);
+            values_ = (const uint64_t*)(column_data->get_data().data()) + 
_row_pos;
+            return Status::OK();
         }
 
     private:
         const uint64_t* values_ = nullptr;
-        bool from_datetime_to_datetime_v2_;
     };
 
     // decimalv3 don't need to do any convert
diff --git a/be/src/vec/runtime/vdatetime_value.h 
b/be/src/vec/runtime/vdatetime_value.h
index 42ce3a173c..7af0c273b1 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -781,17 +781,6 @@ public:
         return val;
     }
 
-    uint64_t to_olap_datetime() const {
-        uint64_t date_val =
-                date_v2_value_.year_ * 10000 + date_v2_value_.month_ * 100 + 
date_v2_value_.day_;
-        uint64_t time_val = 0;
-        if constexpr (is_datetime) {
-            time_val = date_v2_value_.hour_ * 10000 + date_v2_value_.minute_ * 
100 +
-                       date_v2_value_.second_;
-        }
-        return date_val * 1000000 + time_val;
-    }
-
     bool to_format_string(const char* format, int len, char* to) const;
 
     bool from_date_format_str(const char* format, int format_len, const char* 
value,
diff --git a/be/src/vec/runtime/vparquet_writer.cpp 
b/be/src/vec/runtime/vparquet_writer.cpp
index 14f90fc63e..9737fba854 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -89,20 +89,17 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq
 #define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE)      
                   \
     parquet::RowGroupWriter* rgWriter = get_rg_writer();                       
                   \
     parquet::WRITER* col_writer = 
static_cast<parquet::WRITER*>(rgWriter->column(i));             \
-    __int128 default_value = 0;                                                
                   \
     if (null_map != nullptr) {                                                 
                   \
+        auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                  \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                   \
-            col_writer->WriteBatch(1, nullptr, nullptr,                        
                   \
-                                   (*null_map)[row_id] != 0                    
                   \
-                                           ? reinterpret_cast<const 
NATIVE_TYPE*>(&default_value) \
-                                           : reinterpret_cast<const 
NATIVE_TYPE*>(                \
-                                                     assert_cast<const 
COLUMN_TYPE&>(*col)        \
-                                                             
.get_data_at(row_id)                 \
-                                                             .data));          
                   \
+            def_level[row_id] = null_data[row_id] == 0;                        
                   \
         }                                                                      
                   \
+        col_writer->WriteBatch(sz, def_level.data(), nullptr,                  
                   \
+                               reinterpret_cast<const NATIVE_TYPE*>(           
                   \
+                                       assert_cast<const 
COLUMN_TYPE&>(*col).get_data().data())); \
     } else if (const auto* not_nullable_column = check_and_get_column<const 
COLUMN_TYPE>(col)) {  \
         col_writer->WriteBatch(                                                
                   \
-                sz, nullptr, nullptr,                                          
                   \
+                sz, nullable ? def_level.data() : nullptr, nullptr,            
                   \
                 reinterpret_cast<const 
NATIVE_TYPE*>(not_nullable_column->get_data().data()));    \
     } else {                                                                   
                   \
         RETURN_WRONG_TYPE                                                      
                   \
@@ -117,14 +114,17 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq
             
check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get());
 \
     DCHECK(decimal_type);                                                      
                  \
     if (null_map != nullptr) {                                                 
                  \
+        auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                 \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                  \
-            if ((*null_map)[row_id] != 0) {                                    
                  \
-                col_writer->WriteBatch(1, nullptr, nullptr, &value);           
                  \
+            if (null_data[row_id] != 0) {                                      
                  \
+                single_def_level = 0;                                          
                  \
+                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); 
                  \
+                single_def_level = 1;                                          
                  \
             } else {                                                           
                  \
                 auto s = decimal_type->to_string(*col, row_id);                
                  \
                 value.ptr = reinterpret_cast<const uint8_t*>(s.data());        
                  \
                 value.len = s.size();                                          
                  \
-                col_writer->WriteBatch(1, nullptr, nullptr, &value);           
                  \
+                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); 
                  \
             }                                                                  
                  \
         }                                                                      
                  \
     } else {                                                                   
                  \
@@ -132,7 +132,7 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq
             auto s = decimal_type->to_string(*col, row_id);                    
                  \
             value.ptr = reinterpret_cast<const uint8_t*>(s.data());            
                  \
             value.len = s.size();                                              
                  \
-            col_writer->WriteBatch(1, nullptr, nullptr, &value);               
                  \
+            col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, 
nullptr, &value);   \
         }                                                                      
                  \
     }
 
@@ -141,16 +141,19 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq
     parquet::ByteArrayWriter* col_writer =                                     
                  \
             static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));       
                  \
     if (null_map != nullptr) {                                                 
                  \
+        auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                 \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                  \
-            if ((*null_map)[row_id] != 0) {                                    
                  \
+            if (null_data[row_id] != 0) {                                      
                  \
+                single_def_level = 0;                                          
                  \
                 parquet::ByteArray value;                                      
                  \
-                col_writer->WriteBatch(1, nullptr, nullptr, &value);           
                  \
+                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); 
                  \
+                single_def_level = 1;                                          
                  \
             } else {                                                           
                  \
                 const auto& tmp = col->get_data_at(row_id);                    
                  \
                 parquet::ByteArray value;                                      
                  \
                 value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);        
                  \
                 value.len = tmp.size;                                          
                  \
-                col_writer->WriteBatch(1, nullptr, nullptr, &value);           
                  \
+                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); 
                  \
             }                                                                  
                  \
         }                                                                      
                  \
     } else if (const auto* not_nullable_column = check_and_get_column<const 
COLUMN_TYPE>(col)) { \
@@ -159,7 +162,7 @@ void VParquetWriterWrapper::parse_schema(const 
std::vector<TParquetSchema>& parq
             parquet::ByteArray value;                                          
                  \
             value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);            
                  \
             value.len = tmp.size;                                              
                  \
-            col_writer->WriteBatch(1, nullptr, nullptr, &value);               
                  \
+            col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, 
nullptr, &value);  \
         }                                                                      
                  \
     } else {                                                                   
                  \
         RETURN_WRONG_TYPE                                                      
                  \
@@ -173,22 +176,25 @@ Status VParquetWriterWrapper::write(const Block& block) {
     try {
         for (size_t i = 0; i < block.columns(); i++) {
             auto& raw_column = block.get_by_position(i).column;
-            const auto col = raw_column->is_nullable()
-                                     ? reinterpret_cast<const ColumnNullable*>(
-                                               
block.get_by_position(i).column.get())
-                                               ->get_nested_column_ptr()
-                                               .get()
-                                     : block.get_by_position(i).column.get();
-            auto null_map =
-                    raw_column->is_nullable() && reinterpret_cast<const 
ColumnNullable*>(
-                                                         
block.get_by_position(i).column.get())
-                                                         
->get_null_map_column_ptr()
-                                                         ->has_null()
-                            ? reinterpret_cast<const ColumnNullable*>(
-                                      block.get_by_position(i).column.get())
-                                      ->get_null_map_column_ptr()
-                            : nullptr;
+            auto nullable = raw_column->is_nullable();
+            const auto col = nullable ? reinterpret_cast<const 
ColumnNullable*>(
+                                                
block.get_by_position(i).column.get())
+                                                ->get_nested_column_ptr()
+                                                .get()
+                                      : block.get_by_position(i).column.get();
+            auto null_map = nullable && reinterpret_cast<const 
ColumnNullable*>(
+                                                
block.get_by_position(i).column.get())
+                                                    ->has_null()
+                                    ? reinterpret_cast<const ColumnNullable*>(
+                                              
block.get_by_position(i).column.get())
+                                              ->get_null_map_column_ptr()
+                                    : nullptr;
             auto& type = block.get_by_position(i).type;
+
+            std::vector<int16_t> def_level(sz);
+            // For scalar type, definition level == 1 means this value is not 
NULL.
+            std::fill(def_level.begin(), def_level.end(), 1);
+            int16_t single_def_level = 1;
             switch (_output_vexpr_ctxs[i]->root()->type().type) {
             case TYPE_BOOLEAN: {
                 DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, 
ColumnVector<UInt8>, bool)
@@ -210,63 +216,49 @@ Status VParquetWriterWrapper::write(const Block& block) {
                 break;
             }
             case TYPE_TINYINT:
-            case TYPE_SMALLINT:
-            case TYPE_INT: {
+            case TYPE_SMALLINT: {
                 parquet::RowGroupWriter* rgWriter = get_rg_writer();
                 parquet::Int32Writer* col_writer =
                         
static_cast<parquet::Int32Writer*>(rgWriter->column(i));
-                int32_t default_int32 = 0;
                 if (null_map != nullptr) {
-                    if (const auto* nested_column =
-                                check_and_get_column<const 
ColumnVector<Int32>>(col)) {
-                        for (size_t row_id = 0; row_id < sz; row_id++) {
-                            col_writer->WriteBatch(
-                                    1, nullptr, nullptr,
-                                    (*null_map)[row_id] != 0
-                                            ? &default_int32
-                                            : reinterpret_cast<const int32_t*>(
-                                                      
nested_column->get_data_at(row_id).data));
-                        }
-                    } else if (const auto* int16_column =
-                                       check_and_get_column<const 
ColumnVector<Int16>>(col)) {
+                    auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();
+                    if (const auto* int16_column =
+                                check_and_get_column<const 
ColumnVector<Int16>>(col)) {
                         for (size_t row_id = 0; row_id < sz; row_id++) {
+                            if (null_data[row_id] != 0) {
+                                single_def_level = 0;
+                            }
                             const int32_t tmp = 
int16_column->get_data()[row_id];
-                            col_writer->WriteBatch(
-                                    1, nullptr, nullptr,
-                                    (*null_map)[row_id] != 0
-                                            ? &default_int32
-                                            : reinterpret_cast<const 
int32_t*>(&tmp));
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr,
+                                                   reinterpret_cast<const 
int32_t*>(&tmp));
+                            single_def_level = 1;
                         }
                     } else if (const auto* int8_column =
                                        check_and_get_column<const 
ColumnVector<Int8>>(col)) {
                         for (size_t row_id = 0; row_id < sz; row_id++) {
+                            if (null_data[row_id] != 0) {
+                                single_def_level = 0;
+                            }
                             const int32_t tmp = 
int8_column->get_data()[row_id];
-                            col_writer->WriteBatch(
-                                    1, nullptr, nullptr,
-                                    (*null_map)[row_id] != 0
-                                            ? &default_int32
-                                            : reinterpret_cast<const 
int32_t*>(&tmp));
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr,
+                                                   reinterpret_cast<const 
int32_t*>(&tmp));
+                            single_def_level = 1;
                         }
                     } else {
                         RETURN_WRONG_TYPE
                     }
-                } else if (const auto* not_nullable_column =
-                                   check_and_get_column<const 
ColumnVector<Int32>>(col)) {
-                    col_writer->WriteBatch(sz, nullptr, nullptr,
-                                           reinterpret_cast<const int32_t*>(
-                                                   
not_nullable_column->get_data().data()));
                 } else if (const auto& int16_column =
                                    check_and_get_column<const 
ColumnVector<Int16>>(col)) {
                     for (size_t row_id = 0; row_id < sz; row_id++) {
                         const int32_t tmp = int16_column->get_data()[row_id];
-                        col_writer->WriteBatch(1, nullptr, nullptr,
+                        col_writer->WriteBatch(1, nullable ? def_level.data() 
: nullptr, nullptr,
                                                reinterpret_cast<const 
int32_t*>(&tmp));
                     }
                 } else if (const auto& int8_column =
                                    check_and_get_column<const 
ColumnVector<Int8>>(col)) {
                     for (size_t row_id = 0; row_id < sz; row_id++) {
                         const int32_t tmp = int8_column->get_data()[row_id];
-                        col_writer->WriteBatch(1, nullptr, nullptr,
+                        col_writer->WriteBatch(1, nullable ? def_level.data() 
: nullptr, nullptr,
                                                reinterpret_cast<const 
int32_t*>(&tmp));
                     }
                 } else {
@@ -274,25 +266,34 @@ Status VParquetWriterWrapper::write(const Block& block) {
                 }
                 break;
             }
+            case TYPE_INT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, 
ColumnVector<Int32>, Int32)
+                break;
+            }
             case TYPE_DATETIME:
             case TYPE_DATE: {
                 parquet::RowGroupWriter* rgWriter = get_rg_writer();
                 parquet::Int64Writer* col_writer =
                         
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
-                int64_t default_int64 = 0;
+                uint64_t default_int64 = 0;
                 if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        if ((*null_map)[row_id] != 0) {
-                            col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                        def_level[row_id] = null_data[row_id] == 0;
+                    }
+                    uint64_t tmp_data[sz];
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if (null_data[row_id] != 0) {
+                            tmp_data[row_id] = default_int64;
                         } else {
-                            const auto tmp = binary_cast<Int64, 
VecDateTimeValue>(
-                                                     assert_cast<const 
ColumnVector<Int64>&>(*col)
-                                                             
.get_data()[row_id])
-                                                     .to_olap_datetime();
-                            col_writer->WriteBatch(1, nullptr, nullptr,
-                                                   reinterpret_cast<const 
int64_t*>(&tmp));
+                            tmp_data[row_id] = binary_cast<Int64, 
VecDateTimeValue>(
+                                                       assert_cast<const 
ColumnVector<Int64>&>(*col)
+                                                               
.get_data()[row_id])
+                                                       .to_olap_datetime();
                         }
                     }
+                    col_writer->WriteBatch(sz, def_level.data(), nullptr,
+                                           reinterpret_cast<const 
int64_t*>(tmp_data));
                 } else if (const auto* not_nullable_column =
                                    check_and_get_column<const 
ColumnVector<Int64>>(col)) {
                     std::vector<uint64_t> res(sz);
@@ -301,7 +302,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
                                               
not_nullable_column->get_data()[row_id])
                                               .to_olap_datetime();
                     }
-                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                    col_writer->WriteBatch(sz, nullable ? def_level.data() : 
nullptr, nullptr,
                                            reinterpret_cast<const 
int64_t*>(res.data()));
                 } else {
                     RETURN_WRONG_TYPE
@@ -310,32 +311,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
             }
             case TYPE_DATEV2: {
                 parquet::RowGroupWriter* rgWriter = get_rg_writer();
-                parquet::Int64Writer* col_writer =
-                        
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
-                int64_t default_int64 = 0;
+                parquet::ByteArrayWriter* col_writer =
+                        
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+                parquet::ByteArray value;
                 if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        if ((*null_map)[row_id] != 0) {
-                            col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
+                            single_def_level = 1;
                         } else {
-                            uint64_t tmp = binary_cast<UInt32, 
DateV2Value<DateV2ValueType>>(
-                                                   assert_cast<const 
ColumnVector<UInt32>&>(*col)
-                                                           .get_data()[row_id])
-                                                   .to_olap_datetime();
-                            col_writer->WriteBatch(1, nullptr, nullptr,
-                                                   reinterpret_cast<const 
int64_t*>(&tmp));
+                            char buffer[30];
+                            int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
+                            value.ptr = reinterpret_cast<const 
uint8_t*>(buffer);
+                            value.len = binary_cast<UInt32, 
DateV2Value<DateV2ValueType>>(
+                                                assert_cast<const 
ColumnVector<UInt32>&>(*col)
+                                                        .get_data()[row_id])
+                                                .to_buffer(buffer, 
output_scale);
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
                         }
                     }
                 } else if (const auto* not_nullable_column =
                                    check_and_get_column<const 
ColumnVector<UInt32>>(col)) {
-                    std::vector<uint64_t> res(sz);
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        res[row_id] = binary_cast<UInt32, 
DateV2Value<DateV2ValueType>>(
-                                              
not_nullable_column->get_data()[row_id])
-                                              .to_olap_datetime();
+                        char buffer[30];
+                        int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
+                        value.ptr = reinterpret_cast<const uint8_t*>(buffer);
+                        value.len = binary_cast<UInt32, 
DateV2Value<DateV2ValueType>>(
+                                            
not_nullable_column->get_data()[row_id])
+                                            .to_buffer(buffer, output_scale);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level 
: nullptr, nullptr,
+                                               &value);
                     }
-                    col_writer->WriteBatch(sz, nullptr, nullptr,
-                                           reinterpret_cast<const 
int64_t*>(res.data()));
                 } else {
                     RETURN_WRONG_TYPE
                 }
@@ -343,32 +351,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
             }
             case TYPE_DATETIMEV2: {
                 parquet::RowGroupWriter* rgWriter = get_rg_writer();
-                parquet::Int64Writer* col_writer =
-                        
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
-                int64_t default_int64 = 0;
+                parquet::ByteArrayWriter* col_writer =
+                        
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+                parquet::ByteArray value;
                 if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        if ((*null_map)[row_id] != 0) {
-                            col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
+                            single_def_level = 1;
                         } else {
-                            uint64_t tmp = binary_cast<UInt64, 
DateV2Value<DateTimeV2ValueType>>(
-                                                   assert_cast<const 
ColumnVector<UInt64>&>(*col)
-                                                           .get_data()[row_id])
-                                                   .to_olap_datetime();
-                            col_writer->WriteBatch(1, nullptr, nullptr,
-                                                   reinterpret_cast<const 
int64_t*>(&tmp));
+                            char buffer[30];
+                            int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
+                            value.ptr = reinterpret_cast<const 
uint8_t*>(buffer);
+                            value.len = binary_cast<UInt64, 
DateV2Value<DateTimeV2ValueType>>(
+                                                assert_cast<const 
ColumnVector<UInt64>&>(*col)
+                                                        .get_data()[row_id])
+                                                .to_buffer(buffer, 
output_scale);
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
                         }
                     }
                 } else if (const auto* not_nullable_column =
                                    check_and_get_column<const 
ColumnVector<UInt64>>(col)) {
-                    std::vector<uint64_t> res(sz);
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        res[row_id] = binary_cast<UInt64, 
DateV2Value<DateTimeV2ValueType>>(
-                                              
not_nullable_column->get_data()[row_id])
-                                              .to_olap_datetime();
+                        char buffer[30];
+                        int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
+                        value.ptr = reinterpret_cast<const uint8_t*>(buffer);
+                        value.len = binary_cast<UInt64, 
DateV2Value<DateTimeV2ValueType>>(
+                                            
not_nullable_column->get_data()[row_id])
+                                            .to_buffer(buffer, output_scale);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level 
: nullptr, nullptr,
+                                               &value);
                     }
-                    col_writer->WriteBatch(sz, nullptr, nullptr,
-                                           reinterpret_cast<const 
int64_t*>(res.data()));
                 } else {
                     RETURN_WRONG_TYPE
                 }
@@ -402,9 +417,12 @@ Status VParquetWriterWrapper::write(const Block& block) {
                         
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
                 parquet::ByteArray value;
                 if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();
                     for (size_t row_id = 0; row_id < sz; row_id++) {
-                        if ((*null_map)[row_id] != 0) {
-                            col_writer->WriteBatch(1, nullptr, nullptr, 
&value);
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
+                            single_def_level = 1;
                         } else {
                             const DecimalV2Value 
decimal_val(reinterpret_cast<const PackedInt128*>(
                                                                      
col->get_data_at(row_id).data)
@@ -413,7 +431,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
                             int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
                             value.ptr = reinterpret_cast<const 
uint8_t*>(decimal_buffer);
                             value.len = decimal_val.to_buffer(decimal_buffer, 
output_scale);
-                            col_writer->WriteBatch(1, nullptr, nullptr, 
&value);
+                            col_writer->WriteBatch(1, &single_def_level, 
nullptr, &value);
                         }
                     }
                 } else if (const auto* not_nullable_column =
@@ -427,7 +445,8 @@ Status VParquetWriterWrapper::write(const Block& block) {
                         int output_scale = 
_output_vexpr_ctxs[i]->root()->type().scale;
                         value.ptr = reinterpret_cast<const 
uint8_t*>(decimal_buffer);
                         value.len = decimal_val.to_buffer(decimal_buffer, 
output_scale);
-                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level 
: nullptr, nullptr,
+                                               &value);
                     }
                 } else {
                     RETURN_WRONG_TYPE
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 64f7ec6737..fdb88b1ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -190,7 +190,7 @@ public class OutFileClause {
         return parquetSchemas;
     }
 
-    public void analyze(Analyzer analyzer, List<Expr> resultExprs) throws 
UserException {
+    public void analyze(Analyzer analyzer, List<Expr> resultExprs, 
List<String> colLabels) throws UserException {
         if (isAnalyzed) {
             // If the query stmt is rewritten, the whole stmt will be analyzed 
again.
             // But some of fields in this OutfileClause has been changed,
@@ -229,13 +229,13 @@ public class OutFileClause {
         isAnalyzed = true;
 
         if (isParquetFormat()) {
-            analyzeForParquetFormat(resultExprs);
+            analyzeForParquetFormat(resultExprs, colLabels);
         }
     }
 
-    private void analyzeForParquetFormat(List<Expr> resultExprs) throws 
AnalysisException {
+    private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> 
colLabels) throws AnalysisException {
         if (this.parquetSchemas.isEmpty()) {
-            genParquetSchema(resultExprs);
+            genParquetSchema(resultExprs, colLabels);
         }
 
         // check schema number
@@ -265,10 +265,8 @@ public class OutFileClause {
                 case BIGINT:
                 case DATE:
                 case DATETIME:
-                case DATETIMEV2:
-                case DATEV2:
                     if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
-                        throw new AnalysisException("project field type is 
BIGINT/DATE/DATETIME/DATEV2/DATETIMEV2,"
+                        throw new AnalysisException("project field type is 
BIGINT/DATE/DATETIME,"
                                 + "should use int64, but the definition type 
of column " + i + " is " + type);
                     }
                     break;
@@ -291,9 +289,12 @@ public class OutFileClause {
                 case DECIMAL64:
                 case DECIMAL128:
                 case DECIMALV2:
+                case DATETIMEV2:
+                case DATEV2:
                     if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) 
{
-                        throw new AnalysisException("project field type is 
CHAR/VARCHAR/STRING/DECIMAL,"
-                                + " should use byte_array, but the definition 
type of column " + i + " is " + type);
+                        throw new AnalysisException("project field type is 
CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
+                                + "/DATETIMEV2, should use byte_array, but the 
definition type of column "
+                                + i + " is " + type);
                     }
                     break;
                 case HLL:
@@ -316,12 +317,16 @@ public class OutFileClause {
         }
     }
 
-    private void genParquetSchema(List<Expr> resultExprs) throws 
AnalysisException {
+    private void genParquetSchema(List<Expr> resultExprs, List<String> 
colLabels) throws AnalysisException {
         Preconditions.checkState(this.parquetSchemas.isEmpty());
         for (int i = 0; i < resultExprs.size(); ++i) {
             Expr expr = resultExprs.get(i);
             TParquetSchema parquetSchema = new TParquetSchema();
-            parquetSchema.schema_repetition_type = 
PARQUET_REPETITION_TYPE_MAP.get("required");
+            if (resultExprs.get(i).isNullable()) {
+                parquetSchema.schema_repetition_type = 
PARQUET_REPETITION_TYPE_MAP.get("optional");
+            } else {
+                parquetSchema.schema_repetition_type = 
PARQUET_REPETITION_TYPE_MAP.get("required");
+            }
             switch (expr.getType().getPrimitiveType()) {
                 case BOOLEAN:
                     parquetSchema.schema_data_type = 
PARQUET_DATA_TYPE_MAP.get("boolean");
@@ -334,8 +339,6 @@ public class OutFileClause {
                 case BIGINT:
                 case DATE:
                 case DATETIME:
-                case DATETIMEV2:
-                case DATEV2:
                     parquetSchema.schema_data_type = 
PARQUET_DATA_TYPE_MAP.get("int64");
                     break;
                 case FLOAT:
@@ -351,6 +354,8 @@ public class OutFileClause {
                 case DECIMAL32:
                 case DECIMAL64:
                 case DECIMAL128:
+                case DATETIMEV2:
+                case DATEV2:
                     parquetSchema.schema_data_type = 
PARQUET_DATA_TYPE_MAP.get("byte_array");
                     break;
                 case HLL:
@@ -364,7 +369,7 @@ public class OutFileClause {
                     throw new AnalysisException("currently parquet do not 
support column type: "
                             + expr.getType().getPrimitiveType());
             }
-            parquetSchema.schema_column_name = "col" + i;
+            parquetSchema.schema_column_name = colLabels.get(i);
             parquetSchemas.add(parquetSchema);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index e29a68d375..5645219af7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -559,7 +559,7 @@ public class SelectStmt extends QueryStmt {
             }
         }
         if (hasOutFileClause()) {
-            outFileClause.analyze(analyzer, resultExprs);
+            outFileClause.analyze(analyzer, resultExprs, colLabels);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
index b3ef8269e6..c8cb2a8ccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
@@ -303,7 +303,7 @@ public class SetOperationStmt extends QueryStmt {
         baseTblResultExprs = resultExprs;
 
         if (hasOutFileClause()) {
-            outFileClause.analyze(analyzer, resultExprs);
+            outFileClause.analyze(analyzer, resultExprs, getColLabels());
         }
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 6932bbcaab..4945a59690 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -640,11 +640,11 @@ public class SelectStmtTest {
         try {
             SelectStmt stmt = (SelectStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
             Assert.assertEquals(1, 
stmt.getOutFileClause().getParquetSchemas().size());
-            
Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("required"),
+            
Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("optional"),
                     
stmt.getOutFileClause().getParquetSchemas().get(0).schema_repetition_type);
             
Assert.assertEquals(stmt.getOutFileClause().PARQUET_DATA_TYPE_MAP.get("byte_array"),
                     
stmt.getOutFileClause().getParquetSchemas().get(0).schema_data_type);
-            Assert.assertEquals("col0", 
stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
+            Assert.assertEquals("k1", 
stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
diff --git a/regression-test/data/export_p0/test_outfile_parquet.out 
b/regression-test/data/export_p0/test_outfile_parquet.out
new file mode 100644
index 0000000000..cb6eab3268
--- /dev/null
+++ b/regression-test/data/export_p0/test_outfile_parquet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_default --
+1      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 1       1       true    1       1       1       1.1     1.1     char1   
1
+2      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 2       2       true    2       2       2       2.2     2.2     char2   
2
+3      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 3       3       true    3       3       3       3.3     3.3     char3   
3
+4      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 4       4       true    4       4       4       4.4     4.4     char4   
4
+5      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 5       5       true    5       5       5       5.5     5.5     char5   
5
+6      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 6       6       true    6       6       6       6.6     6.6     char6   
6
+7      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 7       7       true    7       7       7       7.7     7.7     char7   
7
+8      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 8       8       true    8       8       8       8.8     8.8     char8   
8
+9      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 9       9       true    9       9       9       9.9     9.9     char9   
9
+10     2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+
+-- !select_default --
+1      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 1       1       true    1       1       1       1.1     1.1     char1   
1
+2      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 2       2       true    2       2       2       2.2     2.2     char2   
2
+3      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 3       3       true    3       3       3       3.3     3.3     char3   
3
+4      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 4       4       true    4       4       4       4.4     4.4     char4   
4
+5      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 5       5       true    5       5       5       5.5     5.5     char5   
5
+6      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 6       6       true    6       6       6       6.6     6.6     char6   
6
+7      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 7       7       true    7       7       7       7.7     7.7     char7   
7
+8      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 8       8       true    8       8       8       8.8     8.8     char8   
8
+9      2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
Beijing 9       9       true    9       9       9       9.9     9.9     char9   
9
+10     2017-10-01      2017-10-01T00:00        2017-10-01      
2017-10-01T00:00        2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+
diff --git a/regression-test/suites/export_p0/test_outfile_parquet.groovy 
b/regression-test/suites/export_p0/test_outfile_parquet.groovy
new file mode 100644
index 0000000000..2804b1e0e5
--- /dev/null
+++ b/regression-test/suites/export_p0/test_outfile_parquet.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_parquet") {
+    def dbName = "test_query_db"
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "USE $dbName"
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://"; + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    boolean enableOutfileToLocal = false
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_outfile_to_local") {
+            enableOutfileToLocal = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableOutfileToLocal) {
+        logger.warn("Please set enable_outfile_to_local to true to run 
test_outfile")
+        return
+    }
+    def tableName = "outfile_parquet_test"
+    def tableName2 = "outfile_parquet_test2"
+    def outFilePath = """${context.file.parent}/tmp"""
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `date_1` DATEV2 NOT NULL COMMENT "",
+            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` int COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10; i ++) {
+            sb.append("""
+                (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', 
'2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 
00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, 
${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
+            """)
+        }
+        sb.append("""
+                (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', 
'2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 
00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 
NULL)
+            """)
+        sql """ INSERT INTO ${tableName} VALUES
+             ${sb.toString()}
+            """
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
+
+        // check outfile
+        File path = new File(outFilePath)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${outFilePath} already exists! 
""")
+        }
+        sql """
+            SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE 
"file://${outFilePath}/" FORMAT AS PARQUET;
+        """
+
+        File[] files = path.listFiles()
+        assert files.length == 1
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName2} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `date_1` DATEV2 NOT NULL COMMENT "",
+            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` int COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+
+        StringBuilder commandBuilder = new StringBuilder()
+        commandBuilder.append("""curl -v --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}""")
+        commandBuilder.append(""" -H format:parquet -T """ + 
files[0].getAbsolutePath() + """ http://${context.config.feHttpAddress}/api/"""; 
+ dbName + "/" + tableName2 + "/_stream_load")
+        command = commandBuilder.toString()
+        process = command.execute()
+        code = process.waitFor()
+        err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())))
+        out = process.getText()
+        logger.info("Run command: command=" + command + ",code=" + code + ", 
out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        qt_select_default """ SELECT * FROM ${tableName2} t ORDER BY user_id; 
"""
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+        try_sql("DROP TABLE IF EXISTS ${tableName2}")
+        File path = new File(outFilePath)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to