This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new b8a40bd22a1 [feature](be) Support parquet repeated level assembly
b8a40bd22a1 is described below

commit b8a40bd22a1bc56ddce95b11795f05c81234e077
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 15:48:59 2026 +0800

    [feature](be) Support parquet repeated level assembly
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Add shared LIST/MAP repetition/definition level assembly for scalar
    nested Parquet children. It handles null parent rows, empty collections, nullable
    element/value slots, overflow buffering (levels read past the requested row count are
    kept for the next read), and parent-row skip/select semantics.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Manual test
        - Ran `git diff --check`. The BE DEBUG build will be run on Fedora after push.
        - Added unit test cases in new_parquet/parquet_column_reader_test.cpp.
    - Behavior changed: Yes. The new Parquet reader can now read LIST/MAP columns with
      scalar children, including null and empty collections and nullable children.
    - Does this need documentation: No
---
 be/src/format/new_parquet/column_reader.cpp        | 689 ++++++++++++++++-----
 .../new_parquet/parquet_column_reader_test.cpp     | 322 ++++++++++
 2 files changed, 854 insertions(+), 157 deletions(-)

diff --git a/be/src/format/new_parquet/column_reader.cpp 
b/be/src/format/new_parquet/column_reader.cpp
index 4d4545f5d80..b4cc00caa98 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -33,6 +33,7 @@
 #include "core/column/column.h"
 #include "core/column/column_array.h"
 #include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
 #include "core/column/column_struct.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type_array.h"
