This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f18a4b749fcfdd8b5e9f4ac3e130ffa86380227b
Author: lihangyu <[email protected]>
AuthorDate: Sat Mar 14 01:18:50 2026 +0800

    branch-4.1 cherry-pick [feat](format) support native format (#61286)
    
    cherry-pick #58711
---
 be/src/service/internal_service.cpp                |    6 +
 be/src/vec/exec/format/native/native_format.h      |   57 +
 be/src/vec/exec/format/native/native_reader.cpp    |  369 ++++++
 be/src/vec/exec/format/native/native_reader.h      |  107 ++
 be/src/vec/exec/scan/file_scanner.cpp              |   17 +-
 be/src/vec/functions/cast/cast_to_jsonb.h          |    2 +-
 be/src/vec/functions/cast/cast_to_variant.h        |  180 +--
 be/src/vec/runtime/vnative_transformer.cpp         |  133 ++
 be/src/vec/runtime/vnative_transformer.h           |   65 +
 be/src/vec/sink/writer/vfile_result_writer.cpp     |    9 +
 .../data/vec/native/all_types_single_row.native    |  Bin 0 -> 1129 bytes
 .../format/native/native_reader_writer_test.cpp    | 1350 ++++++++++++++++++++
 .../doris/common/util/FileFormatConstants.java     |    1 +
 .../java/org/apache/doris/common/util/Util.java    |    3 +
 .../property/fileformat/FileFormatProperties.java  |    3 +
 .../fileformat/NativeFileFormatProperties.java     |   65 +
 .../nereids/load/NereidsLoadScanProvider.java      |    5 +-
 .../ExternalFileTableValuedFunction.java           |   14 +-
 gensrc/thrift/PlanNodes.thrift                     |    3 +-
 .../outfile/native/test_outfile_native.out         |   25 +
 .../outfile/native/test_outfile_native.groovy      |  100 ++
 .../test_export_variant_10k_columns.groovy         |  215 ++++
 22 files changed, 2646 insertions(+), 83 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index bbd86d9c56c..62ed4aec19c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -123,6 +123,7 @@
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/text/text_reader.h"
@@ -856,6 +857,11 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
             reader = vectorized::OrcReader::create_unique(params, range, "", 
&io_ctx);
             break;
         }
+        case TFileFormatType::FORMAT_NATIVE: {
+            reader = vectorized::NativeReader::create_unique(profile.get(), 
params, range, &io_ctx,
+                                                             nullptr);
+            break;
+        }
         case TFileFormatType::FORMAT_JSON: {
             reader = vectorized::NewJsonReader::create_unique(profile.get(), 
params, range,
                                                               file_slots, 
&io_ctx);
diff --git a/be/src/vec/exec/format/native/native_format.h 
b/be/src/vec/exec/format/native/native_format.h
new file mode 100644
index 00000000000..e004a3c0f31
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_format.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+namespace doris::vectorized {
+
+// Doris Native format file-level constants.
+//
+// File layout (byte stream):
+//
+//   +-------------------------------+---------------------------+---------------------------+ ...
+//   | File header                   | Data block #0             | Data block #1             | ...
+//   +-------------------------------+---------------------------+---------------------------+ ...
+//
+//   File header (12 bytes total):
+//     - [0..7]   : magic bytes "DORISN1\0"  (DORIS_NATIVE_MAGIC)
+//     - [8..11]  : uint32_t format_version (DORIS_NATIVE_FORMAT_VERSION, 
little-endian)
+//
+//   Each data block i:
+//     - uint64_t block_size   : length in bytes of serialized PBlock 
(little-endian)
+//     - uint8_t[block_size]   : PBlock protobuf payload produced by 
Block::serialize()
+//
+// NativeReader:
+//   - Requires the file header: the first 8 bytes must equal DORIS_NATIVE_MAGIC and the
+//     version must equal DORIS_NATIVE_FORMAT_VERSION.
+//   - Once the header is validated, it skips 12 bytes and then starts reading blocks as
+//     [uint64_t block_size][PBlock bytes]...
+//   - If the header is absent (e.g. legacy headerless files), the file is rejected with an
+//     error; blocks are never read from offset 0.
+//
+// VNativeTransformer:
+//   - Writes the header once in open(), then appends each block in write() as
+//     [uint64_t block_size][PBlock bytes]...
+//
+// These constants are shared between writer, reader and tests to keep the 
on-disk
+// format definition in a single place.
+// Header layout (12 bytes): [magic bytes "DORISN1\0"][uint32_t format_version].
+// NativeReader rejects a file whose magic or version differs from the values below.
+static constexpr char DORIS_NATIVE_MAGIC[8] = {'D', 'O', 'R', 'I', 'S', 'N', 
'1', '\0'};
+static constexpr uint32_t DORIS_NATIVE_FORMAT_VERSION = 1;
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.cpp 
b/be/src/vec/exec/format/native/native_reader.cpp
new file mode 100644
index 00000000000..66cfe3a9f91
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.cpp
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/native/native_reader.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/tracing_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+// Captures the profile, scan parameters, scan range, IO context and runtime state.
+// `params` and `range` are kept by reference (the members are references), so they must
+// outlive the reader. The body is empty; the file itself is opened later by init_reader().
+NativeReader::NativeReader(RuntimeProfile* profile,
+                           const TFileScanRangeParams& params,
+                           const TFileRangeDesc& range,
+                           io::IOContext* io_ctx,
+                           RuntimeState* state)
+        : _profile(profile), _scan_params(params), _scan_range(range),
+          _io_ctx(io_ctx), _state(state) {}
+
+// Releases the underlying file reader via close(); the returned status is discarded
+// on purpose because a destructor has no way to report it.
+NativeReader::~NativeReader() {
+    static_cast<void>(close());
+}
+
+namespace {
+
+// Validates the Doris Native file header and reports where block data begins.
+//
+// Expected layout:
+// [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]...
+//
+// The outputs are reset first: *file_size is the size reported by the reader,
+// *current_offset is 0 and *eof tells whether the file is empty. On success
+// *current_offset is advanced past the header and *eof is true only when nothing
+// follows the header.
+//
+// Returns InternalError if the file is shorter than the header, the header cannot be
+// read in full, the magic bytes do not match, or the version is unsupported.
+Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range,
+                                   int64_t* file_size, int64_t* current_offset, bool* eof) {
+    static constexpr size_t MAGIC_SIZE = sizeof(DORIS_NATIVE_MAGIC);
+    static constexpr size_t HEADER_SIZE = MAGIC_SIZE + sizeof(uint32_t);
+
+    *file_size = file_reader->size();
+    *current_offset = 0;
+    *eof = (*file_size == 0);
+
+    // An empty file is always shorter than the header, so one size check covers both cases.
+    if (*file_size < static_cast<int64_t>(HEADER_SIZE)) {
+        return Status::InternalError(
+                "invalid Doris Native file {}, file size {} is smaller than header size {}",
+                range.path, *file_size, HEADER_SIZE);
+    }
+
+    char raw_header[HEADER_SIZE];
+    size_t got = 0;
+    RETURN_IF_ERROR(file_reader->read_at(0, Slice(raw_header, HEADER_SIZE), &got));
+    if (got != HEADER_SIZE) {
+        return Status::InternalError(
+                "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes",
+                range.path, HEADER_SIZE, got);
+    }
+
+    const bool magic_matches = (memcmp(raw_header, DORIS_NATIVE_MAGIC, MAGIC_SIZE) == 0);
+    if (!magic_matches) {
+        return Status::InternalError("invalid Doris Native magic header in file {}", range.path);
+    }
+
+    // The version field sits right after the magic bytes and is copied out with memcpy.
+    uint32_t found_version = 0;
+    memcpy(&found_version, raw_header + MAGIC_SIZE, sizeof(found_version));
+    if (found_version != DORIS_NATIVE_FORMAT_VERSION) {
+        return Status::InternalError(
+                "unsupported Doris Native format version {} in file {}, expect {}",
+                found_version, range.path, DORIS_NATIVE_FORMAT_VERSION);
+    }
+
+    *current_offset = HEADER_SIZE;
+    *eof = (*file_size == *current_offset);
+    return Status::OK();
+}
+
+} // namespace
+
+// Opens the file described by the scan range and validates the Doris Native header.
+//
+// Returns OK right away when a reader is already open. The newly created reader is stored
+// in _file_reader only after the header has been accepted; otherwise a failed validation
+// would leave _file_reader set and the next call would wrongly report OK through the
+// early return above, with _current_offset still pointing at the header bytes.
+//
+// Returns the error from file reader creation or from header validation.
+Status NativeReader::init_reader() {
+    if (_file_reader != nullptr) {
+        return Status::OK();
+    }
+
+    // Create underlying file reader. For now we always use random access mode.
+    io::FileSystemProperties system_properties;
+    io::FileDescription file_description;
+    file_description.file_size = -1;
+    if (_scan_range.__isset.file_size) {
+        file_description.file_size = _scan_range.file_size;
+    }
+    file_description.path = _scan_range.path;
+    if (_scan_range.__isset.fs_name) {
+        file_description.fs_name = _scan_range.fs_name;
+    }
+    if (_scan_range.__isset.modification_time) {
+        file_description.mtime = _scan_range.modification_time;
+    } else {
+        file_description.mtime = 0;
+    }
+
+    if (_scan_range.__isset.file_type) {
+        // For compatibility with older FE.
+        system_properties.system_type = _scan_range.file_type;
+    } else {
+        system_properties.system_type = _scan_params.file_type;
+    }
+    system_properties.properties = _scan_params.properties;
+    system_properties.hdfs_params = _scan_params.hdfs_params;
+    if (_scan_params.__isset.broker_addresses) {
+        system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
+                                                  _scan_params.broker_addresses.end());
+    }
+
+    io::FileReaderOptions reader_options =
+            FileFactory::get_reader_options(_state, file_description);
+    auto reader_res = io::DelegateReader::create_file_reader(
+            _profile, system_properties, file_description, reader_options,
+            io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+    if (!reader_res.has_value()) {
+        return reader_res.error();
+    }
+
+    // Keep the reader local until the header check has passed.
+    io::FileReaderSPtr reader = reader_res.value();
+    if (_io_ctx) {
+        // Wrap the reader so that reads are accounted in the IO context's reader stats.
+        reader = std::make_shared<io::TracingFileReader>(reader, _io_ctx->file_reader_stats);
+    }
+
+    RETURN_IF_ERROR(validate_and_consume_header(reader, _scan_range, &_file_size,
+                                                &_current_offset, &_eof));
+    _file_reader = reader;
+    return Status::OK();
+}
+
+// Reads and deserializes the next block of the file into `block`.
+//
+// Outputs:
+//   - read_rows : rows of the deserialized block, or 0 when the file is exhausted
+//   - eof       : true only when no block was produced because the file is exhausted
+//
+// If the first block was already loaded for schema probing (get_columns() /
+// get_parsed_schema()), that cached payload is served first. It is parsed in place instead
+// of being copied, and its memory is released once it has been consumed successfully.
+//
+// Returns InternalError for an empty block that is not at end of file or for a payload
+// that cannot be parsed; errors from reading and deserialization are propagated.
+Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_eof) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(init_reader());
+
+    std::string buff;
+    bool local_eof = false;
+
+    // If we have already loaded the first block for schema probing, use it first.
+    const bool use_cached_first_block = _first_block_loaded && !_first_block_consumed;
+    if (!use_cached_first_block) {
+        RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
+    }
+    const std::string& payload = use_cached_first_block ? _first_block_buf : buff;
+
+    // If we reach EOF and also read no data for this call, the whole file is finished.
+    if (local_eof && payload.empty()) {
+        *read_rows = 0;
+        *eof = true;
+        _eof = true;
+        return Status::OK();
+    }
+    // If the payload is empty but we have not reached EOF yet, treat this as an error.
+    if (payload.empty()) {
+        return Status::InternalError("read empty native block from file {}", _scan_range.path);
+    }
+
+    PBlock pblock;
+    if (!pblock.ParseFromArray(payload.data(), static_cast<int>(payload.size()))) {
+        return Status::InternalError("Failed to parse native PBlock from file {}",
+                                     _scan_range.path);
+    }
+
+    // Initialize schema from first block if not done yet.
+    if (!_schema_inited) {
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    size_t uncompressed_bytes = 0;
+    int64_t decompress_time = 0;
+    RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time));
+
+    // For external file scan / TVF scenarios, unify all columns as nullable to match
+    // GenericReader/SlotDescriptor convention. This ensures schema consistency when
+    // some writers emit non-nullable columns.
+    for (size_t i = 0; i < block->columns(); ++i) {
+        auto& col_with_type = block->get_by_position(i);
+        if (!col_with_type.type->is_nullable()) {
+            col_with_type.column = make_nullable(col_with_type.column);
+            col_with_type.type = make_nullable(col_with_type.type);
+        }
+    }
+
+    *read_rows = block->rows();
+    *eof = false;
+
+    if (use_cached_first_block) {
+        // The cached payload is marked consumed only after success, so a failure above
+        // leaves it available. Once consumed it is never read again: free its memory.
+        _first_block_consumed = true;
+        _first_block_buf.clear();
+        _first_block_buf.shrink_to_fit();
+    }
+
+    // If we reached the physical end of file, mark eof for subsequent calls.
+    if (_current_offset >= _file_size) {
+        _eof = true;
+    }
+
+    return Status::OK();
+}
+
+// Reports the file schema as a column name -> type map.
+//
+// missing_cols is always cleared. When the schema has not been derived yet, the first
+// block is read into _first_block_buf (kept so get_next_block() can serve it later) and
+// its column metas define the schema.
+//
+// Returns EndOfFile for a file without any block data and InternalError for an empty or
+// unparsable first block; errors from init_reader() and from reading are propagated.
+Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
+                                 std::unordered_set<std::string>* missing_cols) {
+    missing_cols->clear();
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        // Load first block lazily to initialize schema.
+        if (!_first_block_loaded) {
+            bool reached_end = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &reached_end));
+            if (_first_block_buf.empty()) {
+                // No data at EOF means an empty file; no data before EOF means corruption.
+                if (reached_end) {
+                    return Status::EndOfFile("empty native file {}", _scan_range.path);
+                }
+                return Status::InternalError("first native block is empty {}",
+                                             _scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock first_pblock;
+        const bool parsed = first_pblock.ParseFromArray(
+                _first_block_buf.data(), static_cast<int>(_first_block_buf.size()));
+        if (!parsed) {
+            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(first_pblock));
+    }
+
+    const size_t column_count = _schema_col_names.size();
+    for (size_t col = 0; col != column_count; ++col) {
+        name_to_type->emplace(_schema_col_names[col], _schema_col_types[col]);
+    }
+    return Status::OK();
+}
+
+// Prepares the reader for schema fetching; this only needs the file to be opened and its
+// header validated, which init_reader() already does.
+Status NativeReader::init_schema_reader() {
+    return init_reader();
+}
+
+// Returns the file schema as parallel vectors of column names and column types.
+//
+// When the schema has not been derived yet, the first block is read into
+// _first_block_buf (kept so get_next_block() can serve it later) and its column metas
+// define the schema.
+//
+// Returns EndOfFile for a file without any block data and InternalError for an empty or
+// unparsable first block; errors from init_reader() and from reading are propagated.
+Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                       std::vector<DataTypePtr>* col_types) {
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        if (!_first_block_loaded) {
+            bool reached_end = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &reached_end));
+            if (_first_block_buf.empty()) {
+                // No data at EOF means an empty file; no data before EOF means corruption.
+                if (reached_end) {
+                    return Status::EndOfFile("empty native file {}", _scan_range.path);
+                }
+                return Status::InternalError("first native block is empty {}",
+                                             _scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock first_pblock;
+        const bool parsed = first_pblock.ParseFromArray(
+                _first_block_buf.data(), static_cast<int>(_first_block_buf.size()));
+        if (!parsed) {
+            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(first_pblock));
+    }
+
+    *col_names = _schema_col_names;
+    *col_types = _schema_col_types;
+    return Status::OK();
+}
+
+// Drops this reader's reference to the underlying file reader. Always returns OK.
+Status NativeReader::close() {
+    _file_reader = nullptr;
+    return Status::OK();
+}
+
+// Reads the next length-prefixed PBlock payload starting at _current_offset.
+//
+// On return:
+//   - buff : the serialized PBlock bytes; empty when no payload was read
+//   - eof  : true when _current_offset has reached the end of the file
+//
+// The length prefix comes from the file, so it is checked against the bytes that remain
+// before any buffer is allocated. A corrupted or truncated file then yields an error
+// instead of an allocation sized by garbage data.
+//
+// Returns InternalError for a short read of the length or body, or for a length that
+// exceeds the rest of the file; read errors are propagated.
+Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
+    *eof = false;
+    buff->clear();
+
+    if (_file_reader == nullptr) {
+        RETURN_IF_ERROR(init_reader());
+    }
+
+    if (_current_offset >= _file_size) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    uint64_t len = 0;
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read));
+    if (bytes_read == 0) {
+        *eof = true;
+        return Status::OK();
+    }
+    if (bytes_read != sizeof(len)) {
+        return Status::InternalError(
+                "Failed to read native block length from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, sizeof(len), bytes_read);
+    }
+
+    _current_offset += sizeof(len);
+    if (len == 0) {
+        // Empty block, nothing to read.
+        *eof = (_current_offset >= _file_size);
+        return Status::OK();
+    }
+
+    // Reject a length that cannot fit in the rest of the file before allocating for it.
+    const uint64_t remaining = _current_offset < _file_size
+                                       ? static_cast<uint64_t>(_file_size - _current_offset)
+                                       : 0;
+    if (len > remaining) {
+        return Status::InternalError(
+                "Invalid native block length {} in file {}, only {} bytes remain",
+                len, _scan_range.path, remaining);
+    }
+
+    buff->assign(len, '\0');
+    Slice data_slice(buff->data(), len);
+    bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read));
+    if (bytes_read != len) {
+        return Status::InternalError(
+                "Failed to read native block body from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, len, bytes_read);
+    }
+
+    // len <= remaining, so the conversion to int64_t cannot overflow.
+    _current_offset += static_cast<int64_t>(len);
+    *eof = (_current_offset >= _file_size);
+    return Status::OK();
+}
+
+// Rebuilds the cached schema (column names and types) from the column metas of `pblock`.
+// Every column type is wrapped with make_nullable(). Marks the schema as initialized and
+// always returns OK.
+Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
+    _schema_col_names.clear();
+    _schema_col_types.clear();
+
+    const int meta_count = pblock.column_metas_size();
+    for (int idx = 0; idx < meta_count; ++idx) {
+        const auto& meta = pblock.column_metas(idx);
+        DataTypePtr nullable_type =
+                make_nullable(DataTypeFactory::instance().create_data_type(meta));
+        VLOG_DEBUG << "init_schema_from_pblock, name=" << meta.name()
+                   << ", type=" << nullable_type->get_name();
+        _schema_col_names.emplace_back(meta.name());
+        _schema_col_types.emplace_back(nullable_type);
+    }
+
+    _schema_inited = true;
+    return Status::OK();
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.h 
b/be/src/vec/exec/format/native/native_reader.h
new file mode 100644
index 00000000000..bc1635e0ccb
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.h
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format reader.
+// It reads a sequence of Blocks encoded in Doris Native binary format: a file header
+// followed by length-prefixed serialized PBlock payloads.
+class NativeReader : public GenericReader {
+public:
+    ENABLE_FACTORY_CREATOR(NativeReader);
+
+    NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                 const TFileRangeDesc& range, io::IOContext* io_ctx, 
RuntimeState* state);
+
+    ~NativeReader() override; // calls close() and ignores its status
+
+    // Opens the underlying file reader and validates the native file header; returns OK at once if a reader is already open.
+    Status init_reader();
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; // deserializes the next PBlock; every column is made nullable
+
+    Status get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override; // missing_cols is always cleared
+
+    Status init_schema_reader() override; // only calls init_reader()
+
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<DataTypePtr>* col_types) override;
+
+    Status close() override; // drops the file reader; always returns OK
+
+    bool count_read_rows() override { return true; }
+
+protected:
+    void _collect_profile_before_close() override {}
+
+private:
+    RuntimeProfile* _profile = nullptr;
+    const TFileScanRangeParams& _scan_params; // held by reference, not copied
+    const TFileRangeDesc& _scan_range; // held by reference, not copied
+
+    io::FileReaderSPtr _file_reader;
+    io::IOContext* _io_ctx = nullptr;
+    RuntimeState* _state = nullptr;
+
+    bool _eof = false; // once true, get_next_block() reports eof without reading
+
+    // Current read offset in the underlying file.
+    int64_t _current_offset = 0;
+    int64_t _file_size = 0; // taken from the file reader's size() during init_reader()
+
+    // Cached schema information from the first PBlock.
+    bool _schema_inited = false;
+    std::vector<std::string> _schema_col_names;
+    std::vector<DataTypePtr> _schema_col_types;
+
+    // Cached first block (serialized) to allow schema probing before data 
scan.
+    std::string _first_block_buf;
+    bool _first_block_loaded = false;
+    bool _first_block_consumed = false;
+
+    Status _read_next_pblock(std::string* buff, bool* eof); // reads one [uint64_t length][payload] record at _current_offset
+    Status _init_schema_from_pblock(const PBlock& pblock); // fills _schema_col_names / _schema_col_types from the column metas
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/file_scanner.cpp 
b/be/src/vec/exec/scan/file_scanner.cpp
index 5124f8ea631..37f580b25ff 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -60,6 +60,7 @@
 #include "vec/exec/format/avro/avro_jni_reader.h"
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/hive_reader.h"
@@ -599,10 +600,6 @@ Status FileScanner::_cast_to_input_block(Block* block) {
             // skip columns which does not exist in file
             continue;
         }
-        if (slot_desc->type()->get_primitive_type() == 
PrimitiveType::TYPE_VARIANT) {
-            // skip variant type
-            continue;
-        }
         auto& arg = 
_src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
         auto return_type = slot_desc->get_data_type_ptr();
         // remove nullable here, let the get_function decide whether nullable
@@ -617,8 +614,10 @@ Status FileScanner::_cast_to_input_block(Block* block) {
                                          return_type->get_name());
         }
         idx = _src_block_name_to_idx[slot_desc->col_name()];
+        DCHECK(_state != nullptr);
+        auto ctx = FunctionContext::create_context(_state, {}, {});
         RETURN_IF_ERROR(
-                func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, 
arg.column->size()));
+                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, 
arg.column->size()));
         _src_block_ptr->get_by_position(idx).type = std::move(return_type);
     }
     return Status::OK();
@@ -1118,6 +1117,14 @@ Status FileScanner::_get_next_reader() {
             init_status = 
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
             break;
         }