@@ -46,6 +47,27 @@
 namespace doris::parquet {
 namespace {
 
+constexpr int64_t NESTED_READ_BATCH_ROWS = 4096;
+
+struct NestedScalarBatch {
+    int64_t records_read = 0;
+    int64_t levels_written = 0;
+    int64_t values_written = 0;
+    std::vector<int16_t> def_levels;
+    std::vector<int16_t> rep_levels;
+    std::vector<int64_t> value_indices;
+    MutableColumnPtr values_column;
+
+    bool empty() const { return levels_written == 0; }
+};
+
+struct NestedScalarOverflow {
+    NestedScalarBatch batch;
+
+    bool empty() const { return batch.empty(); }
+    void clear() { batch = NestedScalarBatch(); }
+};
+
 class ScalarColumnReader final : public ParquetColumnReader {
 public:
     ScalarColumnReader(int parquet_leaf_column_id, const 
::parquet::ColumnDescriptor* descriptor,
@@ -112,6 +134,7 @@ public:
     ListColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
                      std::unique_ptr<ParquetColumnReader> element_reader)
             : _field_id(schema.top_level_field_id),
+              _nullable_definition_level(schema.nullable_definition_level),
               _repeated_repetition_level(schema.repeated_repetition_level),
               _type(std::move(type)),
               _name(schema.name),
@@ -127,10 +150,12 @@ public:
 
 private:
     int _field_id = -1;
+    int16_t _nullable_definition_level = 0;
     int16_t _repeated_repetition_level = 0;
     DataTypePtr _type;
     std::string _name;
     std::unique_ptr<ParquetColumnReader> _element_reader;
+    NestedScalarOverflow _element_overflow;
 };
 
 class RowPositionColumnReader final : public ParquetColumnReader {
@@ -185,6 +210,7 @@ public:
                     std::unique_ptr<ParquetColumnReader> key_reader,
                     std::unique_ptr<ParquetColumnReader> value_reader)
             : _field_id(schema.top_level_field_id),
+              _nullable_definition_level(schema.nullable_definition_level),
               _repeated_repetition_level(schema.repeated_repetition_level),
               _type(std::move(type)),
               _name(schema.name),
@@ -201,11 +227,14 @@ public:
 
 private:
     int _field_id = -1;
+    int16_t _nullable_definition_level = 0;
     int16_t _repeated_repetition_level = 0;
     DataTypePtr _type;
     std::string _name;
     std::unique_ptr<ParquetColumnReader> _key_reader;
     std::unique_ptr<ParquetColumnReader> _value_reader;
+    NestedScalarOverflow _key_overflow;
+    NestedScalarOverflow _value_overflow;
 };
 
 Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
@@ -410,6 +439,231 @@ Status append_scalar_values(const ScalarColumnReader& 
column_reader,
     return Status::OK();
 }
 
+Status read_nested_scalar_batch(ScalarColumnReader& column_reader, int64_t 
batch_rows,
+                                int16_t value_slot_definition_level, 
NestedScalarBatch* batch) {
+    if (batch == nullptr) {
+        return Status::InvalidArgument("Nested scalar batch is null for column 
{}",
+                                       column_reader.name());
+    }
+    *batch = NestedScalarBatch();
+
+    ::parquet::internal::RecordReader* record_reader = nullptr;
+    RETURN_IF_ERROR(read_records(column_reader, batch_rows, &record_reader, 
&batch->records_read));
+    if (column_reader.type()->is_nullable() && 
record_reader->read_dense_for_nullable()) {
+        return Status::NotSupported(
+                "Dense nullable parquet nested reader is not supported for 
column {}",
+                column_reader.name());
+    }
+    batch->levels_written = record_reader->levels_written();
+    batch->values_written = record_reader->values_written();
+    if (batch->levels_written < batch->records_read || batch->values_written < 
0 ||
+        batch->values_written > batch->levels_written) {
+        return Status::Corruption(
+                "Invalid nested parquet read result for column {}: rows={}, 
levels={}, values={}",
+                column_reader.name(), batch->records_read, 
batch->levels_written,
+                batch->values_written);
+    }
+    if (batch->levels_written == 0) {
+        return Status::OK();
+    }
+
+    auto* def_levels = record_reader->def_levels();
+    if (def_levels == nullptr && 
column_reader.descriptor()->max_definition_level() > 0) {
+        return Status::Corruption(
+                "Nested parquet reader returned null definition levels for 
column {}",
+                column_reader.name());
+    }
+    batch->def_levels.resize(static_cast<size_t>(batch->levels_written));
+    if (def_levels == nullptr) {
+        std::fill(batch->def_levels.begin(), batch->def_levels.end(),
+                  column_reader.descriptor()->max_definition_level());
+    } else {
+        std::copy(def_levels, def_levels + batch->levels_written, 
batch->def_levels.begin());
+    }
+
+    auto* rep_levels = record_reader->rep_levels();
+    if (rep_levels == nullptr && 
column_reader.descriptor()->max_repetition_level() > 0) {
+        return Status::Corruption(
+                "Nested parquet reader returned null repetition levels for 
column {}",
+                column_reader.name());
+    }
+    batch->rep_levels.resize(static_cast<size_t>(batch->levels_written));
+    if (rep_levels == nullptr) {
+        std::fill(batch->rep_levels.begin(), batch->rep_levels.end(), 0);
+    } else {
+        std::copy(rep_levels, rep_levels + batch->levels_written, 
batch->rep_levels.begin());
+    }
+
+    batch->value_indices.resize(static_cast<size_t>(batch->levels_written), 
-1);
+    int64_t value_idx = 0;
+    const int16_t max_definition_level = 
column_reader.descriptor()->max_definition_level();
+    NullMap value_null_map;
+    for (int64_t level_idx = 0; level_idx < batch->levels_written; 
++level_idx) {
+        if (batch->def_levels[level_idx] >= value_slot_definition_level) {
+            if (value_idx >= batch->values_written) {
+                return Status::Corruption(
+                        "Nested parquet reader returned fewer values than 
definition levels for "
+                        "column {}",
+                        column_reader.name());
+            }
+            batch->value_indices[level_idx] = value_idx++;
+            if (column_reader.type()->is_nullable()) {
+                value_null_map.push_back(batch->def_levels[level_idx] != 
max_definition_level);
+            }
+        }
+    }
+    if (value_idx != batch->values_written) {
+        return Status::Corruption(
+                "Nested parquet reader returned extra values for column {}: 
consumed={}, values={}",
+                column_reader.name(), value_idx, batch->values_written);
+    }
+    if (column_reader.type()->is_nullable() &&
+        value_null_map.size() != static_cast<size_t>(batch->values_written)) {
+        return Status::Corruption("Invalid nested parquet null map for column 
{}",
+                                  column_reader.name());
+    }
+
+    batch->values_column = column_reader.type()->create_column();
+    if (batch->values_written > 0) {
+        const NullMap* null_map = value_null_map.empty() ? nullptr : 
&value_null_map;
+        RETURN_IF_ERROR(append_scalar_values(column_reader, *record_reader, 
batch->values_written,
+                                             null_map, batch->values_column));
+    }
+    return Status::OK();
+}
+
+void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_level,
+                             NestedScalarOverflow* overflow) {
+    DORIS_CHECK(overflow != nullptr);
+    if (start_level >= src.levels_written) {
+        overflow->clear();
+        return;
+    }
+
+    NestedScalarBatch dst;
+    dst.records_read = 0;
+    dst.levels_written = src.levels_written - start_level;
+    dst.def_levels.assign(src.def_levels.begin() + start_level, 
src.def_levels.end());
+    dst.rep_levels.assign(src.rep_levels.begin() + start_level, 
src.rep_levels.end());
+    dst.value_indices.resize(static_cast<size_t>(dst.levels_written), -1);
+    dst.values_column = src.values_column->clone_empty();
+
+    for (int64_t level_idx = start_level; level_idx < src.levels_written; 
++level_idx) {
+        const int64_t value_idx = src.value_indices[level_idx];
+        if (value_idx < 0) {
+            continue;
+        }
+        dst.value_indices[static_cast<size_t>(level_idx - start_level)] = 
dst.values_written;
+        dst.values_column->insert_from(*src.values_column, 
static_cast<size_t>(value_idx));
+        dst.values_written++;
+    }
+    overflow->batch = std::move(dst);
+}
+
+Status append_scalar_batch_value(const ScalarColumnReader& column_reader,
+                                 const NestedScalarBatch& batch, int64_t 
level_idx,
+                                 MutableColumnPtr& column) {
+    const int64_t value_idx = batch.value_indices[level_idx];
+    if (value_idx < 0) {
+        return Status::Corruption("Nested parquet value is absent for column 
{}",
+                                  column_reader.name());
+    }
+    column->insert_from(*batch.values_column, static_cast<size_t>(value_idx));
+    return Status::OK();
+}
+
+ColumnArray* array_column_from_output(MutableColumnPtr& column) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) 
{
+        return 
assert_cast<ColumnArray*>(&nullable_column->get_nested_column());
+    }
+    return assert_cast<ColumnArray*>(column.get());
+}
+
+ColumnMap* map_column_from_output(MutableColumnPtr& column) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) 
{
+        return assert_cast<ColumnMap*>(&nullable_column->get_nested_column());
+    }
+    return assert_cast<ColumnMap*>(column.get());
+}
+
+NullMap* null_map_from_nullable_output(MutableColumnPtr& column) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) 
{
+        return &nullable_column->get_null_map_data();
+    }
+    return nullptr;
+}
+
+void append_offsets(ColumnArray::Offsets64& offsets, const 
std::vector<uint64_t>& entry_counts) {
+    offsets.reserve(offsets.size() + entry_counts.size());
+    uint64_t current_offset = offsets.empty() ? 0 : offsets.back();
+    for (const auto entry_count : entry_counts) {
+        current_offset += entry_count;
+        offsets.push_back(current_offset);
+    }
+}
+
+void append_parent_nulls(NullMap* dst, const NullMap& src) {
+    if (dst == nullptr) {
+        return;
+    }
+    dst->insert(src.begin(), src.end());
+}
+
+template <typename Sink>
+Status assemble_repeated_levels(ScalarColumnReader& driver_reader, int16_t 
repeated_level,
+                                int16_t value_slot_definition_level, int64_t 
rows,
+                                NestedScalarOverflow* overflow, Sink& sink, 
int64_t* rows_read) {
+    if (overflow == nullptr || rows_read == nullptr) {
+        return Status::InvalidArgument("Invalid repeated level assembler 
arguments for column {}",
+                                       driver_reader.name());
+    }
+    *rows_read = 0;
+    while (*rows_read < rows) {
+        NestedScalarBatch read_batch;
+        NestedScalarBatch* batch = nullptr;
+        bool from_overflow = false;
+        if (!overflow->empty()) {
+            batch = &overflow->batch;
+            from_overflow = true;
+        } else {
+            const int64_t batch_rows = std::max<int64_t>(rows - *rows_read, 
NESTED_READ_BATCH_ROWS);
+            RETURN_IF_ERROR(read_nested_scalar_batch(driver_reader, batch_rows,
+                                                     
value_slot_definition_level, &read_batch));
+            if (read_batch.empty()) {
+                break;
+            }
+            batch = &read_batch;
+        }
+        RETURN_IF_ERROR(sink.start_batch(*batch));
+
+        int64_t level_idx = 0;
+        while (level_idx < batch->levels_written) {
+            const bool starts_parent = batch->rep_levels[level_idx] < 
repeated_level;
+            if (starts_parent && *rows_read >= rows) {
+                move_nested_scalar_tail(*batch, level_idx, overflow);
+                return Status::OK();
+            }
+            if (starts_parent) {
+                RETURN_IF_ERROR(sink.start_parent(*batch, level_idx));
+                ++*rows_read;
+            } else {
+                if (*rows_read == 0) {
+                    return Status::Corruption(
+                            "Repeated parquet stream starts with repeated 
level for column {}",
+                            driver_reader.name());
+                }
+                RETURN_IF_ERROR(sink.append_repeated(*batch, level_idx));
+            }
+            ++level_idx;
+        }
+
+        if (from_overflow) {
+            overflow->clear();
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace
 
 Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column, 
int64_t* rows_read) {
@@ -525,67 +779,80 @@ Status ListColumnReader::read(int64_t rows, 
MutableColumnPtr& column, int64_t* r
         return Status::NotSupported(
                 "Current parquet LIST reader only supports scalar elements for 
column {}", _name);
     }
-    if (element_reader->descriptor()->max_definition_level() != 1) {
-        return Status::NotSupported(
-                "Current parquet LIST reader only supports required elements 
for column {}", _name);
-    }
-
-    ::parquet::internal::RecordReader* record_reader = nullptr;
-    int64_t records_read = 0;
-    RETURN_IF_ERROR(read_records(*element_reader, rows, &record_reader, 
&records_read));
-    const int64_t levels_written = record_reader->levels_written();
-    if (records_read != rows || levels_written < records_read) {
-        return Status::Corruption(
-                "Invalid parquet LIST read result for column {}: rows={}, 
levels={}", _name,
-                records_read, levels_written);
-    }
-    if (record_reader->values_written() != levels_written) {
-        return Status::NotSupported(
-                "Current parquet LIST reader only supports non-empty lists 
with required "
-                "elements for column {}",
-                _name);
-    }
-    const int16_t max_definition_level = 
element_reader->descriptor()->max_definition_level();
-    if (auto* def_levels = record_reader->def_levels(); def_levels != nullptr) 
{
-        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-            if (def_levels[level_idx] != max_definition_level) {
-                return Status::NotSupported(
-                        "Current parquet LIST reader only supports non-empty 
lists with required "
-                        "elements for column {}",
-                        _name);
+    auto* array_column = array_column_from_output(column);
+    DORIS_CHECK(array_column != nullptr);
+    auto* parent_null_map = null_map_from_nullable_output(column);
+    auto nested_column = array_column->get_data_ptr()->assume_mutable();
+    std::vector<uint64_t> entry_counts;
+    NullMap parent_nulls;
+    const int16_t element_slot_definition_level = _nullable_definition_level + 
1;
+    const int16_t element_max_definition_level =
+            element_reader->descriptor()->max_definition_level();
+
+    struct ListSink {
+        ListColumnReader* self = nullptr;
+        ScalarColumnReader* element_reader = nullptr;
+        MutableColumnPtr* nested_column = nullptr;
+        std::vector<uint64_t>* entry_counts = nullptr;
+        NullMap* parent_nulls = nullptr;
+        int16_t element_max_definition_level = 0;
+
+        Status start_batch(const NestedScalarBatch&) { return Status::OK(); }
+
+        Status start_parent(const NestedScalarBatch& batch, int64_t level_idx) 
{
+            const int16_t def_level = batch.def_levels[level_idx];
+            if (def_level < self->_nullable_definition_level) {
+                if (!self->_type->is_nullable()) {
+                    return Status::Corruption(
+                            "Parquet LIST column {} contains null for 
non-nullable list",
+                            self->_name);
+                }
+                entry_counts->push_back(0);
+                parent_nulls->push_back(1);
+                return Status::OK();
+            }
+            entry_counts->push_back(0);
+            parent_nulls->push_back(0);
+            if (def_level == self->_nullable_definition_level) {
+                return Status::OK();
             }
+            return append_element(batch, level_idx);
         }
-    }
 
-    auto& array_column = assert_cast<ColumnArray&>(*column);
-    auto nested_column = array_column.get_data_ptr()->assume_mutable();
-    RETURN_IF_ERROR(append_scalar_values(*element_reader, *record_reader, 
levels_written, nullptr,
-                                         nested_column));
-    array_column.get_data_ptr() = std::move(nested_column);
+        Status append_repeated(const NestedScalarBatch& batch, int64_t 
level_idx) {
+            if (entry_counts->empty()) {
+                return Status::Corruption("Invalid repeated LIST level for 
column {}", self->_name);
+            }
+            return append_element(batch, level_idx);
+        }
 
-    auto* rep_levels = record_reader->rep_levels();
-    if (rep_levels == nullptr && levels_written > 0) {
-        return Status::Corruption(
-                "Parquet LIST reader returned null repetition levels for 
column {}", _name);
-    }
-    auto& offsets = array_column.get_offsets();
-    offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
-    size_t current_offset = offsets.empty() ? 0 : offsets.back();
-    int64_t current_record = 0;
-    for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-        if (level_idx == 0 || rep_levels[level_idx] < 
_repeated_repetition_level) {
-            if (level_idx != 0) {
-                offsets.push_back(current_offset);
-                current_record++;
+        Status append_element(const NestedScalarBatch& batch, int64_t 
level_idx) {
+            const int16_t def_level = batch.def_levels[level_idx];
+            if (def_level == element_max_definition_level) {
+                RETURN_IF_ERROR(append_scalar_batch_value(*element_reader, 
batch, level_idx,
+                                                          *nested_column));
+            } else {
+                if (!element_reader->type()->is_nullable()) {
+                    return Status::Corruption(
+                            "Parquet LIST column {} contains null for 
non-nullable element",
+                            self->_name);
+                }
+                (*nested_column)->insert_default();
             }
+            ++entry_counts->back();
+            return Status::OK();
         }
-        current_offset++;
-    }
-    while (current_record < records_read) {
-        offsets.push_back(current_offset);
-        current_record++;
-    }
-    *rows_read = records_read;
+    };
+
+    ListSink sink {this,          element_reader, &nested_column,
+                   &entry_counts, &parent_nulls,  
element_max_definition_level};
+    RETURN_IF_ERROR(assemble_repeated_levels(*element_reader, 
_repeated_repetition_level,
+                                             element_slot_definition_level, 
rows,
+                                             &_element_overflow, sink, 
rows_read));
+
+    array_column->get_data_ptr() = std::move(nested_column);
+    append_offsets(array_column->get_offsets(), entry_counts);
+    append_parent_nulls(parent_null_map, parent_nulls);
     return Status::OK();
 }
 
@@ -593,8 +860,26 @@ Status ListColumnReader::skip(int64_t rows) {
     if (rows <= 0) {
         return Status::OK();
     }
-    DORIS_CHECK(_element_reader != nullptr);
-    return _element_reader->skip(rows);
+    auto* element_reader = 
dynamic_cast<ScalarColumnReader*>(_element_reader.get());
+    if (element_reader == nullptr) {
+        return Status::NotSupported(
+                "Current parquet LIST reader only supports scalar elements for 
column {}", _name);
+    }
+    struct SkipSink {
+        Status start_batch(const NestedScalarBatch&) { return Status::OK(); }
+        Status start_parent(const NestedScalarBatch&, int64_t) { return 
Status::OK(); }
+        Status append_repeated(const NestedScalarBatch&, int64_t) { return 
Status::OK(); }
+    };
+    SkipSink sink;
+    int64_t rows_read = 0;
+    RETURN_IF_ERROR(assemble_repeated_levels(*element_reader, 
_repeated_repetition_level,
+                                             _nullable_definition_level + 1, 
rows,
+                                             &_element_overflow, sink, 
&rows_read));
+    if (rows_read != rows) {
+        return Status::Corruption("Failed to skip parquet LIST column {}: 
skipped {} of {} rows",
+                                  _name, rows_read, rows);
+    }
+    return Status::OK();
 }
 
 Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* 
rows_read) {
@@ -612,107 +897,140 @@ Status MapColumnReader::read(int64_t rows, 
MutableColumnPtr& column, int64_t* ro
         return Status::NotSupported(
                 "Current parquet MAP reader only supports scalar key/value for 
column {}", _name);
     }
-    if (key_reader->descriptor()->max_definition_level() != 1 ||
-        value_reader->descriptor()->max_definition_level() != 1) {
-        return Status::NotSupported(
-                "Current parquet MAP reader only supports required key/value 
entries for column {}",
-                _name);
-    }
 
-    ::parquet::internal::RecordReader* key_record_reader = nullptr;
-    int64_t records_read = 0;
-    RETURN_IF_ERROR(read_records(*key_reader, rows, &key_record_reader, 
&records_read));
-    const int64_t levels_written = key_record_reader->levels_written();
-    if (records_read != rows || levels_written < records_read) {
-        return Status::Corruption(
-                "Invalid parquet MAP key read result for column {}: rows={}, 
levels={}", _name,
-                records_read, levels_written);
-    }
-    if (key_record_reader->values_written() != levels_written) {
-        return Status::NotSupported(
-                "Current parquet MAP reader only supports non-empty maps with 
required entries "
-                "for column {}",
-                _name);
-    }
-
-    const int16_t max_definition_level = 
key_reader->descriptor()->max_definition_level();
-    if (auto* def_levels = key_record_reader->def_levels(); def_levels != 
nullptr) {
-        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-            if (def_levels[level_idx] != max_definition_level) {
-                return Status::NotSupported(
-                        "Current parquet MAP reader only supports non-empty 
maps with required "
-                        "entries for column {}",
-                        _name);
+    auto* map_column = map_column_from_output(column);
+    DORIS_CHECK(map_column != nullptr);
+    auto* parent_null_map = null_map_from_nullable_output(column);
+    auto key_column = map_column->get_keys_ptr()->assume_mutable();
+    auto value_column = map_column->get_values_ptr()->assume_mutable();
+    std::vector<uint64_t> entry_counts;
+    NullMap parent_nulls;
+    const int16_t entry_definition_level = _nullable_definition_level + 1;
+    const int16_t key_max_definition_level = 
key_reader->descriptor()->max_definition_level();
+    const int16_t value_max_definition_level = 
value_reader->descriptor()->max_definition_level();
+
+    struct MapSink {
+        MapColumnReader* self = nullptr;
+        ScalarColumnReader* key_reader = nullptr;
+        ScalarColumnReader* value_reader = nullptr;
+        MutableColumnPtr* key_column = nullptr;
+        MutableColumnPtr* value_column = nullptr;
+        std::vector<uint64_t>* entry_counts = nullptr;
+        NullMap* parent_nulls = nullptr;
+        int16_t key_max_definition_level = 0;
+        int16_t value_max_definition_level = 0;
+
+        Status read_value_batch(int64_t batch_rows, NestedScalarBatch* 
value_batch) {
+            if (!self->_value_overflow.empty()) {
+                *value_batch = std::move(self->_value_overflow.batch);
+                self->_value_overflow.clear();
+                return Status::OK();
             }
+            return read_nested_scalar_batch(*value_reader, batch_rows,
+                                            self->_nullable_definition_level + 
1, value_batch);
         }
-    }
 
-    ::parquet::internal::RecordReader* value_record_reader = nullptr;
-    int64_t value_records_read = 0;
-    RETURN_IF_ERROR(
-            read_records(*value_reader, records_read, &value_record_reader, 
&value_records_read));
-    if (value_records_read != records_read ||
-        value_record_reader->levels_written() != levels_written ||
-        value_record_reader->values_written() != levels_written) {
-        return Status::Corruption(
-                "Invalid parquet MAP value read result for column {}: rows={}, 
levels={}, "
-                "values={}, expected={}",
-                _name, value_records_read, 
value_record_reader->levels_written(),
-                value_record_reader->values_written(), levels_written);
-    }
-    if (auto* def_levels = value_record_reader->def_levels(); def_levels != 
nullptr) {
-        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-            if (def_levels[level_idx] != max_definition_level) {
-                return Status::NotSupported(
-                        "Current parquet MAP reader only supports non-empty 
maps with required "
-                        "entries for column {}",
-                        _name);
+        Status validate_value_alignment(const NestedScalarBatch& key_batch,
+                                        const NestedScalarBatch& value_batch) {
+            if (value_batch.records_read != key_batch.records_read ||
+                value_batch.levels_written != key_batch.levels_written) {
+                return Status::Corruption(
+                        "Parquet MAP key/value levels are not aligned for 
column {}: key rows={}, "
+                        "key levels={}, value rows={}, value levels={}",
+                        self->_name, key_batch.records_read, 
key_batch.levels_written,
+                        value_batch.records_read, value_batch.levels_written);
+            }
+            for (int64_t level_idx = 0; level_idx < key_batch.levels_written; 
++level_idx) {
+                if (value_batch.rep_levels[level_idx] != 
key_batch.rep_levels[level_idx]) {
+                    return Status::Corruption(
+                            "Parquet MAP key/value repetition levels are not 
aligned for column {}",
+                            self->_name);
+                }
             }
+            return Status::OK();
         }
-    }
 
-    const auto* key_rep_levels = key_record_reader->rep_levels();
-    const auto* value_rep_levels = value_record_reader->rep_levels();
-    if ((key_rep_levels == nullptr || value_rep_levels == nullptr) && 
levels_written > 0) {
-        return Status::Corruption(
-                "Parquet MAP reader returned null repetition levels for column 
{}", _name);
-    }
-    for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-        if (key_rep_levels[level_idx] != value_rep_levels[level_idx]) {
-            return Status::Corruption(
-                    "Parquet MAP key/value repetition levels are not aligned 
for column {}", _name);
+        Status start_batch(const NestedScalarBatch& key_batch) {
+            RETURN_IF_ERROR(read_value_batch(key_batch.records_read, 
&value_batch));
+            RETURN_IF_ERROR(validate_value_alignment(key_batch, value_batch));
+            return Status::OK();
         }
-    }
 
-    auto& map_column = assert_cast<ColumnMap&>(*column);
-    auto key_column = map_column.get_keys_ptr()->assume_mutable();
-    RETURN_IF_ERROR(append_scalar_values(*key_reader, *key_record_reader, 
levels_written, nullptr,
-                                         key_column));
-    map_column.get_keys_ptr() = std::move(key_column);
-
-    auto value_column = map_column.get_values_ptr()->assume_mutable();
-    RETURN_IF_ERROR(append_scalar_values(*value_reader, *value_record_reader, 
levels_written,
-                                         nullptr, value_column));
-    map_column.get_values_ptr() = std::move(value_column);
-
-    auto& offsets = map_column.get_offsets();
-    offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
-    size_t current_offset = offsets.empty() ? 0 : offsets.back();
-    int64_t current_record = 0;
-    for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
-        if (level_idx == 0 || key_rep_levels[level_idx] < 
_repeated_repetition_level) {
-            if (level_idx != 0) {
-                offsets.push_back(current_offset);
-                current_record++;
+        Status start_parent(const NestedScalarBatch& key_batch, int64_t 
level_idx) {
+            const int16_t def_level = key_batch.def_levels[level_idx];
+            if (def_level < self->_nullable_definition_level) {
+                if (!self->_type->is_nullable()) {
+                    return Status::Corruption(
+                            "Parquet MAP column {} contains null for 
non-nullable map",
+                            self->_name);
+                }
+                entry_counts->push_back(0);
+                parent_nulls->push_back(1);
+                return Status::OK();
             }
+            entry_counts->push_back(0);
+            parent_nulls->push_back(0);
+            if (def_level == self->_nullable_definition_level) {
+                return Status::OK();
+            }
+            return append_entry(key_batch, level_idx);
         }
-        current_offset++;
-    }
-    while (current_record < records_read) {
-        offsets.push_back(current_offset);
-        current_record++;
-    }
-    *rows_read = records_read;
+
+        Status append_repeated(const NestedScalarBatch& key_batch, int64_t 
level_idx) {
+            if (entry_counts->empty()) {
+                return Status::Corruption("Invalid repeated MAP level for 
column {}", self->_name);
+            }
+            return append_entry(key_batch, level_idx);
+        }
+
+        Status append_entry(const NestedScalarBatch& key_batch, int64_t 
level_idx) {
+            if (key_batch.def_levels[level_idx] != key_max_definition_level) {
+                return Status::Corruption("Parquet MAP column {} contains null 
map key",
+                                          self->_name);
+            }
+            RETURN_IF_ERROR(
+                    append_scalar_batch_value(*key_reader, key_batch, 
level_idx, *key_column));
+            if (value_batch.def_levels[level_idx] == 
value_max_definition_level) {
+                RETURN_IF_ERROR(append_scalar_batch_value(*value_reader, 
value_batch, level_idx,
+                                                          *value_column));
+            } else {
+                if (!value_reader->type()->is_nullable()) {
+                    return Status::Corruption(
+                            "Parquet MAP column {} contains null for 
non-nullable value",
+                            self->_name);
+                }
+                (*value_column)->insert_default();
+            }
+            ++entry_counts->back();
+            return Status::OK();
+        }
+
+        NestedScalarBatch value_batch;
+    };
+
+    MapSink sink {this,
+                  key_reader,
+                  value_reader,
+                  &key_column,
+                  &value_column,
+                  &entry_counts,
+                  &parent_nulls,
+                  key_max_definition_level,
+                  value_max_definition_level};
+    RETURN_IF_ERROR(assemble_repeated_levels(*key_reader, 
_repeated_repetition_level,
+                                             entry_definition_level, rows, 
&_key_overflow, sink,
+                                             rows_read));
+    if (!_key_overflow.empty()) {
+        move_nested_scalar_tail(
+                sink.value_batch,
+                sink.value_batch.levels_written - 
_key_overflow.batch.levels_written,
+                &_value_overflow);
+    }
+
+    map_column->get_keys_ptr() = std::move(key_column);
+    map_column->get_values_ptr() = std::move(value_column);
+    append_offsets(map_column->get_offsets(), entry_counts);
+    append_parent_nulls(parent_null_map, parent_nulls);
     return Status::OK();
 }
 
@@ -722,8 +1040,73 @@ Status MapColumnReader::skip(int64_t rows) {
     }
     DORIS_CHECK(_key_reader != nullptr);
     DORIS_CHECK(_value_reader != nullptr);
-    RETURN_IF_ERROR(_key_reader->skip(rows));
-    return _value_reader->skip(rows);
+    auto* key_reader = dynamic_cast<ScalarColumnReader*>(_key_reader.get());
+    auto* value_reader = 
dynamic_cast<ScalarColumnReader*>(_value_reader.get());
+    if (key_reader == nullptr || value_reader == nullptr) {
+        return Status::NotSupported(
+                "Current parquet MAP reader only supports scalar key/value for 
column {}", _name);
+    }
+    struct SkipSink {
+        MapColumnReader* self = nullptr;
+        ScalarColumnReader* value_reader = nullptr;
+
+        Status read_value_batch(int64_t batch_rows, NestedScalarBatch* 
value_batch) {
+            if (!self->_value_overflow.empty()) {
+                *value_batch = std::move(self->_value_overflow.batch);
+                self->_value_overflow.clear();
+                return Status::OK();
+            }
+            return read_nested_scalar_batch(*value_reader, batch_rows,
+                                            self->_nullable_definition_level + 
1, value_batch);
+        }
+
+        Status validate_value_alignment(const NestedScalarBatch& key_batch,
+                                        const NestedScalarBatch& value_batch) {
+            if (value_batch.records_read != key_batch.records_read ||
+                value_batch.levels_written != key_batch.levels_written) {
+                return Status::Corruption(
+                        "Parquet MAP key/value levels are not aligned for 
column {} while "
+                        "skipping",
+                        self->_name);
+            }
+            for (int64_t level_idx = 0; level_idx < key_batch.levels_written; 
++level_idx) {
+                if (value_batch.rep_levels[level_idx] != 
key_batch.rep_levels[level_idx]) {
+                    return Status::Corruption(
+                            "Parquet MAP key/value repetition levels are not 
aligned for column {}",
+                            self->_name);
+                }
+            }
+            return Status::OK();
+        }
+
+        Status start_batch(const NestedScalarBatch& key_batch) {
+            RETURN_IF_ERROR(read_value_batch(key_batch.records_read, 
&value_batch));
+            RETURN_IF_ERROR(validate_value_alignment(key_batch, value_batch));
+            return Status::OK();
+        }
+
+        Status start_parent(const NestedScalarBatch&, int64_t) { return 
Status::OK(); }
+
+        Status append_repeated(const NestedScalarBatch&, int64_t) { return 
Status::OK(); }
+
+        NestedScalarBatch value_batch;
+    };
+    SkipSink sink {this, value_reader};
+    int64_t rows_read = 0;
+    RETURN_IF_ERROR(assemble_repeated_levels(*key_reader, 
_repeated_repetition_level,
+                                             _nullable_definition_level + 1, 
rows, &_key_overflow,
+                                             sink, &rows_read));
+    if (!_key_overflow.empty()) {
+        move_nested_scalar_tail(
+                sink.value_batch,
+                sink.value_batch.levels_written - 
_key_overflow.batch.levels_written,
+                &_value_overflow);
+    }
+    if (rows_read != rows) {
+        return Status::Corruption("Failed to skip parquet MAP column {}: 
skipped {} of {} rows",
+                                  _name, rows_read, rows);
+    }
+    return Status::OK();
 }
 
 Status ParquetColumnReader::skip(int64_t rows) {
@@ -954,10 +1337,6 @@ Status 
ParquetColumnReaderFactory::create_list_column_reader(
         return Status::NotSupported("Parquet LIST projection is not 
implemented for column {}",
                                     column_schema.name);
     }
-    if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
-        return Status::NotSupported("Nullable parquet LIST reader is not 
implemented for column {}",
-                                    column_schema.name);
-    }
     if (column_schema.children.size() != 1) {
         return Status::NotSupported("Unsupported parquet LIST layout for 
column {}",
                                     column_schema.name);
@@ -980,10 +1359,6 @@ Status 
ParquetColumnReaderFactory::create_map_column_reader(
         return Status::NotSupported("Parquet MAP projection is not implemented 
for column {}",
                                     column_schema.name);
     }
-    if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
-        return Status::NotSupported("Nullable parquet MAP reader is not 
implemented for column {}",
-                                    column_schema.name);
-    }
     if (column_schema.children.size() != 1 || 
column_schema.children[0]->children.size() != 2) {
         return Status::NotSupported("Unsupported parquet MAP layout for column 
{}",
                                     column_schema.name);
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp 
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 4a187f4f8e0..50aa801f4c7 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -163,6 +163,38 @@ protected:
         return finish_array(&builder);
     }
 
+    // Builds five INT32 list rows covering a null list, an empty list and a null element:
+    //   row 0: [10, 20]
+    //   row 1: NULL
+    //   row 2: []
+    //   row 3: [NULL, 30]
+    //   row 4: [40]
+    std::shared_ptr<arrow::Array> build_nullable_int_list_array() {
+        auto elements = std::make_shared<arrow::Int32Builder>();
+        arrow::ListBuilder lists(arrow::default_memory_pool(), elements);
+
+        // row 0
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->Append(10).ok());
+        EXPECT_TRUE(elements->Append(20).ok());
+        // row 1: the list itself is null
+        EXPECT_TRUE(lists.AppendNull().ok());
+        // row 2: present but empty
+        EXPECT_TRUE(lists.AppendEmptyValue().ok());
+        // row 3: a null element followed by a value
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->AppendNull().ok());
+        EXPECT_TRUE(elements->Append(30).ok());
+        // row 4
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->Append(40).ok());
+
+        return finish_array(&lists);
+    }
+
+    // Builds five INT32 list rows, none null at the list level, with null elements inside:
+    //   row 0: []
+    //   row 1: [NULL, 110]
+    //   row 2: [120]
+    //   row 3: [130, NULL]
+    //   row 4: []
+    std::shared_ptr<arrow::Array> build_required_nullable_int_list_array() {
+        auto elements = std::make_shared<arrow::Int32Builder>();
+        arrow::ListBuilder lists(arrow::default_memory_pool(), elements);
+
+        // row 0
+        EXPECT_TRUE(lists.AppendEmptyValue().ok());
+        // row 1
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->AppendNull().ok());
+        EXPECT_TRUE(elements->Append(110).ok());
+        // row 2
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->Append(120).ok());
+        // row 3
+        EXPECT_TRUE(lists.Append().ok());
+        EXPECT_TRUE(elements->Append(130).ok());
+        EXPECT_TRUE(elements->AppendNull().ok());
+        // row 4: started with Append() but given no elements, i.e. an empty list
+        EXPECT_TRUE(lists.Append().ok());
+
+        return finish_array(&lists);
+    }
+
     std::shared_ptr<arrow::Array> build_required_int_string_map_array() {
         auto key_builder = std::make_shared<arrow::Int32Builder>();
         auto value_builder = std::make_shared<arrow::StringBuilder>();
@@ -183,6 +215,50 @@ protected:
         return finish_array(&builder);
     }
 
+    // Builds five MAP<INT32, STRING> rows (values nullable), including a null map and an empty map:
+    //   row 0: {10: "aa", 20: NULL}
+    //   row 1: NULL
+    //   row 2: {}
+    //   row 3: {30: "cc"}
+    //   row 4: {40: NULL}
+    std::shared_ptr<arrow::Array> build_nullable_int_string_map_array() {
+        auto keys = std::make_shared<arrow::Int32Builder>();
+        auto items = std::make_shared<arrow::StringBuilder>();
+        const auto map_type =
+                arrow::map(arrow::int32(), arrow::field("value", arrow::utf8(), /*nullable=*/true));
+        arrow::MapBuilder maps(arrow::default_memory_pool(), keys, items, map_type);
+
+        // row 0
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(10).ok());
+        EXPECT_TRUE(items->Append("aa").ok());
+        EXPECT_TRUE(keys->Append(20).ok());
+        EXPECT_TRUE(items->AppendNull().ok());
+        // row 1: the map itself is null
+        EXPECT_TRUE(maps.AppendNull().ok());
+        // row 2: present but empty
+        EXPECT_TRUE(maps.AppendEmptyValue().ok());
+        // row 3
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(30).ok());
+        EXPECT_TRUE(items->Append("cc").ok());
+        // row 4
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(40).ok());
+        EXPECT_TRUE(items->AppendNull().ok());
+
+        return finish_array(&maps);
+    }
+
+    // Builds five MAP<INT32, STRING> rows, none null at the map level, with null values inside:
+    //   row 0: {}
+    //   row 1: {101: NULL, 102: "bb"}
+    //   row 2: {103: "cc"}
+    //   row 3: {}
+    //   row 4: {104: NULL}
+    std::shared_ptr<arrow::Array> build_required_nullable_string_map_array() {
+        auto keys = std::make_shared<arrow::Int32Builder>();
+        auto items = std::make_shared<arrow::StringBuilder>();
+        const auto map_type =
+                arrow::map(arrow::int32(), arrow::field("value", arrow::utf8(), /*nullable=*/true));
+        arrow::MapBuilder maps(arrow::default_memory_pool(), keys, items, map_type);
+
+        // row 0
+        EXPECT_TRUE(maps.AppendEmptyValue().ok());
+        // row 1
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(101).ok());
+        EXPECT_TRUE(items->AppendNull().ok());
+        EXPECT_TRUE(keys->Append(102).ok());
+        EXPECT_TRUE(items->Append("bb").ok());
+        // row 2
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(103).ok());
+        EXPECT_TRUE(items->Append("cc").ok());
+        // row 3
+        EXPECT_TRUE(maps.AppendEmptyValue().ok());
+        // row 4
+        EXPECT_TRUE(maps.Append().ok());
+        EXPECT_TRUE(keys->Append(104).ok());
+        EXPECT_TRUE(items->AppendNull().ok());
+
+        return finish_array(&maps);
+    }
+
     std::shared_ptr<arrow::Array> build_time32_array(const 
std::shared_ptr<arrow::DataType>& type,
                                                      const 
std::vector<int32_t>& values) {
         arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -430,6 +506,57 @@ protected:
                       EXPECT_EQ(values.get_element(5), 6);
                       EXPECT_EQ(values.get_element(8), 9);
                   });
+        add_field(arrow::field("nullable_list_int_col",
+                               arrow::list(arrow::field("element", 
arrow::int32(), true)), true),
+                  build_nullable_int_list_array(),
+                  [](const ParquetColumnSchema& schema, const IColumn& column) 
{
+                      EXPECT_TRUE(schema.type->is_nullable());
+                      const auto& nullable_column = assert_cast<const 
ColumnNullable&>(column);
+                      ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+                      EXPECT_FALSE(nullable_column.is_null_at(0));
+                      EXPECT_TRUE(nullable_column.is_null_at(1));
+                      EXPECT_FALSE(nullable_column.is_null_at(2));
+                      EXPECT_FALSE(nullable_column.is_null_at(3));
+                      const auto& array_column =
+                              assert_cast<const 
ColumnArray&>(nullable_column.get_nested_column());
+                      const auto& offsets = array_column.get_offsets();
+                      ASSERT_EQ(offsets.size(), ROW_COUNT);
+                      EXPECT_EQ(offsets[0], 2);
+                      EXPECT_EQ(offsets[1], 2);
+                      EXPECT_EQ(offsets[2], 2);
+                      EXPECT_EQ(offsets[3], 4);
+                      EXPECT_EQ(offsets[4], 5);
+                      const auto& elements =
+                              assert_cast<const 
ColumnNullable&>(array_column.get_data());
+                      const auto& values =
+                              assert_cast<const 
ColumnInt32&>(elements.get_nested_column());
+                      ASSERT_EQ(elements.size(), 5);
+                      EXPECT_EQ(values.get_element(0), 10);
+                      EXPECT_EQ(values.get_element(1), 20);
+                      EXPECT_TRUE(elements.is_null_at(2));
+                      EXPECT_EQ(values.get_element(3), 30);
+                      EXPECT_EQ(values.get_element(4), 40);
+                  });
+        add_field(arrow::field("required_nullable_list_int_col",
+                               arrow::list(arrow::field("element", 
arrow::int32(), true)), false),
+                  build_required_nullable_int_list_array(),
+                  [](const ParquetColumnSchema& schema, const IColumn& column) 
{
+                      EXPECT_FALSE(schema.type->is_nullable());
+                      const auto& array_column = assert_cast<const 
ColumnArray&>(column);
+                      const auto& offsets = array_column.get_offsets();
+                      ASSERT_EQ(offsets.size(), ROW_COUNT);
+                      EXPECT_EQ(offsets[0], 0);
+                      EXPECT_EQ(offsets[1], 2);
+                      EXPECT_EQ(offsets[2], 3);
+                      EXPECT_EQ(offsets[3], 5);
+                      EXPECT_EQ(offsets[4], 5);
+                      const auto& elements =
+                              assert_cast<const 
ColumnNullable&>(array_column.get_data());
+                      ASSERT_EQ(elements.size(), 5);
+                      EXPECT_TRUE(elements.is_null_at(0));
+                      EXPECT_FALSE(elements.is_null_at(1));
+                      EXPECT_TRUE(elements.is_null_at(4));
+                  });
         add_field(arrow::field(
                           "map_int_string_col",
                           arrow::map(arrow::int32(), arrow::field("value", 
arrow::utf8(), false)),
@@ -465,6 +592,63 @@ protected:
                       EXPECT_EQ(values.get_data_at(5).to_string(), "f");
                       EXPECT_EQ(values.get_data_at(8).to_string(), "i");
                   });
+        add_field(
+                arrow::field("nullable_map_int_string_col",
+                             arrow::map(arrow::int32(), arrow::field("value", 
arrow::utf8(), true)),
+                             true),
+                build_nullable_int_string_map_array(),
+                [](const ParquetColumnSchema& schema, const IColumn& column) {
+                    EXPECT_TRUE(schema.type->is_nullable());
+                    const auto& nullable_column = assert_cast<const 
ColumnNullable&>(column);
+                    ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+                    EXPECT_FALSE(nullable_column.is_null_at(0));
+                    EXPECT_TRUE(nullable_column.is_null_at(1));
+                    EXPECT_FALSE(nullable_column.is_null_at(2));
+                    const auto& map_column =
+                            assert_cast<const 
ColumnMap&>(nullable_column.get_nested_column());
+                    const auto& offsets = map_column.get_offsets();
+                    ASSERT_EQ(offsets.size(), ROW_COUNT);
+                    EXPECT_EQ(offsets[0], 2);
+                    EXPECT_EQ(offsets[1], 2);
+                    EXPECT_EQ(offsets[2], 2);
+                    EXPECT_EQ(offsets[3], 3);
+                    EXPECT_EQ(offsets[4], 4);
+                    const auto& keys = assert_cast<const 
ColumnInt32&>(map_column.get_keys());
+                    const auto& values =
+                            assert_cast<const 
ColumnNullable&>(map_column.get_values());
+                    const auto& value_data =
+                            assert_cast<const 
ColumnString&>(values.get_nested_column());
+                    ASSERT_EQ(keys.size(), 4);
+                    EXPECT_EQ(keys.get_element(0), 10);
+                    EXPECT_EQ(keys.get_element(1), 20);
+                    EXPECT_EQ(keys.get_element(3), 40);
+                    EXPECT_EQ(value_data.get_data_at(0).to_string(), "aa");
+                    EXPECT_TRUE(values.is_null_at(1));
+                    EXPECT_EQ(value_data.get_data_at(2).to_string(), "cc");
+                    EXPECT_TRUE(values.is_null_at(3));
+                });
+        add_field(
+                arrow::field("required_nullable_map_int_string_col",
+                             arrow::map(arrow::int32(), arrow::field("value", 
arrow::utf8(), true)),
+                             false),
+                build_required_nullable_string_map_array(),
+                [](const ParquetColumnSchema& schema, const IColumn& column) {
+                    EXPECT_FALSE(schema.type->is_nullable());
+                    const auto& map_column = assert_cast<const 
ColumnMap&>(column);
+                    const auto& offsets = map_column.get_offsets();
+                    ASSERT_EQ(offsets.size(), ROW_COUNT);
+                    EXPECT_EQ(offsets[0], 0);
+                    EXPECT_EQ(offsets[1], 2);
+                    EXPECT_EQ(offsets[2], 3);
+                    EXPECT_EQ(offsets[3], 3);
+                    EXPECT_EQ(offsets[4], 4);
+                    const auto& values =
+                            assert_cast<const 
ColumnNullable&>(map_column.get_values());
+                    ASSERT_EQ(values.size(), 4);
+                    EXPECT_TRUE(values.is_null_at(0));
+                    EXPECT_FALSE(values.is_null_at(1));
+                    EXPECT_TRUE(values.is_null_at(3));
+                });
 
         auto schema = arrow::schema(_arrow_fields);
         auto table = arrow::Table::Make(schema, _arrays);
@@ -531,6 +715,16 @@ TEST_F(ParquetColumnReaderTest, 
ReadAllSupportedPhysicalAndLogicalTypes) {
     }
 }
 
+// Runs read_and_validate on every STRUCT/LIST/MAP fixture column. Assertions raised inside
+// read_and_validate report the helper's own source line, so SCOPED_TRACE is used to tag each
+// failure with the column name that triggered it.
+TEST_F(ParquetColumnReaderTest, ReadSupportedComplexTypes) {
+    for (const char* column_name :
+         {"struct_col", "list_int_col", "nullable_list_int_col", "required_nullable_list_int_col",
+          "map_int_string_col", "nullable_map_int_string_col",
+          "required_nullable_map_int_string_col"}) {
+        SCOPED_TRACE(column_name);
+        read_and_validate(find_field_idx(column_name));
+    }
+}
+
 TEST_F(ParquetColumnReaderTest, SkipThenRead) {
     auto reader = create_reader(1);
     auto st = reader->skip(2);
@@ -604,6 +798,134 @@ TEST_F(ParquetColumnReaderTest, 
ReadProjectedStructChildren) {
     EXPECT_EQ(values.get_data_at(4).to_string(), "se");
 }
 
+// Reads the nullable LIST column in two consecutive calls (2 rows, then 3 rows) into one
+// destination column, then hands the concatenated result to the column's expectation callback.
+TEST_F(ParquetColumnReaderTest, ReadListWithOverflowAcrossChunks) {
+    const auto field_idx = find_field_idx("nullable_list_int_col");
+    auto reader = create_reader(field_idx);
+    MutableColumnPtr column = reader->type()->create_column();
+
+    int64_t rows_read = 0;
+    for (const int64_t batch_rows : {2, 3}) {
+        auto st = reader->read(batch_rows, column, &rows_read);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(rows_read, batch_rows);
+    }
+
+    _expected_by_field[field_idx](*_fields[field_idx], *column);
+}
+
+// Skips row 0 of the nullable LIST column, then reads rows 1-3 (NULL, [], [NULL, 30]).
+// Both the row structure and the element data are checked: offsets alone would not detect
+// element levels that end up shifted after the skip.
+TEST_F(ParquetColumnReaderTest, SkipListWithOverflowThenRead) {
+    const auto field_idx = find_field_idx("nullable_list_int_col");
+    auto reader = create_reader(field_idx);
+    auto st = reader->skip(1);
+    ASSERT_TRUE(st.ok()) << st;
+
+    MutableColumnPtr column = reader->type()->create_column();
+    int64_t rows_read = 0;
+    st = reader->read(3, column, &rows_read);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(rows_read, 3);
+
+    const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+    ASSERT_EQ(nullable_column.size(), 3);
+    EXPECT_TRUE(nullable_column.is_null_at(0));
+    EXPECT_FALSE(nullable_column.is_null_at(1));
+    EXPECT_FALSE(nullable_column.is_null_at(2));
+    const auto& array_column = assert_cast<const ColumnArray&>(nullable_column.get_nested_column());
+    const auto& offsets = array_column.get_offsets();
+    ASSERT_EQ(offsets.size(), 3);
+    EXPECT_EQ(offsets[0], 0);
+    EXPECT_EQ(offsets[1], 0);
+    EXPECT_EQ(offsets[2], 2);
+
+    // Only row 3 ([NULL, 30]) contributes elements.
+    const auto& elements = assert_cast<const ColumnNullable&>(array_column.get_data());
+    const auto& values = assert_cast<const ColumnInt32&>(elements.get_nested_column());
+    ASSERT_EQ(elements.size(), 2);
+    EXPECT_TRUE(elements.is_null_at(0));
+    EXPECT_FALSE(elements.is_null_at(1));
+    EXPECT_EQ(values.get_element(1), 30);
+}
+
+// Selects rows 0, 3 and 4 of the nullable LIST column ([10, 20], [NULL, 30], [40]) and checks
+// both the row structure and the element data of the selected rows.
+TEST_F(ParquetColumnReaderTest, SelectListWithOverflow) {
+    const auto field_idx = find_field_idx("nullable_list_int_col");
+    auto reader = create_reader(field_idx);
+    SelectionVector selection(3);
+    selection.set_index(0, 0);
+    selection.set_index(1, 3);
+    selection.set_index(2, 4);
+
+    MutableColumnPtr column = reader->type()->create_column();
+    auto st = reader->select(selection, 3, ROW_COUNT, column);
+    ASSERT_TRUE(st.ok()) << st;
+
+    const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+    ASSERT_EQ(nullable_column.size(), 3);
+    EXPECT_FALSE(nullable_column.is_null_at(0));
+    EXPECT_FALSE(nullable_column.is_null_at(1));
+    EXPECT_FALSE(nullable_column.is_null_at(2));
+    const auto& array_column = assert_cast<const ColumnArray&>(nullable_column.get_nested_column());
+    const auto& offsets = array_column.get_offsets();
+    ASSERT_EQ(offsets.size(), 3);
+    EXPECT_EQ(offsets[0], 2);
+    EXPECT_EQ(offsets[1], 4);
+    EXPECT_EQ(offsets[2], 5);
+
+    // Elements of the selected rows must be kept, and those of skipped rows 1-2 dropped.
+    const auto& elements = assert_cast<const ColumnNullable&>(array_column.get_data());
+    const auto& values = assert_cast<const ColumnInt32&>(elements.get_nested_column());
+    ASSERT_EQ(elements.size(), 5);
+    EXPECT_EQ(values.get_element(0), 10);
+    EXPECT_EQ(values.get_element(1), 20);
+    EXPECT_TRUE(elements.is_null_at(2));
+    EXPECT_EQ(values.get_element(3), 30);
+    EXPECT_EQ(values.get_element(4), 40);
+}
+
+// Reads the nullable MAP column in two consecutive calls (2 rows, then 3 rows) into one
+// destination column, then hands the concatenated result to the column's expectation callback.
+TEST_F(ParquetColumnReaderTest, ReadMapWithOverflowAcrossChunks) {
+    const auto field_idx = find_field_idx("nullable_map_int_string_col");
+    auto reader = create_reader(field_idx);
+    MutableColumnPtr column = reader->type()->create_column();
+
+    int64_t rows_read = 0;
+    for (const int64_t batch_rows : {2, 3}) {
+        auto st = reader->read(batch_rows, column, &rows_read);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(rows_read, batch_rows);
+    }
+
+    _expected_by_field[field_idx](*_fields[field_idx], *column);
+}
+
+// Skips row 0 of the nullable MAP column, then reads rows 1-3 (NULL, {}, {30: "cc"}).
+// Keys and values are checked explicitly: the skip path buffers key and value levels separately,
+// so a key/value pairing error after the skip would not show up in the offsets.
+TEST_F(ParquetColumnReaderTest, SkipMapWithOverflowThenRead) {
+    const auto field_idx = find_field_idx("nullable_map_int_string_col");
+    auto reader = create_reader(field_idx);
+    auto st = reader->skip(1);
+    ASSERT_TRUE(st.ok()) << st;
+
+    MutableColumnPtr column = reader->type()->create_column();
+    int64_t rows_read = 0;
+    st = reader->read(3, column, &rows_read);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(rows_read, 3);
+
+    const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+    ASSERT_EQ(nullable_column.size(), 3);
+    EXPECT_TRUE(nullable_column.is_null_at(0));
+    EXPECT_FALSE(nullable_column.is_null_at(1));
+    EXPECT_FALSE(nullable_column.is_null_at(2));
+    const auto& map_column = assert_cast<const ColumnMap&>(nullable_column.get_nested_column());
+    const auto& offsets = map_column.get_offsets();
+    ASSERT_EQ(offsets.size(), 3);
+    EXPECT_EQ(offsets[0], 0);
+    EXPECT_EQ(offsets[1], 0);
+    EXPECT_EQ(offsets[2], 1);
+
+    // Only row 3 ({30: "cc"}) contributes an entry.
+    const auto& keys = assert_cast<const ColumnInt32&>(map_column.get_keys());
+    const auto& values = assert_cast<const ColumnNullable&>(map_column.get_values());
+    const auto& value_data = assert_cast<const ColumnString&>(values.get_nested_column());
+    ASSERT_EQ(keys.size(), 1);
+    ASSERT_EQ(values.size(), 1);
+    EXPECT_EQ(keys.get_element(0), 30);
+    EXPECT_FALSE(values.is_null_at(0));
+    EXPECT_EQ(value_data.get_data_at(0).to_string(), "cc");
+}
+
+// Selects rows 0, 3 and 4 of the nullable MAP column ({10: "aa", 20: NULL}, {30: "cc"},
+// {40: NULL}) and checks the row structure plus every selected key/value pair.
+TEST_F(ParquetColumnReaderTest, SelectMapWithOverflow) {
+    const auto field_idx = find_field_idx("nullable_map_int_string_col");
+    auto reader = create_reader(field_idx);
+    SelectionVector selection(3);
+    selection.set_index(0, 0);
+    selection.set_index(1, 3);
+    selection.set_index(2, 4);
+
+    MutableColumnPtr column = reader->type()->create_column();
+    auto st = reader->select(selection, 3, ROW_COUNT, column);
+    ASSERT_TRUE(st.ok()) << st;
+
+    const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+    ASSERT_EQ(nullable_column.size(), 3);
+    EXPECT_FALSE(nullable_column.is_null_at(0));
+    EXPECT_FALSE(nullable_column.is_null_at(1));
+    EXPECT_FALSE(nullable_column.is_null_at(2));
+    const auto& map_column = assert_cast<const ColumnMap&>(nullable_column.get_nested_column());
+    const auto& offsets = map_column.get_offsets();
+    ASSERT_EQ(offsets.size(), 3);
+    EXPECT_EQ(offsets[0], 2);
+    EXPECT_EQ(offsets[1], 3);
+    EXPECT_EQ(offsets[2], 4);
+
+    // Entries of the selected rows must stay paired, and those of skipped rows 1-2 be dropped.
+    const auto& keys = assert_cast<const ColumnInt32&>(map_column.get_keys());
+    const auto& values = assert_cast<const ColumnNullable&>(map_column.get_values());
+    const auto& value_data = assert_cast<const ColumnString&>(values.get_nested_column());
+    ASSERT_EQ(keys.size(), 4);
+    ASSERT_EQ(values.size(), 4);
+    EXPECT_EQ(keys.get_element(0), 10);
+    EXPECT_EQ(keys.get_element(1), 20);
+    EXPECT_EQ(keys.get_element(2), 30);
+    EXPECT_EQ(keys.get_element(3), 40);
+    EXPECT_EQ(value_data.get_data_at(0).to_string(), "aa");
+    EXPECT_TRUE(values.is_null_at(1));
+    EXPECT_EQ(value_data.get_data_at(2).to_string(), "cc");
+    EXPECT_TRUE(values.is_null_at(3));
+}
+
 TEST_F(ParquetColumnReaderTest, BuildComplexSchemaPathMetadata) {
     const auto field_idx = find_field_idx("struct_col");
     ASSERT_LT(field_idx, _fields.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to