+        case TFileFormatType::FORMAT_NATIVE: {
+            auto reader =
+                    NativeReader::create_unique(_profile, *_params, range, 
_io_ctx.get(), _state);
+            init_status = reader->init_reader();
+            _cur_reader = std::move(reader);
+            need_to_get_parsed_schema = false;
+            break;
+        }
         case TFileFormatType::FORMAT_ARROW: {
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "remote_doris") 
{
diff --git a/be/src/vec/functions/cast/cast_to_jsonb.h 
b/be/src/vec/functions/cast/cast_to_jsonb.h
index 92e3d4954a9..bd3cd8a329b 100644
--- a/be/src/vec/functions/cast/cast_to_jsonb.h
+++ b/be/src/vec/functions/cast/cast_to_jsonb.h
@@ -222,7 +222,7 @@ struct ParseJsonbFromString {
     }
 };
 
-// create cresponding jsonb value with type to_type
+// create corresponding jsonb value with type to_type
 // use jsonb writer to create jsonb value
 WrapperType create_cast_to_jsonb_wrapper(const DataTypePtr& from_type, const 
DataTypeJsonb& to_type,
                                          bool string_as_jsonb_string) {
diff --git a/be/src/vec/functions/cast/cast_to_variant.h 
b/be/src/vec/functions/cast/cast_to_variant.h
index 1593f9023de..1c45bb46597 100644
--- a/be/src/vec/functions/cast/cast_to_variant.h
+++ b/be/src/vec/functions/cast/cast_to_variant.h
@@ -20,86 +20,102 @@
 #include "cast_base.h"
 #include "cast_to_string.h"
 #include "vec/data_types/data_type_variant.h"
+
 namespace doris::vectorized::CastWrapper {
 
-struct CastFromVariant {
-    static Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                          uint32_t result, size_t input_rows_count,
-                          const NullMap::value_type* null_map = nullptr) {
-        auto& data_type_to = block.get_by_position(result).type;
-        const auto& col_with_type_and_name = 
block.get_by_position(arguments[0]);
-        const auto& col_from = col_with_type_and_name.column;
-        const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
-        ColumnPtr col_to = data_type_to->create_column();
-        if (!variant.is_finalized()) {
-            // ColumnVariant should be finalized before parsing, finalize 
maybe modify original column structure
-            variant.assume_mutable()->finalize();
-        }
-        // It's important to convert as many elements as possible in this 
context. For instance,
-        // if the root of this variant column is a number column, converting 
it to a number column
-        // is acceptable. However, if the destination type is a string and 
root is none scalar root, then
-        // we should convert the entire tree to a string.
-        bool is_root_valuable = variant.is_scalar_variant() ||
-                                (!variant.is_null_root() &&
-                                 variant.get_root_type()->get_primitive_type() 
!= INVALID_TYPE &&
-                                 
!is_string_type(data_type_to->get_primitive_type()) &&
-                                 data_type_to->get_primitive_type() != 
TYPE_JSONB);
-        if (is_root_valuable) {
-            ColumnPtr nested = variant.get_root();
-            auto nested_from_type = variant.get_root_type();
-            // DCHECK(nested_from_type->is_nullable());
-            DCHECK(!data_type_to->is_nullable());
-            auto new_context = context->clone();
+// shared implementation for casting from variant to arbitrary non-nullable 
target type
+inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
+                                     const ColumnNumbers& arguments, uint32_t 
result,
+                                     size_t input_rows_count,
+                                     const NullMap::value_type* /*null_map*/,
+                                     const DataTypePtr& data_type_to) {
+    const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
+    const auto& col_from = col_with_type_and_name.column;
+    const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
+    ColumnPtr col_to = data_type_to->create_column();
+
+    if (!variant.is_finalized()) {
+        // ColumnVariant should be finalized before parsing; finalize may 
modify the original column structure
+        variant.assume_mutable()->finalize();
+    }
+
+    // It's important to convert as many elements as possible in this context. 
For instance,
+    // if the root of this variant column is a number column, converting it to 
a number column
+    // is acceptable. However, if the destination type is a string and root is 
not a scalar root, then
+    // we should convert the entire tree to a string.
+    bool is_root_valuable = variant.is_scalar_variant() ||
+                            (!variant.is_null_root() &&
+                             variant.get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
+                             
!is_string_type(data_type_to->get_primitive_type()) &&
+                             data_type_to->get_primitive_type() != TYPE_JSONB);
+
+    if (is_root_valuable) {
+        ColumnPtr nested = variant.get_root();
+        auto nested_from_type = variant.get_root_type();
+        // DCHECK(nested_from_type->is_nullable());
+        DCHECK(!data_type_to->is_nullable());
+        auto new_context = context == nullptr ? nullptr : context->clone();
+        if (new_context != nullptr) {
             new_context->set_jsonb_string_as_string(true);
             // Strict mode is for user INSERT validation, not variant internal 
conversion.
             new_context->set_enable_strict_mode(false);
-            // dst type nullable has been removed, so we should remove the 
inner nullable of root column
-            auto wrapper = prepare_impl(new_context.get(), 
remove_nullable(nested_from_type),
-                                        data_type_to);
-            Block tmp_block {{remove_nullable(nested), 
remove_nullable(nested_from_type), ""}};
-            tmp_block.insert({nullptr, data_type_to, ""});
-            /// Perform the requested conversion.
-            Status st = wrapper(new_context.get(), tmp_block, {0}, 1, 
input_rows_count, nullptr);
-            if (!st.ok()) {
-                // Fill with default values, which is null
-                
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
-                col_to = make_nullable(col_to, true);
-            } else {
-                col_to = tmp_block.get_by_position(1).column;
-                // Note: here we should return the nullable result column
-                col_to = wrap_in_nullable(
-                        col_to, Block({{nested, nested_from_type, ""}, 
{col_to, data_type_to, ""}}),
-                        {0}, input_rows_count);
-            }
+        }
+        // dst type nullable has been removed, so we should remove the inner 
nullable of root column
+        auto wrapper =
+                prepare_impl(new_context.get(), 
remove_nullable(nested_from_type), data_type_to);
+        Block tmp_block {{remove_nullable(nested), 
remove_nullable(nested_from_type), ""}};
+        tmp_block.insert({nullptr, data_type_to, ""});
+        /// Perform the requested conversion.
+        Status st = wrapper(new_context.get(), tmp_block, {0}, 1, 
input_rows_count, nullptr);
+        if (!st.ok()) {
+            // Fill with default values, which is null
+            col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+            col_to = make_nullable(col_to, true);
         } else {
-            if (variant.only_have_default_values()) {
-                
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
-                col_to = make_nullable(col_to, true);
-            } else if (is_string_type(data_type_to->get_primitive_type())) {
-                // serialize to string
-                return CastToStringFunction::execute_impl(context, block, 
arguments, result,
-                                                          input_rows_count);
-            } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
-                // serialize to json by parsing
-                return cast_from_generic_to_jsonb(context, block, arguments, 
result,
-                                                  input_rows_count);
-            } else if (!data_type_to->is_nullable() &&
-                       !is_string_type(data_type_to->get_primitive_type())) {
-                // other types
-                
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
-                col_to = make_nullable(col_to, true);
-            } else {
-                assert_cast<ColumnNullable&>(*col_to->assume_mutable())
-                        .insert_many_defaults(input_rows_count);
-            }
+            col_to = tmp_block.get_by_position(1).column;
+            // Note: here we should return the nullable result column
+            col_to = wrap_in_nullable(
+                    col_to, Block({{nested, nested_from_type, ""}, {col_to, 
data_type_to, ""}}),
+                    {0}, input_rows_count);
         }
-        if (col_to->size() != input_rows_count) {
-            return Status::InternalError("Unmatched row count {}, expected 
{}", col_to->size(),
-                                         input_rows_count);
+    } else {
+        if (variant.only_have_default_values()) {
+            col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+            col_to = make_nullable(col_to, true);
+        } else if (is_string_type(data_type_to->get_primitive_type())) {
+            // serialize to string
+            return CastToStringFunction::execute_impl(context, block, 
arguments, result,
+                                                      input_rows_count);
+        } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
+            // serialize to json by parsing
+            return cast_from_generic_to_jsonb(context, block, arguments, 
result, input_rows_count);
+        } else if (!data_type_to->is_nullable() &&
+                   !is_string_type(data_type_to->get_primitive_type())) {
+            // other types
+            col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+            col_to = make_nullable(col_to, true);
+        } else {
+            assert_cast<ColumnNullable&>(*col_to->assume_mutable())
+                    .insert_many_defaults(input_rows_count);
         }
+    }
 
-        block.replace_by_position(result, std::move(col_to));
-        return Status::OK();
+    if (col_to->size() != input_rows_count) {
+        return Status::InternalError("Unmatched row count {}, expected {}", 
col_to->size(),
+                                     input_rows_count);
+    }
+
+    block.replace_by_position(result, std::move(col_to));
+    return Status::OK();
+}
+
+struct CastFromVariant {
+    static Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                          uint32_t result, size_t input_rows_count,
+                          const NullMap::value_type* null_map = nullptr) {
+        auto& data_type_to = block.get_by_position(result).type;
+        return cast_from_variant_impl(context, block, arguments, result, 
input_rows_count, null_map,
+                                      data_type_to);
     }
 };
 
@@ -119,16 +135,32 @@ struct CastToVariant {
     }
 };
 
-// create cresponding variant value to wrap from_type
+// create corresponding variant value to wrap from_type
 WrapperType create_cast_to_variant_wrapper(const DataTypePtr& from_type,
                                            const DataTypeVariant& to_type) {
+    if (from_type->get_primitive_type() == TYPE_VARIANT) {
+        // variant -> variant cast is rejected here: variant_max_subcolumns_count is not equal
+        return create_unsupport_wrapper(from_type->get_name(), 
to_type.get_name());
+    }
     return &CastToVariant::execute;
 }
 
-// create cresponding type convert from variant
+// create the wrapper that converts from variant to the corresponding type
 WrapperType create_cast_from_variant_wrapper(const DataTypeVariant& from_type,
                                              const DataTypePtr& to_type) {
-    return &CastFromVariant::execute;
+    if (to_type->get_primitive_type() == TYPE_VARIANT) {
+        // variant -> variant cast is rejected here: variant_max_subcolumns_count is not equal
+        return create_unsupport_wrapper(from_type.get_name(), 
to_type->get_name());
+    }
+    // Capture explicit target type to make the cast independent from 
Block[result].type.
+    DataTypePtr captured_to_type = to_type;
+    return [captured_to_type](FunctionContext* context, Block& block,
+                              const ColumnNumbers& arguments, uint32_t result,
+                              size_t input_rows_count,
+                              const NullMap::value_type* null_map) -> Status {
+        return cast_from_variant_impl(context, block, arguments, result, 
input_rows_count, null_map,
+                                      captured_to_type);
+    };
 }
 
 } // namespace doris::vectorized::CastWrapper
diff --git a/be/src/vec/runtime/vnative_transformer.cpp 
b/be/src/vec/runtime/vnative_transformer.cpp
new file mode 100644
index 00000000000..578364eff22
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.cpp
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map TFileCompressType to segment_v2::CompressionTypePB; unmapped types fall back to ZSTD.
+segment_v2::CompressionTypePB 
to_local_compression_type(TFileCompressType::type type) {
+    using CT = segment_v2::CompressionTypePB;
+    switch (type) {
+    case TFileCompressType::GZ:
+    case TFileCompressType::ZLIB:
+    case TFileCompressType::DEFLATE:
+        return CT::ZLIB;
+    case TFileCompressType::LZ4FRAME:
+    case TFileCompressType::LZ4BLOCK:
+        return CT::LZ4;
+    case TFileCompressType::SNAPPYBLOCK:
+        return CT::SNAPPY;
+    case TFileCompressType::ZSTD:
+        return CT::ZSTD;
+    default:
+        return CT::ZSTD;
+    }
+}
+
+} // namespace
+
+VNativeTransformer::VNativeTransformer(RuntimeState* state, 
doris::io::FileWriter* file_writer,
+                                       const VExprContextSPtrs& 
output_vexpr_ctxs,
+                                       bool output_object_data,
+                                       TFileCompressType::type compress_type)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _compression_type(to_local_compression_type(compress_type)) {}
+
+Status VNativeTransformer::open() {
+    // Write Doris Native file header:
+    // [magic bytes "DORISN1\0"][uint32_t format_version]
+    DCHECK(_file_writer != nullptr);
+    uint32_t version = DORIS_NATIVE_FORMAT_VERSION;
+
+    Slice magic_slice(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC));
+    Slice version_slice(reinterpret_cast<char*>(&version), sizeof(uint32_t));
+
+    RETURN_IF_ERROR(_file_writer->append(magic_slice));
+    RETURN_IF_ERROR(_file_writer->append(version_slice));
+
+    _written_len += sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t);
+    return Status::OK();
+}
+
+Status VNativeTransformer::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    // Serialize Block into PBlock using existing vec serialization logic.
+    PBlock pblock;
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    int64_t compressed_time = 0;
+
+    
RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), 
&pblock,
+                                    &uncompressed_bytes, &compressed_bytes, 
&compressed_time,
+                                    _compression_type));
+
+    std::string buff;
+    if (!pblock.SerializeToString(&buff)) {
+        auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
+                "serialize native block error. block rows: {}", block.rows());
+        return err;
+    }
+
+    // Layout of Doris Native file:
+    // [uint64_t block_size][PBlock bytes]...
+    uint64_t len = buff.size();
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+    RETURN_IF_ERROR(_file_writer->append(len_slice));
+    RETURN_IF_ERROR(_file_writer->append(buff));
+
+    _written_len += sizeof(len) + buff.size();
+    _cur_written_rows += block.rows();
+
+    return Status::OK();
+}
+
+Status VNativeTransformer::close() {
+    // Close underlying FileWriter to ensure data is flushed to disk.
+    if (_file_writer != nullptr && _file_writer->state() != 
doris::io::FileWriter::State::CLOSED) {
+        RETURN_IF_ERROR(_file_writer->close());
+    }
+
+    return Status::OK();
+}
+
+int64_t VNativeTransformer::written_len() {
+    return _written_len;
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vnative_transformer.h 
b/be/src/vec/runtime/vnative_transformer.h
new file mode 100644
index 00000000000..fdf7ff46ac9
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::io {
+class FileWriter;
+} // namespace doris::io
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format writer.
+// It serializes vectorized Blocks into Doris Native binary format.
+class VNativeTransformer final : public VFileFormatTransformer {
+public:
+    // |compress_type| controls how the PBlock is compressed on disk (ZSTD, 
LZ4, etc).
+    // Defaults to ZSTD to preserve the previous behavior.
+    VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                       const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data,
+                       TFileCompressType::type compress_type = 
TFileCompressType::ZSTD);
+
+    ~VNativeTransformer() override = default;
+
+    Status open() override;
+
+    Status write(const Block& block) override;
+
+    Status close() override;
+
+    int64_t written_len() override;
+
+private:
+    doris::io::FileWriter* _file_writer; // not owned
+    int64_t _written_len = 0;
+    // Compression type used for Block::serialize (PBlock compression).
+    segment_v2::CompressionTypePB _compression_type 
{segment_v2::CompressionTypePB::ZSTD};
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 0a2aee6c811..edec869ab36 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -58,6 +58,7 @@
 #include "vec/exprs/vexpr_context.h"
 #include "vec/functions/cast/cast_to_string.h"
 #include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vnative_transformer.h"
 #include "vec/runtime/vorc_transformer.h"
 #include "vec/runtime/vparquet_transformer.h"
 #include "vec/sink/vmysql_result_writer.h"
@@ -146,6 +147,12 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
                 _state, _file_writer_impl.get(), _vec_output_expr_ctxs, 
_file_opts->orc_schema, {},
                 _output_object_data, _file_opts->orc_compression_type));
         break;
+    case TFileFormatType::FORMAT_NATIVE:
+        // Doris Native binary format writer with configurable compression.
+        _vfile_writer.reset(new VNativeTransformer(_state, 
_file_writer_impl.get(),
+                                                   _vec_output_expr_ctxs, 
_output_object_data,
+                                                   
_file_opts->compression_type));
+        break;
     default:
         return Status::InternalError("unsupported file format: {}", 
_file_opts->file_format);
     }
@@ -203,6 +210,8 @@ std::string VFileResultWriter::_file_format_to_name() {
         return "parquet";
     case TFileFormatType::FORMAT_ORC:
         return "orc";
+    case TFileFormatType::FORMAT_NATIVE:
+        return "native";
     default:
         return "unknown";
     }
diff --git a/be/test/data/vec/native/all_types_single_row.native 
b/be/test/data/vec/native/all_types_single_row.native
new file mode 100644
index 00000000000..90f825153ad
Binary files /dev/null and 
b/be/test/data/vec/native/all_types_single_row.native differ
diff --git a/be/test/vec/exec/format/native/native_reader_writer_test.cpp 
b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
new file mode 100644
index 00000000000..323f94944b3
--- /dev/null
+++ b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
@@ -0,0 +1,1350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/path.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/runtime_state.h"
+#include "util/jsonb_writer.h"
+#include "util/uid_util.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/exec/format/native/native_format.h"
+#include "vec/exec/format/native/native_reader.h"
+#include "vec/runtime/vnative_transformer.h"
+
+namespace doris::vectorized {
+
+class NativeReaderWriterTest : public ::testing::Test {};
+
+static void fill_primitive_columns(Block& block, size_t rows) {
+    DataTypePtr int_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+    DataTypePtr str_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false));
+
+    {
+        MutableColumnPtr col = int_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 3 == 0) {
+                // null
+                col->insert_default();
+            } else {
+                // insert int value via Field to match column interface
+                auto field =
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), int_type, 
"int_col"));
+    }
+
+    {
+        MutableColumnPtr col = str_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 4 == 0) {
+                col->insert_default();
+            } else {
+                std::string v = "s" + std::to_string(i);
+                // insert varchar value via Field
+                auto field = 
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), str_type, 
"str_col"));
+    }
+}
+
+static void fill_array_column(Block& block, size_t rows) {
+    // array<int>
+    DataTypePtr arr_nested_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr arr_type = 
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+
+    {
+        MutableColumnPtr col = arr_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        auto& array_col = assert_cast<ColumnArray&>(nested);
+        auto& offsets = array_col.get_offsets();
+        auto& data = array_col.get_data();
+        auto mutable_data = data.assume_mutable();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 5 == 0) {
+                // null array
+                nullable_col->insert_default();
+            } else {
+                // non-null array with 3 elements: [i, i+1, i+2]
+                null_map.push_back(0);
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
+                offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), arr_type, 
"arr_col"));
+    }
+}
+
+static void fill_map_column(Block& block, size_t rows) {
+    // map<string, int>
+    DataTypePtr map_key_type = 
DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
+    DataTypePtr map_value_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr map_type =
+            make_nullable(std::make_shared<DataTypeMap>(map_key_type, 
map_value_type));
+
+    // map<string, int> column
+    {
+        MutableColumnPtr col = map_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 7 == 0) {
+                // null map
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+                auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+                auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+                auto mutable_keys = keys.assume_mutable();
+                auto mutable_values = values.assume_mutable();
+
+                std::string k1 = "k" + std::to_string(i);
+                std::string k2 = "k" + std::to_string(i + 1);
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+
+                offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), map_type, 
"map_col"));
+    }
+}
+
+static void fill_struct_column(Block& block, size_t rows) {
+    // struct<si:int, ss:string>
+    DataTypes struct_fields;
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false)));
+    DataTypePtr struct_type = 
make_nullable(std::make_shared<DataTypeStruct>(struct_fields));
+
+    // struct<si:int, ss:string> column
+    {
+        MutableColumnPtr col = struct_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        auto& struct_col = assert_cast<ColumnStruct&>(nested);
+        const auto& fields = struct_col.get_columns();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 6 == 0) {
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto mutable_field0 = fields[0]->assume_mutable();
+                auto mutable_field1 = fields[1]->assume_mutable();
+                // int field
+                
mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
+                        static_cast<int32_t>(i * 100)));
+                // string field
+                std::string vs = "ss" + std::to_string(i);
+                
mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), struct_type, 
"struct_col"));
+    }
+}
+
+static void fill_variant_column(Block& block, size_t rows) {
+    // variant
+    DataTypePtr variant_type = 
make_nullable(std::make_shared<DataTypeVariant>());
+
+    // variant column: use JSON strings + deserialize_column_from_json_vector 
to populate ColumnVariant
+    {
+        MutableColumnPtr col = variant_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column(); // ColumnVariant
+
+        // Prepare one JSON object string per row, each with the same key_cnt keys
+        std::vector<std::string> json_rows;
+        json_rows.reserve(rows);
+        std::vector<Slice> slices;
+        slices.reserve(rows);
+
+        size_t key_cnt = 100;
+        for (size_t i = 0; i < rows; ++i) {
+            // every row has key_cnt keys (k0, k1, ...); only the values vary by row
+            std::string json = "{";
+            for (size_t k = 0; k < key_cnt; ++k) {
+                if (k > 0) {
+                    json += ",";
+                }
+                // keys: "k0", "k1", ...
+                json += "\"k" + std::to_string(k) + "\":";
+                // mix int and string values
+                if ((i + k) % 2 == 0) {
+                    json += std::to_string(static_cast<int32_t>(i * 10 + k));
+                } else {
+                    json += "\"v" + std::to_string(i * 10 + k) + "\"";
+                }
+            }
+            json += "}";
+            json_rows.emplace_back(std::move(json));
+            slices.emplace_back(json_rows.back().data(), 
json_rows.back().size());
+        }
+
+        // Use Variant SerDe to parse JSON into ColumnVariant
+        auto variant_type_inner = std::make_shared<DataTypeVariant>();
+        auto serde = variant_type_inner->get_serde();
+        auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
+
+        uint64_t num_deserialized = 0;
+        DataTypeSerDe::FormatOptions options;
+        Status st = variant_serde->deserialize_column_from_json_vector(nested, 
slices,
+                                                                       
&num_deserialized, options);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(rows, num_deserialized);
+
+        // All rows are treated as non-null here
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.clear();
+        null_map.resize_fill(rows, 0);
+
+        block.insert(ColumnWithTypeAndName(std::move(col), variant_type, 
"variant_col"));
+    }
+}
+
+static void fill_complex_columns(Block& block, size_t rows) { // runs the array, map, struct and variant fill helpers, in that order
+    fill_array_column(block, rows);
+    fill_map_column(block, rows);
+    fill_struct_column(block, rows);
+    fill_variant_column(block, rows);
+}
+
+static Block create_test_block(size_t rows) { // builds the block shared by the round-trip tests
+    Block block;
+    // schema: primitive columns are added first, then the complex ones:
+    // int_col, str_col, arr_col, map_col, struct_col, variant_col
+    fill_primitive_columns(block, rows);
+    fill_complex_columns(block, rows);
+
+    return block;
+}
+
+TEST_F(NativeReaderWriterTest, round_trip_native_file) { // write one 16-row block, read it back, compare every cell
+    // Prepare a unique temporary file path under the current working directory
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_" + uuid + ".native";
+
+    // 1. Write block to Native file via VNativeTransformer
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_test_block(16);
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    // VNativeTransformer::close() will also close the underlying FileWriter,
+    // so we don't need (and must not) close file_writer again here.
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // 2. Read back via NativeReader using normal file scan path
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(src_block.rows(), read_rows);
+    ASSERT_EQ(src_block.columns(), dst_block.columns());
+
+    // Compare each column: type family name, size, then every row as a Field
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+        ASSERT_EQ(src.column->size(), dst.column->size());
+        for (size_t row = 0; row < src_block.rows(); ++row) {
+            auto src_field = (*src.column)[row];
+            auto dst_field = (*dst.column)[row];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=" << row;
+        }
+    }
+
+    // Next call should hit EOF
+    Block dst_block2;
+    read_rows = 0;
+    eof = false;
+    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(eof);
+    ASSERT_EQ(read_rows, 0);
+
+    // Best-effort cleanup of the temp file (not reached if an ASSERT_* above failed)
+    bool exists = false;
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Edge cases: a file written from an empty block, and a file holding a single row
+TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) {
+    auto fs = io::global_local_filesystem();
+
+    // 1. Empty native file (no data blocks)
+    {
+        UniqueId uid = UniqueId::gen_uid();
+        std::string uuid = uid.to_string();
+        std::string file_path = "./native_format_empty_" + uuid + ".native";
+
+        io::FileWriterPtr file_writer;
+        Status st = fs->create_file(file_path, &file_writer);
+        ASSERT_TRUE(st.ok()) << st;
+
+        RuntimeState state;
+        VExprContextSPtrs exprs;
+        VNativeTransformer transformer(&state, file_writer.get(), exprs, 
false);
+        st = transformer.open();
+        ASSERT_TRUE(st.ok()) << st;
+
+        // Write an empty block, should not produce any data block in file.
+        Block empty_block;
+        st = transformer.write(empty_block);
+        ASSERT_TRUE(st.ok()) << st;
+        st = transformer.close();
+        ASSERT_TRUE(st.ok()) << st;
+
+        // Read back: should directly hit EOF with 0 rows.
+        TFileScanRangeParams scan_params;
+        scan_params.__set_file_type(TFileType::FILE_LOCAL);
+        TFileRangeDesc scan_range;
+        scan_range.__set_path(file_path);
+        scan_range.__set_file_type(TFileType::FILE_LOCAL);
+        NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(0U, read_rows);
+        ASSERT_TRUE(eof);
+
+        bool exists = false; // best-effort cleanup of the temp file
+        st = fs->exists(file_path, &exists);
+        if (st.ok() && exists) {
+            static_cast<void>(fs->delete_file(file_path));
+        }
+    }
+
+    // 2. Single-row native file
+    {
+        UniqueId uid = UniqueId::gen_uid();
+        std::string uuid = uid.to_string();
+        std::string file_path = "./native_format_single_row_" + uuid + 
".native";
+
+        io::FileWriterPtr file_writer;
+        Status st = fs->create_file(file_path, &file_writer);
+        ASSERT_TRUE(st.ok()) << st;
+
+        RuntimeState state;
+        VExprContextSPtrs exprs;
+        VNativeTransformer transformer(&state, file_writer.get(), exprs, 
false);
+        st = transformer.open();
+        ASSERT_TRUE(st.ok()) << st;
+
+        Block src_block = create_test_block(1);
+        st = transformer.write(src_block);
+        ASSERT_TRUE(st.ok()) << st;
+        st = transformer.close();
+        ASSERT_TRUE(st.ok()) << st;
+
+        TFileScanRangeParams scan_params;
+        scan_params.__set_file_type(TFileType::FILE_LOCAL);
+        TFileRangeDesc scan_range;
+        scan_range.__set_path(file_path);
+        scan_range.__set_file_type(TFileType::FILE_LOCAL);
+        NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(src_block.rows(), read_rows);
+        ASSERT_EQ(src_block.columns(), dst_block.columns());
+        ASSERT_FALSE(eof); // eof is expected only from the next call
+
+        // Verify data equality for the single row.
+        for (size_t col = 0; col < src_block.columns(); ++col) {
+            const auto& src = src_block.get_by_position(col);
+            const auto& dst = dst_block.get_by_position(col);
+            ASSERT_EQ(src.type->get_family_name(), 
dst.type->get_family_name());
+            ASSERT_EQ(src.column->size(), dst.column->size());
+            auto src_field = (*src.column)[0];
+            auto dst_field = (*dst.column)[0];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=0";
+        }
+
+        // Next call should hit EOF
+        Block dst_block2;
+        read_rows = 0;
+        eof = false;
+        st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_TRUE(eof);
+        ASSERT_EQ(read_rows, 0U);
+
+        bool exists = false; // best-effort cleanup of the temp file
+        st = fs->exists(file_path, &exists);
+        if (st.ok() && exists) {
+            static_cast<void>(fs->delete_file(file_path));
+        }
+    }
+}
+
+// Large volume test: round-trip many rows written as a sequence of small blocks.
+TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) {
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_large_" + uuid + ".native";
+
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Use a relatively large number of rows to test stability and performance 
characteristics.
+    const size_t kRows = 9999; // not a multiple of kBatchRows, so the final batch is short
+    Block src_block = create_test_block(kRows);
+
+    // Split into multiple blocks and write them one by one to exercise 
multi-block path.
+    const size_t kBatchRows = 128;
+    for (size_t offset = 0; offset < kRows; offset += kBatchRows) {
+        size_t len = std::min(kBatchRows, kRows - offset);
+        // Create a sub block with the same schema and a row range [offset, 
offset+len)
+        Block sub_block = src_block.clone_empty();
+        for (size_t col = 0; col < src_block.columns(); ++col) {
+            const auto& src_col = *src_block.get_by_position(col).column;
+            const auto& dst_col_holder = 
*sub_block.get_by_position(col).column;
+            auto dst_mutable = dst_col_holder.assume_mutable();
+            dst_mutable->insert_range_from(src_col, offset, len);
+        }
+        st = transformer.write(sub_block);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+    // Read back in multiple blocks and merge into a single result block.
+    Block merged_block;
+    bool first_block = true;
+    size_t total_read_rows = 0;
+    while (true) {
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        if (read_rows > 0) {
+            if (first_block) {
+                merged_block = dst_block; // the first non-empty block seeds the merged result
+                total_read_rows = read_rows;
+                first_block = false;
+            } else {
+                MutableBlock merged_mutable(&merged_block);
+                Status add_st = merged_mutable.add_rows(&dst_block, 0, 
read_rows);
+                ASSERT_TRUE(add_st.ok()) << add_st;
+                total_read_rows += read_rows;
+            }
+        }
+        if (eof) {
+            break;
+        }
+        if (read_rows == 0) { // no rows and no eof: stop rather than loop forever
+            break;
+        }
+    }
+
+    ASSERT_EQ(src_block.rows(), total_read_rows);
+    ASSERT_EQ(src_block.columns(), merged_block.columns());
+
+    // Compare column-wise values, sampling every 10th row
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = merged_block.get_by_position(col);
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+        ASSERT_EQ(src.column->size(), dst.column->size());
+        for (size_t row = 0; row < src_block.rows(); row += 10) {
+            auto src_field = (*src.column)[row];
+            auto dst_field = (*dst.column)[row];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=" << row;
+        }
+    }
+    bool exists = false; // best-effort cleanup of the temp file
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Verify that NativeReader forces all columns to nullable, even if the writer
+// serialized non-nullable columns.
+TEST_F(NativeReaderWriterTest, non_nullable_columns_forced_nullable) {
+    auto fs = io::global_local_filesystem();
+
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_non_nullable_" + uuid + ".native";
+
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    // Use default compression type.
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Build a block with two non-nullable columns (INT and VARCHAR), 8 rows each.
+    Block src_block;
+    DataTypePtr int_type =
+            
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
+    {
+        MutableColumnPtr col = int_type->create_column();
+        for (int i = 0; i < 8; ++i) { // values 0, 3, 6, ..., 21
+            auto field = Field::create_field<PrimitiveType::TYPE_INT>(i * 3);
+            col->insert(field);
+        }
+        src_block.insert(ColumnWithTypeAndName(std::move(col), int_type, 
"int_nn"));
+    }
+
+    DataTypePtr str_type =
+            
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARCHAR, 
false);
+    {
+        MutableColumnPtr col = str_type->create_column();
+        for (int i = 0; i < 8; ++i) { // values "v0" .. "v7"
+            std::string v = "v" + std::to_string(i);
+            auto field = Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+            col->insert(field);
+        }
+        src_block.insert(ColumnWithTypeAndName(std::move(col), str_type, 
"str_nn"));
+    }
+
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Read back via NativeReader.
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(src_block.rows(), read_rows);
+    ASSERT_FALSE(eof);
+
+    // All columns returned by NativeReader should be nullable.
+    for (size_t col = 0; col < dst_block.columns(); ++col) {
+        const auto& dst = dst_block.get_by_position(col);
+        ASSERT_TRUE(dst.type->is_nullable()) << "column " << col << " should 
be nullable";
+    }
+
+    // Values should be preserved: compare every row of every source column as a Field.
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+        ASSERT_EQ(src.column->size(), dst.column->size());
+        for (size_t row = 0; row < src_block.rows(); ++row) {
+            auto src_field = (*src.column)[row];
+            auto dst_field = (*dst.column)[row];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=" << row;
+        }
+    }
+
+    bool exists = false; // best-effort cleanup of the temp file
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Verify that VNativeTransformer writes the native file header and that
+// NativeReader can transparently read files with this header.
+TEST_F(NativeReaderWriterTest, 
transformer_writes_header_and_reader_handles_it) {
+    auto fs = io::global_local_filesystem();
+
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_with_header_" + uuid + ".native";
+
+    // Write a small block via VNativeTransformer.
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_test_block(4);
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Read back raw bytes and verify the header: magic bytes followed by a uint32 version.
+    {
+        io::FileReaderSPtr file_reader;
+        st = fs->open_file(file_path, &file_reader);
+        ASSERT_TRUE(st.ok()) << st;
+        size_t file_size = file_reader->size();
+        ASSERT_GE(file_size, sizeof(uint64_t) + 12); // 12 = header buffer size below; the uint64_t is presumably a block length prefix - TODO confirm
+
+        char header[12]; // NOTE(review): presumably sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t) - confirm
+        Slice header_slice(header, sizeof(header));
+        size_t bytes_read = 0;
+        st = file_reader->read_at(0, header_slice, &bytes_read);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(sizeof(header), bytes_read);
+
+        ASSERT_EQ(0, memcmp(header, DORIS_NATIVE_MAGIC, 
sizeof(DORIS_NATIVE_MAGIC)));
+        uint32_t version = 0;
+        memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), 
sizeof(uint32_t));
+        ASSERT_EQ(DORIS_NATIVE_FORMAT_VERSION, version);
+    }
+
+    // Now read via NativeReader; it should detect the header and skip it.
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(src_block.rows(), read_rows); // only the row count is checked here, not the cell values
+    ASSERT_FALSE(eof);
+
+    bool exists = false; // best-effort cleanup of the temp file
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Verify get_columns and get_parsed_schema can probe schema without scanning
+// the whole file.
+TEST_F(NativeReaderWriterTest, get_columns_and_parsed_schema) {
+    auto fs = io::global_local_filesystem();
+
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_schema_" + uuid + ".native";
+
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_test_block(5);
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    TFileScanRangeParams scan_params; // describe the file just written as a local-file scan range
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, 
&state);
+
+    std::unordered_map<std::string, DataTypePtr> name_to_type;
+    std::unordered_set<std::string> missing_cols;
+    st = reader_impl.get_columns(&name_to_type, &missing_cols); // called before any get_next_block
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(missing_cols.empty());
+
+    // All columns from src_block should appear in name_to_type, each with a nullable type.
+    for (size_t i = 0; i < src_block.columns(); ++i) {
+        const auto& col = src_block.get_by_position(i);
+        auto it = name_to_type.find(col.name);
+        ASSERT_TRUE(it != name_to_type.end()) << "missing column " << col.name;
+        ASSERT_TRUE(it->second->is_nullable()) << "schema type should be 
nullable for " << col.name;
+    }
+
+    std::vector<std::string> col_names;
+    std::vector<DataTypePtr> col_types;
+    st = reader_impl.get_parsed_schema(&col_names, &col_types);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(col_names.size(), col_types.size());
+    ASSERT_EQ(col_names.size(), src_block.columns());
+
+    for (size_t i = 0; i < col_names.size(); ++i) { // every parsed schema type must be nullable
+        ASSERT_TRUE(col_types[i]->is_nullable());
+    }
+
+    bool exists = false; // best-effort cleanup of the temp file
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Create a test block containing all known primitive and complex types with a 
single row.
+// This function covers:
+// - Basic integers: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT
+// - Floating point: FLOAT, DOUBLE
+// - Date/Time: DATE, DATETIME, DATEV2, DATETIMEV2, TIMEV2
+// - String types: CHAR, VARCHAR, STRING
+// - Decimal types: DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128I, DECIMAL256
+// - IP types: IPV4, IPV6
+// - JSON: JSONB
+// - Complex types: ARRAY, MAP, STRUCT, VARIANT
+static Block create_all_types_test_block() {
+    Block block;
+
+    // 1. BOOLEAN
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, 
false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(Field::create_field<PrimitiveType::TYPE_BOOLEAN>(UInt8 
{1})); // true
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_boolean"));
+    }
+
+    // 2. TINYINT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_TINYINT, 
false));
+        MutableColumnPtr col = type->create_column();
+        
col->insert(Field::create_field<PrimitiveType::TYPE_TINYINT>(static_cast<int8_t>(42)));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_tinyint"));
+    }
+
+    // 3. SMALLINT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, 
false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(
+                
Field::create_field<PrimitiveType::TYPE_SMALLINT>(static_cast<int16_t>(1234)));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_smallint"));
+    }
+
+    // 4. INT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+        MutableColumnPtr col = type->create_column();
+        
col->insert(Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(123456)));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_int"));
+    }
+
+    // 5. BIGINT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BIGINT, false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(Field::create_field<PrimitiveType::TYPE_BIGINT>(
+                static_cast<int64_t>(9876543210LL)));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_bigint"));
+    }
+
+    // 6. LARGEINT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, 
false));
+        MutableColumnPtr col = type->create_column();
+        Int128 large_val = Int128(123456789012345LL) * 1000000;
+        
col->insert(Field::create_field<PrimitiveType::TYPE_LARGEINT>(large_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_largeint"));
+    }
+
+    // 7. FLOAT
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_FLOAT, false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(Field::create_field<PrimitiveType::TYPE_FLOAT>(3.14F));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_float"));
+    }
+
+    // 8. DOUBLE
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, false));
+        MutableColumnPtr col = type->create_column();
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DOUBLE>(1.234567890123456789));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_double"));
+    }
+
+    // 9. DATE (DateV1)
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATE, false));
+        MutableColumnPtr col = type->create_column();
+        VecDateTimeValue dt;
+        dt.from_date_int64(20231215); // 2023-12-15
+        col->insert(Field::create_field<PrimitiveType::TYPE_DATE>(dt));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_date"));
+    }
+
+    // 10. DATETIME (DateTimeV1)
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATETIME, 
false));
+        MutableColumnPtr col = type->create_column();
+        VecDateTimeValue dt;
+        dt.from_date_int64(20231215103045LL); // 2023-12-15 10:30:45
+        col->insert(Field::create_field<PrimitiveType::TYPE_DATETIME>(dt));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_datetime"));
+    }
+
+    // 11. DATEV2
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATEV2, false));
+        MutableColumnPtr col = type->create_column();
+        DateV2Value<DateV2ValueType> dv2;
+        dv2.from_date_int64(20231215);
+        col->insert(Field::create_field<PrimitiveType::TYPE_DATEV2>(dv2));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_datev2"));
+    }
+
+    // 12. DATETIMEV2 (scale=0)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, 
false, 0, 0));
+        MutableColumnPtr col = type->create_column();
+        DateV2Value<DateTimeV2ValueType> dtv2;
+        dtv2.from_date_int64(20231215103045LL);
+        col->insert(Field::create_field<PrimitiveType::TYPE_DATETIMEV2>(dtv2));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_datetimev2"));
+    }
+
+    // 13. TIMEV2 (scale=0)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, 
false, 0, 0));
+        MutableColumnPtr col = type->create_column();
+        // TIMEV2 is stored as Float64 representing seconds
+        col->insert(Field::create_field<PrimitiveType::TYPE_TIMEV2>(
+                37845.0)); // 10:30:45 in seconds
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_timev2"));
+    }
+
+    // 14. CHAR
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_CHAR, false, 
0, 0, 20));
+        MutableColumnPtr col = type->create_column();
+        
col->insert(Field::create_field<PrimitiveType::TYPE_CHAR>(std::string("fixed_char_val")));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_char"));
+    }
+
+    // 15. VARCHAR
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(
+                
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("variable_string")));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_varchar"));
+    }
+
+    // 16. STRING
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_STRING, false));
+        MutableColumnPtr col = type->create_column();
+        col->insert(Field::create_field<PrimitiveType::TYPE_STRING>(
+                std::string("long_text_content_here")));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_string"));
+    }
+
+    // 17. DECIMALV2 (precision=27, scale=9)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DECIMALV2, 
false, 27, 9));
+        MutableColumnPtr col = type->create_column();
+        DecimalV2Value dec_val(123456789, 123456789);
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMALV2>(dec_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_decimalv2"));
+    }
+
+    // 18. DECIMAL32 (precision=9, scale=2)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DECIMAL32, 
false, 9, 2));
+        MutableColumnPtr col = type->create_column();
+        Decimal32 dec_val(static_cast<Int32>(12345678));
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL32>(dec_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_decimal32"));
+    }
+
+    // 19. DECIMAL64 (precision=18, scale=4)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DECIMAL64, 
false, 18, 4));
+        MutableColumnPtr col = type->create_column();
+        Decimal64 dec_val(static_cast<Int64>(123456789012345678LL));
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL64>(dec_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_decimal64"));
+    }
+
+    // 20. DECIMAL128I (precision=38, scale=6)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, 
false, 38, 6));
+        MutableColumnPtr col = type->create_column();
+        Decimal128V3 dec_val(static_cast<Int128>(123456789012345678LL) * 100);
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL128I>(dec_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_decimal128"));
+    }
+
+    // 21. DECIMAL256 (precision=76, scale=10)
+    {
+        DataTypePtr type = make_nullable(
+                DataTypeFactory::instance().create_data_type(TYPE_DECIMAL256, 
false, 76, 10));
+        MutableColumnPtr col = type->create_column();
+        wide::Int256 wide_val = wide::Int256(123456789012345678LL) * 
10000000000LL;
+        Decimal256 dec_val(wide_val);
+        
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL256>(dec_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, 
"col_decimal256"));
+    }
+
+    // 22. IPV4
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV4, false));
+        MutableColumnPtr col = type->create_column();
+        IPv4 ip_val = (192U << 24) | (168U << 16) | (1U << 8) | 100U; // 
192.168.1.100
+        col->insert(Field::create_field<PrimitiveType::TYPE_IPV4>(ip_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv4"));
+    }
+
+    // 23. IPV6
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV6, false));
+        MutableColumnPtr col = type->create_column();
+        // ::ffff:192.168.1.100 in IPv6
+        IPv6 ip6_val = 0;
+        ip6_val = (static_cast<IPv6>(0xFFFF) << 32) |
+                  ((192U << 24) | (168U << 16) | (1U << 8) | 100U);
+        col->insert(Field::create_field<PrimitiveType::TYPE_IPV6>(ip6_val));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv6"));
+    }
+
+    // 24. JSONB
+    {
+        DataTypePtr type =
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_JSONB, false));
+        MutableColumnPtr col = type->create_column();
+        // Use JsonbWriter to create JSONB binary data
+        JsonbWriter writer;
+        writer.writeStartObject();
+        writer.writeKey("key", static_cast<uint8_t>(3));
+        writer.writeStartString();
+        writer.writeString("value");
+        writer.writeEndString();
+        writer.writeKey("num", static_cast<uint8_t>(3));
+        writer.writeInt32(123);
+        writer.writeEndObject();
+        const char* jsonb_data = writer.getOutput()->getBuffer();
+        size_t jsonb_size = writer.getOutput()->getSize();
+        JsonbField field(jsonb_data, jsonb_size);
+        
col->insert(Field::create_field<PrimitiveType::TYPE_JSONB>(std::move(field)));
+        block.insert(ColumnWithTypeAndName(std::move(col), type, "col_jsonb"));
+    }
+
+    // 25. ARRAY<INT>
+    {
+        DataTypePtr arr_nested_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+        DataTypePtr arr_type = 
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+        MutableColumnPtr col = arr_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.push_back(0); // non-null
+
+        auto& array_col = assert_cast<ColumnArray&>(nested);
+        auto& offsets = array_col.get_offsets();
+        auto& data = array_col.get_data();
+        auto mutable_data = data.assume_mutable();
+
+        mutable_data->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(10)));
+        mutable_data->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(20)));
+        mutable_data->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(30)));
+        offsets.push_back(3); // 3 elements
+        block.insert(ColumnWithTypeAndName(std::move(col), arr_type, 
"col_array"));
+    }
+
+    // 26. MAP<STRING, INT>
+    {
+        DataTypePtr map_key_type =
+                DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false);
+        DataTypePtr map_value_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+        DataTypePtr map_type =
+                make_nullable(std::make_shared<DataTypeMap>(map_key_type, 
map_value_type));
+        MutableColumnPtr col = map_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.push_back(0); // non-null
+
+        auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+        auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+        auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+        auto mutable_keys = keys.assume_mutable();
+        auto mutable_values = values.assume_mutable();
+
+        
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key1")));
+        mutable_values->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(100)));
+        
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key2")));
+        mutable_values->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(200)));
+
+        offsets.push_back(2); // 2 key-value pairs
+        block.insert(ColumnWithTypeAndName(std::move(col), map_type, 
"col_map"));
+    }
+
+    // 27. STRUCT<si:INT, ss:VARCHAR>
+    {
+        DataTypes struct_fields;
+        struct_fields.emplace_back(
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+        struct_fields.emplace_back(
+                
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false)));
+        Strings field_names = {"si", "ss"};
+        DataTypePtr struct_type =
+                make_nullable(std::make_shared<DataTypeStruct>(struct_fields, 
field_names));
+        MutableColumnPtr col = struct_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.push_back(0); // non-null
+
+        auto& struct_col = assert_cast<ColumnStruct&>(nested);
+        const auto& fields = struct_col.get_columns();
+        auto mutable_field0 = fields[0]->assume_mutable();
+        auto mutable_field1 = fields[1]->assume_mutable();
+
+        mutable_field0->insert(
+                
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(999)));
+        mutable_field1->insert(
+                
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("struct_val")));
+        block.insert(ColumnWithTypeAndName(std::move(col), struct_type, 
"col_struct"));
+    }
+
+    // 28. VARIANT (JSON object)
+    {
+        DataTypePtr variant_type = 
make_nullable(std::make_shared<DataTypeVariant>());
+        MutableColumnPtr col = variant_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+
+        std::string json_str = R"({"name":"test","value":12345})";
+        std::vector<Slice> slices = {Slice(json_str.data(), json_str.size())};
+
+        auto variant_type_inner = std::make_shared<DataTypeVariant>();
+        auto serde = variant_type_inner->get_serde();
+        auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
+
+        uint64_t num_deserialized = 0;
+        DataTypeSerDe::FormatOptions options;
+        Status st = variant_serde->deserialize_column_from_json_vector(nested, 
slices,
+                                                                       
&num_deserialized, options);
+        EXPECT_TRUE(st.ok()) << st;
+        EXPECT_EQ(1U, num_deserialized);
+
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.clear();
+        null_map.resize_fill(1, 0);
+
+        block.insert(ColumnWithTypeAndName(std::move(col), variant_type, 
"col_variant"));
+    }
+
+    return block;
+}
+
+// Pre-generated native file path for all types test.
+// The file is stored in: be/test/data/vec/native/all_types_single_row.native
+//
+// The repository root is read from the ROOT environment variable. getenv()
+// returns nullptr when the variable is unset, and constructing a std::string
+// from nullptr is undefined behavior, so fall back to the current directory
+// in that case. A wrong/missing path then surfaces as a normal test failure in
+// the test that opens the file instead of a crash here.
+static std::string get_all_types_native_file_path() {
+    const char* root_env = getenv("ROOT");
+    const std::string root_dir = (root_env != nullptr) ? root_env : ".";
+    return root_dir + "/be/test/data/vec/native/all_types_single_row.native";
+}
+
+// Generator test: Generate native file with all types (DISABLED by default).
+// The DISABLED_ prefix keeps GoogleTest from running it in regular UT runs, so it
+// does not write a stray file into the working directory on every run.
+// Run this test manually to regenerate the test data file:
+//   ./run-be-ut.sh --run --filter=*DISABLED_generate_all_types_native_file*
+// NOTE: GoogleTest only executes DISABLED_ tests when --gtest_also_run_disabled_tests
+// is set; pass it through if the runner script does not already do so.
+// Then copy the generated file to: be/test/data/vec/native/all_types_single_row.native
+TEST_F(NativeReaderWriterTest, DISABLED_generate_all_types_native_file) {
+    // Output to current directory, user needs to copy it to test data dir
+    std::string file_path = "./all_types_single_row.native";
+
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // The generated file must match the block that the reader test rebuilds and
+    // compares against: one row, 28 columns.
+    Block src_block = create_all_types_test_block();
+    ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row";
+    ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types";
+
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    std::cout << "Generated native file: " << file_path << std::endl;
+    std::cout << "Please copy it to: be/test/data/vec/native/all_types_single_row.native"
+              << std::endl;
+}
+
+// Test: read pre-generated native file with all types and verify data.
+// The file on disk is compared against a freshly built block from
+// create_all_types_test_block(), so the two must be kept in sync.
+TEST_F(NativeReaderWriterTest, read_all_types_from_pregenerated_file) {
+    std::string file_path = get_all_types_native_file_path();
+
+    auto fs = io::global_local_filesystem();
+    bool exists = false;
+    Status st = fs->exists(file_path, &exists);
+    // Use gtest assertions instead of DCHECK: DCHECK is compiled out in release
+    // builds (the test would continue with a missing file) and aborts the whole
+    // test binary in debug builds instead of failing just this test.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(exists) << "Pre-generated native file not found: " << file_path
+                        << ". Run generate_all_types_native_file to generate it.";
+
+    RuntimeState state;
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(1U, read_rows) << "Should read exactly 1 row";
+    ASSERT_EQ(28U, dst_block.columns()) << "Should have 28 columns for all types";
+    ASSERT_FALSE(eof);
+
+    // Regenerate expected block and compare
+    Block expected_block = create_all_types_test_block();
+    // Guard the positional lookups below: both blocks are indexed by the same col.
+    ASSERT_EQ(expected_block.columns(), dst_block.columns());
+
+    // Verify data equality for the single row
+    for (size_t col = 0; col < expected_block.columns(); ++col) {
+        const auto& src = expected_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+
+        // Type family should match
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
+                << "Type mismatch at col=" << col << " (" << src.name << ")";
+        ASSERT_EQ(src.column->size(), dst.column->size())
+                << "Size mismatch at col=" << col << " (" << src.name << ")";
+
+        // Compare field values
+        auto src_field = (*src.column)[0];
+        auto dst_field = (*dst.column)[0];
+        ASSERT_EQ(src_field, dst_field)
+                << "Value mismatch at col=" << col << " (" << src.name << ")";
+    }
+
+    // Next call should hit EOF
+    Block dst_block2;
+    read_rows = 0;
+    eof = false;
+    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(eof);
+    ASSERT_EQ(read_rows, 0U);
+}
+
+// Test: round-trip all known types with a single row (generates temp file each time).
+// Writes the block from create_all_types_test_block() through VNativeTransformer,
+// reads it back with NativeReader and compares every column's first field.
+TEST_F(NativeReaderWriterTest, round_trip_all_types_single_row) {
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_all_types_" + uuid + ".native";
+
+    // Remove the temp file when the test scope ends. ASSERT_* returns from the
+    // test body on failure, so cleanup placed at the end of the body would be
+    // skipped and leave the file behind in the working directory.
+    struct TempFileCleanup {
+        std::string path;
+        ~TempFileCleanup() {
+            auto local_fs = io::global_local_filesystem();
+            bool exists = false;
+            Status exists_st = local_fs->exists(path, &exists);
+            if (exists_st.ok() && exists) {
+                static_cast<void>(local_fs->delete_file(path));
+            }
+        }
+    } temp_file_cleanup {file_path};
+
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_all_types_test_block();
+    ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row";
+    ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types";
+
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Read back via NativeReader
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(src_block.rows(), read_rows);
+    ASSERT_EQ(src_block.columns(), dst_block.columns());
+    ASSERT_FALSE(eof);
+
+    // Verify data equality for the single row
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+
+        // Type family should match
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
+                << "Type mismatch at col=" << col << " (" << src.name << ")";
+        ASSERT_EQ(src.column->size(), dst.column->size())
+                << "Size mismatch at col=" << col << " (" << src.name << ")";
+
+        // Compare field values
+        auto src_field = (*src.column)[0];
+        auto dst_field = (*dst.column)[0];
+        ASSERT_EQ(src_field, dst_field)
+                << "Value mismatch at col=" << col << " (" << src.name << ")";
+    }
+
+    // Next call should hit EOF
+    Block dst_block2;
+    read_rows = 0;
+    eof = false;
+    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(eof);
+    ASSERT_EQ(read_rows, 0U);
+
+    // The temp file is removed by temp_file_cleanup when this scope exits.
+}
+
+} // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 62345cd811d..774ee4e6e83 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -30,6 +30,7 @@ public class FileFormatConstants {
     public static final String FORMAT_AVRO = "avro";
     public static final String FORMAT_WAL = "wal";
     public static final String FORMAT_ARROW = "arrow";
+    public static final String FORMAT_NATIVE = "native";
 
     public static final String PROP_FORMAT = "format";
     public static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index b95f35ffb60..ae75e4dd68f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -576,6 +576,9 @@ public class Util {
             return TFileFormatType.FORMAT_WAL;
         } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) {
             return TFileFormatType.FORMAT_ARROW;
+        } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_NATIVE)) {
+            // Doris Native binary columnar format
+            return TFileFormatType.FORMAT_NATIVE;
         } else {
             return TFileFormatType.FORMAT_UNKNOWN;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index 9706982c1ba..ac2512d88b5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -39,6 +39,7 @@ public abstract class FileFormatProperties {
     public static final String FORMAT_AVRO = "avro";
     public static final String FORMAT_WAL = "wal";
     public static final String FORMAT_ARROW = "arrow";
+    public static final String FORMAT_NATIVE = "native";
     public static final String PROP_COMPRESS_TYPE = "compress_type";
 
     protected String formatName;
@@ -102,6 +103,8 @@ public abstract class FileFormatProperties {
                 return new WalFileFormatProperties();
             case FORMAT_ARROW:
                 return new ArrowFileFormatProperties();
+            case FORMAT_NATIVE:
+                return new NativeFileFormatProperties();
             default:
                 throw new AnalysisException("format:" + formatString + " is 
not supported.");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
new file mode 100644
index 00000000000..fe4306906d4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+/**
+ * {@link FileFormatProperties} implementation for the Doris Native binary columnar format,
+ * which is intended for high-performance internal data exchange.
+ *
+ * <p>The format currently defines no user-visible properties of its own.
+ */
+public class NativeFileFormatProperties extends FileFormatProperties {
+
+    public NativeFileFormatProperties() {
+        super(TFileFormatType.FORMAT_NATIVE, FileFormatProperties.FORMAT_NATIVE);
+    }
+
+    /**
+     * No-op: native format has no extra properties to parse, so every entry in
+     * {@code formatProperties} is ignored.
+     */
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties,
+            boolean isRemoveOriginProperty) throws AnalysisException {
+        // Intentionally empty: nothing to analyze for native format.
+    }
+
+    /**
+     * No-op: native format does not need any additional sink options.
+     */
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Builds the file attributes sent for this format.
+     *
+     * @return attributes carrying an empty text-params object
+     */
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        // Native format has no text params of its own, but TFileAttributes is
+        // expected to carry a non-null text_params on the BE side, so attach an
+        // empty one.
+        TFileAttributes attributes = new TFileAttributes();
+        attributes.setTextParams(new TFileTextScanRangeParams());
+        return attributes;
+    }
+}
+
+
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
index 52f5b3713a8..4ec74e001d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
@@ -368,7 +368,10 @@ public class NereidsLoadScanProvider {
                 }
             } else {
                 Column slotColumn;
-                if (fileGroup.getFileFormatProperties().getFileFormatType() == 
TFileFormatType.FORMAT_ARROW) {
+                TFileFormatType fileFormatType = 
fileGroup.getFileFormatProperties().getFileFormatType();
+                // Use real column type for arrow/native format, other formats 
read as varchar first
+                if (fileFormatType == TFileFormatType.FORMAT_ARROW
+                        || fileFormatType == TFileFormatType.FORMAT_NATIVE) {
                     slotColumn = new Column(realColName, 
colToType.get(realColName), true);
                 } else {
                     if (fileGroupInfo.getUniqueKeyUpdateMode() == 
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e08eaa2c825..3dc8468e846 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
@@ -359,7 +360,8 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
      * @return column type and the number of parsed PTypeNodes
      */
     private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int 
start) {
-        PScalarType columnType = typeNodes.get(start).getScalarType();
+        PTypeNode typeNode = typeNodes.get(start);
+        PScalarType columnType = typeNode.getScalarType();
         TPrimitiveType tPrimitiveType = 
TPrimitiveType.findByValue(columnType.getType());
         Type type;
         int parsedNodes;
@@ -391,6 +393,16 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
                 parsedNodes += fieldType.value();
             }
             type = new StructType(fields);
+        } else if (tPrimitiveType == TPrimitiveType.VARIANT) {
+            // Preserve VARIANT-specific properties from PTypeNode, especially 
variant_max_subcolumns_count.
+            int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount();
+            // Currently no predefined fields are carried in PTypeNode for 
VARIANT, so use empty list and default
+            // values for other properties.
+            type = new VariantType(new ArrayList<>(), maxSubcolumns,
+                    /*enableTypedPathsToSparse*/ false,
+                    /*variantMaxSparseColumnStatisticsSize*/ 10000,
+                    /*variantSparseHashShardCount*/ 0);
+            parsedNodes = 1;
         } else {
             type = 
ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
                     columnType.getLen(), columnType.getPrecision(), 
columnType.getScale());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0f9ccbf8b01..4435423f5c5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -108,7 +108,8 @@ enum TFileFormatType {
     FORMAT_CSV_SNAPPYBLOCK = 14,
     FORMAT_WAL = 15,
     FORMAT_ARROW = 16,
-    FORMAT_TEXT = 17
+    FORMAT_TEXT = 17,
+    FORMAT_NATIVE = 18
 }
 
 // In previous versions, the data compression format and file format were 
stored together, as TFileFormatType,
diff --git 
a/regression-test/data/export_p0/outfile/native/test_outfile_native.out 
b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
new file mode 100644
index 00000000000..fc60340a7e1
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_default --
+1      2024-01-01      2024-01-01T00:00        s1      1       1       true    
1.1
+2      2024-01-01      2024-01-01T00:00        s2      2       2       true    
2.2
+3      2024-01-01      2024-01-01T00:00        s3      3       3       true    
3.3
+4      2024-01-01      2024-01-01T00:00        s4      4       4       true    
4.4
+5      2024-01-01      2024-01-01T00:00        s5      5       5       true    
5.5
+6      2024-01-01      2024-01-01T00:00        s6      6       6       true    
6.6
+7      2024-01-01      2024-01-01T00:00        s7      7       7       true    
7.7
+8      2024-01-01      2024-01-01T00:00        s8      8       8       true    
8.800000000000001
+9      2024-01-01      2024-01-01T00:00        s9      9       9       true    
9.9
+10     2024-01-01      2024-01-01T00:00        s10     10      10      true    
10.1
+
+-- !select_s3_native --
+1      2024-01-01      2024-01-01T00:00        s1      1       1       true    
1.1
+2      2024-01-01      2024-01-01T00:00        s2      2       2       true    
2.2
+3      2024-01-01      2024-01-01T00:00        s3      3       3       true    
3.3
+4      2024-01-01      2024-01-01T00:00        s4      4       4       true    
4.4
+5      2024-01-01      2024-01-01T00:00        s5      5       5       true    
5.5
+6      2024-01-01      2024-01-01T00:00        s6      6       6       true    
6.6
+7      2024-01-01      2024-01-01T00:00        s7      7       7       true    
7.7
+8      2024-01-01      2024-01-01T00:00        s8      8       8       true    
8.800000000000001
+9      2024-01-01      2024-01-01T00:00        s9      9       9       true    
9.9
+10     2024-01-01      2024-01-01T00:00        s10     10      10      true    
10.1
+
diff --git 
a/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy 
b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
new file mode 100644
index 00000000000..334fc809b88
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "outfile_native_test"
+    def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // Export helper: write to S3 and return the URL output by FE
+    def outfile_to_s3 = {
+        def res = sql """
+            SELECT * FROM ${tableName} t ORDER BY id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS native
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        return res[0][3]
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `id` INT NOT NULL,
+            `c_date` DATE NOT NULL,
+            `c_dt` DATETIME NOT NULL,
+            `c_str` VARCHAR(20),
+            `c_int` INT,
+            `c_tinyint` TINYINT,
+            `c_bool` boolean,
+            `c_double` double
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+
+        // Insert 10 rows of test data (the last row is all NULL)
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10000; i ++) {
+            sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i 
% 128}, true, ${i}.${i}),
+            """)
+        }
+        sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, 
NULL, NULL)
+            """)
+        sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """
+
+        // baseline: local table query result
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY id limit 
10; """
+
+        // 1) 导出为 Native 文件到 S3
+        def outfileUrl = outfile_to_s3()
+
+        // 2) 从 S3 使用 S3 TVF (format=native) 查询回数据,并与 baseline 对比
+        // outfileUrl 形如:s3://bucket/outfile/native/exp_xxx_* ,需要去掉 
"s3://bucket" 前缀和末尾的 '*'
+        qt_select_s3_native """ SELECT * FROM S3 (
+                "uri" = "http://${bucket}.${s3_endpoint}${
+                        outfileUrl.substring(5 + bucket.length(), 
outfileUrl.length() - 1)
+                    }0.native",
+                "ACCESS_KEY"= "${ak}",
+                "SECRET_KEY" = "${sk}",
+                "format" = "native",
+                "region" = "${region}"
+            ) order by id limit 10;
+            """
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy 
b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
new file mode 100644
index 00000000000..911d821263e
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.io.File
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite("test_export_variant_10k_columns", "p0") {
+    // Round trip of a sparse, wide VARIANT table: stream load -> EXPORT to S3 -> S3 TVF count -> LOAD back -> spot check. Runs with Nereids on and planner fallback off.
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+    // S3 connection settings, obtained via the getS3* helpers.
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    // String bucket = context.config.otherConfigs.get("s3BucketName");
+    String bucket = getS3BucketName()
+
+    def table_export_name = "test_export_variant_10k"
+    def table_load_name = "test_load_variant_10k"
+    def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp"""
+    // Polls SHOW EXPORT for the given label: returns json.url[0][0] once FINISHED, throws IllegalStateException once CANCELLED.
+    def waiting_export = { export_label ->
+        while (true) { // NOTE(review): unbounded poll - a job that never reaches FINISHED/CANCELLED keeps this loop running forever
+            def res = sql """ show export where label = "${export_label}" """
+            logger.info("export state: " + res[0][2])
+            if (res[0][2] == "FINISHED") { // column 2 of the first result row is compared against the job state strings
+                def json = parseJson(res[0][11]) // column 11 is parsed as JSON; its url[0][0] entry is the value returned
+                assert json instanceof List
+                // assertEquals("1", json.fileNumber[0][0])
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
+            } else if (res[0][2] == "CANCELLED") {
+                throw new IllegalStateException("""export failed: 
${res[0][10]}""")
+            } else {
+                sleep(5000) // wait 5 s before polling again
+            }
+        }
+    }
+
+    // 1. Create the source table: an INT id plus a VARIANT column with variant_max_subcolumns_count = 2048
+    sql """ DROP TABLE IF EXISTS ${table_export_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${table_export_name} (
+        `id` INT NOT NULL,
+        `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL
+    )
+    DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    // 2. Generate data with 10000 keys in variant
+    // Generate num_rows (1000) rows, written as one JSON object per line into a temporary file.
+    // Key names are drawn from k0..k9999 (10,000 possible keys), but each row carries only 50 of them (sparse).
+    // This simulates a realistic sparse wide table scenario.
+
+    File dataFile = File.createTempFile("variant_10k_data", ".json")
+    dataFile.deleteOnExit()
+    int num_rows = 1000
+    try {
+        dataFile.withWriter { writer ->
+            StringBuilder sb = new StringBuilder()
+            for (int i = 1; i <= num_rows; i++) {
+                sb.setLength(0)
+                sb.append("{\"id\": ").append(i).append(", \"v\": {")
+                // Select 50 keys out of 10000 for each row
+                for (int k = 0; k < 50; k++) {
+                    if (k > 0) sb.append(", ")
+                    // Scatter the keys to ensure coverage of all 10000 
columns across rows
+                    int keyIdx = (i + k * 200) % 10000
+                    sb.append('"k').append(keyIdx).append('":').append(i) // every selected key maps to the row number i
+                }
+                sb.append("}}\n")
+                writer.write(sb.toString())
+            }
+        }
+
+        // 3. Stream Load the JSON-lines file into the export table and require that all num_rows rows are loaded
+        streamLoad {
+            table table_export_name
+            set 'format', 'json'
+            set 'read_json_by_line', 'true'
+            file dataFile.getAbsolutePath()
+            time 60000 // 60s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(num_rows, json.NumberTotalRows)
+                assertEquals(num_rows, json.NumberLoadedRows)
+            }
+        }
+    } finally {
+        dataFile.delete()
+    }
+
+    // def format = "parquet"
+    def format = "native"
+
+    // 4. Export to S3 in the format selected above (currently native, not Parquet)
+    def uuid = UUID.randomUUID().toString()
+    // def outFilePath = """/tmp/variant_10k_export"""
+    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def label = "label_${uuid}"
+
+    try {
+        sql """
+            EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/"
+            PROPERTIES(
+                "label" = "${label}",
+                "format" = "${format}"
+            )
+            WITH S3(
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${getS3Provider()}"
+            );
+        """
+        
+        long startExport = System.currentTimeMillis()
+        def outfile_url = waiting_export.call(label)
+        long endExport = System.currentTimeMillis()
+        logger.info("Export ${num_rows} rows with variant took ${endExport - 
startExport} ms")
+        
+        // 5. Validate by S3 TVF: the URI drops the first 5 + bucket.length() chars ("s3://" + bucket) and the last char (presumably a trailing '*' - confirm) of the export URL, then appends "0.<format>"
+        def s3_tvf_sql = """ s3(
+                "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}0.${format}",
+                "s3.access_key"= "${ak}",
+                "s3.secret_key" = "${sk}",
+                "format" = "${format}",
+                "provider" = "${getS3Provider()}",
+                "region" = "${region}"
+            ) """
+
+        // Verify that the exported file holds exactly num_rows rows
+        def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """
+        assertEquals(num_rows, res[0][0])
+
+        def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" 
= \"2048\")>"
+        if (new Random().nextInt(2) == 0) { // randomly reload into a text column instead of VARIANT; NOTE(review): makes runs non-deterministic
+            value_type = "text"
+        }
+        // 6. Load back into Doris (to a new table) to verify import 
performance/capability
+        sql """ DROP TABLE IF EXISTS ${table_load_name} """
+        sql """
+        CREATE TABLE ${table_load_name} (
+            `id` INT,
+            `v` ${value_type}
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+        
+        // Use LOAD LABEL. The label contains '-' from UUID, so it must be 
quoted with back-quotes.
+        def load_label = "load_${uuid}"
+        
+        sql """
+            LOAD LABEL `${load_label}`
+            (
+                DATA INFILE("s3://${outFilePath}/*")
+                INTO TABLE ${table_load_name}
+                FORMAT AS "${format}"
+                (id, v)
+            )
+            WITH S3 (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${getS3Provider()}"
+            );
+        """
+        // Poll SHOW LOAD every 5 s for at most 240 s; throw as soon as the job reports CANCELLED or FAILED.
+        Awaitility.await().atMost(240, SECONDS).pollInterval(5, 
SECONDS).until({
+            def loadResult = sql """
+           show load where label = '${load_label}'
+           """
+            if (loadResult.get(0).get(2) == 'CANCELLED' || 
loadResult.get(0).get(2) == 'FAILED') {
+                println("load failed: " + loadResult.get(0))
+                throw new RuntimeException("load failed"+ loadResult.get(0))
+            }
+            return loadResult.get(0).get(2) == 'FINISHED'
+        }) 
+        // Check that every exported row was loaded back
+        def load_count = sql """ SELECT count(*) FROM ${table_load_name} """
+        assertEquals(num_rows, load_count[0][0])
+
+        // Check variant data integrity (sample)
+        // Row 1 has keys: (1 + k*200) % 10000. For k=0, key is k1. Value is 1.
+        def check_v = sql """ SELECT cast(v['k1'] as int) FROM 
${table_load_name} WHERE id = 1 """
+        assertEquals(1, check_v[0][0])
+        // NOTE(review): the cleanup below is commented out, so both tables are left in place after the run - confirm this is intended.
+    } finally {
+        // try_sql("DROP TABLE IF EXISTS ${table_export_name}")
+        // try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to