This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new 6cb5719cbd6 [feature](be) Add basic parquet map reader
6cb5719cbd6 is described below

commit 6cb5719cbd67bc64d6986ceb4ac9eaa4e15b2243
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 14:54:27 2026 +0800

    [feature](be) Add basic parquet map reader
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Add initial new Parquet MAP reader support for required
    scalar key/value entries and normalize MAP key_value schema metadata for
    future complex projection work.
    
    Release note: None
    
    - Test: Unit Test / Manual test
        - Added parquet column reader unit test coverage for required
          Map(Int32, String).
        - Ran git diff --check.
        - BE build will be verified on Fedora after push.
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/format/new_parquet/column_reader.cpp        | 191 ++++++++++++++++++++-
 be/src/format/new_parquet/column_reader.h          |   4 +
 .../format/new_parquet/parquet_column_schema.cpp   |  43 ++++-
 .../new_parquet/parquet_column_reader_test.cpp     |  85 ++++++++-
 4 files changed, 310 insertions(+), 13 deletions(-)

diff --git a/be/src/format/new_parquet/column_reader.cpp 
b/be/src/format/new_parquet/column_reader.cpp
index c427b38a970..4d4545f5d80 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -32,6 +32,7 @@
 
 #include "core/column/column.h"
 #include "core/column/column_array.h"
+#include "core/column/column_map.h"
 #include "core/column/column_struct.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type_array.h"
@@ -178,6 +179,35 @@ private:
     std::string _name = ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME;
 };
 
+class MapColumnReader final : public ParquetColumnReader {
+public:
+    MapColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
+                    std::unique_ptr<ParquetColumnReader> key_reader,
+                    std::unique_ptr<ParquetColumnReader> value_reader)
+            : _field_id(schema.top_level_field_id),
+              _repeated_repetition_level(schema.repeated_repetition_level),
+              _type(std::move(type)),
+              _name(schema.name),
+              _key_reader(std::move(key_reader)),
+              _value_reader(std::move(value_reader)) {}
+
+    int file_column_id() const override { return _field_id; }
+    int parquet_leaf_column_id() const override { return -1; }
+    const DataTypePtr& type() const override { return _type; }
+    const std::string& name() const override { return _name; }
+
+    Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) 
override;
+    Status skip(int64_t rows) override;
+
+private:
+    int _field_id = -1;
+    int16_t _repeated_repetition_level = 0;
+    DataTypePtr _type;
+    std::string _name;
+    std::unique_ptr<ParquetColumnReader> _key_reader;
+    std::unique_ptr<ParquetColumnReader> _value_reader;
+};
+
 Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
                     ::parquet::internal::RecordReader** record_reader, 
int64_t* rows_read) {
     auto reader = column_reader.record_reader();
@@ -567,6 +597,135 @@ Status ListColumnReader::skip(int64_t rows) {
     return _element_reader->skip(rows);
 }
 
+Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* 
rows_read) {
+    if (column.get() == nullptr || rows_read == nullptr) {
+        return Status::InvalidArgument("Invalid parquet map read result 
pointer for column {}",
+                                       _name);
+    }
+    if (_key_reader == nullptr || _value_reader == nullptr) {
+        return Status::InternalError("Parquet map child reader is not 
initialized for column {}",
+                                     _name);
+    }
+    auto* key_reader = dynamic_cast<ScalarColumnReader*>(_key_reader.get());
+    auto* value_reader = 
dynamic_cast<ScalarColumnReader*>(_value_reader.get());
+    if (key_reader == nullptr || value_reader == nullptr) {
+        return Status::NotSupported(
+                "Current parquet MAP reader only supports scalar key/value for 
column {}", _name);
+    }
+    if (key_reader->descriptor()->max_definition_level() != 1 ||
+        value_reader->descriptor()->max_definition_level() != 1) {
+        return Status::NotSupported(
+                "Current parquet MAP reader only supports required key/value 
entries for column {}",
+                _name);
+    }
+
+    ::parquet::internal::RecordReader* key_record_reader = nullptr;
+    int64_t records_read = 0;
+    RETURN_IF_ERROR(read_records(*key_reader, rows, &key_record_reader, 
&records_read));
+    const int64_t levels_written = key_record_reader->levels_written();
+    if (records_read != rows || levels_written < records_read) {
+        return Status::Corruption(
+                "Invalid parquet MAP key read result for column {}: rows={}, 
levels={}", _name,
+                records_read, levels_written);
+    }
+    if (key_record_reader->values_written() != levels_written) {
+        return Status::NotSupported(
+                "Current parquet MAP reader only supports non-empty maps with 
required entries "
+                "for column {}",
+                _name);
+    }
+
+    const int16_t max_definition_level = 
key_reader->descriptor()->max_definition_level();
+    if (auto* def_levels = key_record_reader->def_levels(); def_levels != 
nullptr) {
+        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+            if (def_levels[level_idx] != max_definition_level) {
+                return Status::NotSupported(
+                        "Current parquet MAP reader only supports non-empty 
maps with required "
+                        "entries for column {}",
+                        _name);
+            }
+        }
+    }
+
+    ::parquet::internal::RecordReader* value_record_reader = nullptr;
+    int64_t value_records_read = 0;
+    RETURN_IF_ERROR(
+            read_records(*value_reader, records_read, &value_record_reader, 
&value_records_read));
+    if (value_records_read != records_read ||
+        value_record_reader->levels_written() != levels_written ||
+        value_record_reader->values_written() != levels_written) {
+        return Status::Corruption(
+                "Invalid parquet MAP value read result for column {}: rows={}, 
levels={}, "
+                "values={}, expected={}",
+                _name, value_records_read, 
value_record_reader->levels_written(),
+                value_record_reader->values_written(), levels_written);
+    }
+    if (auto* def_levels = value_record_reader->def_levels(); def_levels != 
nullptr) {
+        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+            if (def_levels[level_idx] != max_definition_level) {
+                return Status::NotSupported(
+                        "Current parquet MAP reader only supports non-empty 
maps with required "
+                        "entries for column {}",
+                        _name);
+            }
+        }
+    }
+
+    const auto* key_rep_levels = key_record_reader->rep_levels();
+    const auto* value_rep_levels = value_record_reader->rep_levels();
+    if ((key_rep_levels == nullptr || value_rep_levels == nullptr) && 
levels_written > 0) {
+        return Status::Corruption(
+                "Parquet MAP reader returned null repetition levels for column 
{}", _name);
+    }
+    for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+        if (key_rep_levels[level_idx] != value_rep_levels[level_idx]) {
+            return Status::Corruption(
+                    "Parquet MAP key/value repetition levels are not aligned 
for column {}", _name);
+        }
+    }
+
+    auto& map_column = assert_cast<ColumnMap&>(*column);
+    auto key_column = map_column.get_keys_ptr()->assume_mutable();
+    RETURN_IF_ERROR(append_scalar_values(*key_reader, *key_record_reader, 
levels_written, nullptr,
+                                         key_column));
+    map_column.get_keys_ptr() = std::move(key_column);
+
+    auto value_column = map_column.get_values_ptr()->assume_mutable();
+    RETURN_IF_ERROR(append_scalar_values(*value_reader, *value_record_reader, 
levels_written,
+                                         nullptr, value_column));
+    map_column.get_values_ptr() = std::move(value_column);
+
+    auto& offsets = map_column.get_offsets();
+    offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
+    size_t current_offset = offsets.empty() ? 0 : offsets.back();
+    int64_t current_record = 0;
+    for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+        if (level_idx == 0 || key_rep_levels[level_idx] < 
_repeated_repetition_level) {
+            if (level_idx != 0) {
+                offsets.push_back(current_offset);
+                current_record++;
+            }
+        }
+        current_offset++;
+    }
+    while (current_record < records_read) {
+        offsets.push_back(current_offset);
+        current_record++;
+    }
+    *rows_read = records_read;
+    return Status::OK();
+}
+
+Status MapColumnReader::skip(int64_t rows) {
+    if (rows <= 0) {
+        return Status::OK();
+    }
+    DORIS_CHECK(_key_reader != nullptr);
+    DORIS_CHECK(_value_reader != nullptr);
+    RETURN_IF_ERROR(_key_reader->skip(rows));
+    return _value_reader->skip(rows);
+}
+
 Status ParquetColumnReader::skip(int64_t rows) {
     return Status::NotSupported("Parquet column skip is not implemented, 
rows={}", rows);
 }
@@ -811,6 +970,35 @@ Status 
ParquetColumnReaderFactory::create_list_column_reader(
     return Status::OK();
 }
 
+Status ParquetColumnReaderFactory::create_map_column_reader(
+        const ParquetColumnSchema& column_schema, const 
reader::FieldProjection* projection,
+        std::unique_ptr<ParquetColumnReader>* reader) const {
+    if (reader == nullptr) {
+        return Status::InvalidArgument("reader is null");
+    }
+    if (projection != nullptr && !projection->project_all_children) {
+        return Status::NotSupported("Parquet MAP projection is not implemented 
for column {}",
+                                    column_schema.name);
+    }
+    if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
+        return Status::NotSupported("Nullable parquet MAP reader is not 
implemented for column {}",
+                                    column_schema.name);
+    }
+    if (column_schema.children.size() != 1 || 
column_schema.children[0]->children.size() != 2) {
+        return Status::NotSupported("Unsupported parquet MAP layout for column 
{}",
+                                    column_schema.name);
+    }
+    const auto& key_value_schema = *column_schema.children[0];
+    std::unique_ptr<ParquetColumnReader> key_reader;
+    
RETURN_IF_ERROR(create_nested_scalar_column_reader(*key_value_schema.children[0],
 &key_reader));
+    std::unique_ptr<ParquetColumnReader> value_reader;
+    RETURN_IF_ERROR(
+            create_nested_scalar_column_reader(*key_value_schema.children[1], 
&value_reader));
+    *reader = std::make_unique<MapColumnReader>(column_schema, 
column_schema.type,
+                                                std::move(key_reader), 
std::move(value_reader));
+    return Status::OK();
+}
+
 Status ParquetColumnReaderFactory::create(const ParquetColumnSchema& 
column_schema,
                                           const reader::FieldProjection* 
projection,
                                           
std::unique_ptr<ParquetColumnReader>* reader) const {
@@ -825,8 +1013,7 @@ Status ParquetColumnReaderFactory::create(const 
ParquetColumnSchema& column_sche
     case ParquetColumnSchemaKind::LIST:
         return create_list_column_reader(column_schema, projection, reader);
     case ParquetColumnSchemaKind::MAP:
-        return Status::NotSupported("Parquet MAP reader is not implemented for 
column {}",
-                                    column_schema.name);
+        return create_map_column_reader(column_schema, projection, reader);
     }
     return Status::NotSupported("Unsupported parquet column schema kind for 
column {}",
                                 column_schema.name);
diff --git a/be/src/format/new_parquet/column_reader.h 
b/be/src/format/new_parquet/column_reader.h
index 80f7060fa31..62400d739ca 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -124,6 +124,10 @@ private:
                                      const reader::FieldProjection* projection,
                                      std::unique_ptr<ParquetColumnReader>* 
reader) const;
 
+    Status create_map_column_reader(const ParquetColumnSchema& column_schema,
+                                    const reader::FieldProjection* projection,
+                                    std::unique_ptr<ParquetColumnReader>* 
reader) const;
+
     Status get_record_reader(int leaf_column_id, const 
::parquet::ColumnDescriptor* descriptor,
                              const std::string& name,
                              
std::shared_ptr<::parquet::internal::RecordReader>* reader) const;
diff --git a/be/src/format/new_parquet/parquet_column_schema.cpp 
b/be/src/format/new_parquet/parquet_column_schema.cpp
index 8541769c1d2..cbca53c7f72 100644
--- a/be/src/format/new_parquet/parquet_column_schema.cpp
+++ b/be/src/format/new_parquet/parquet_column_schema.cpp
@@ -187,11 +187,44 @@ Status build_node_schema(const 
::parquet::SchemaDescriptor& schema,
             return Status::NotSupported("Unsupported parquet MAP encoding for 
column {}",
                                         node.name());
         }
-        std::unique_ptr<ParquetColumnSchema> key_value;
-        RETURN_IF_ERROR(build_node_schema(
-                schema, *group.field(0),
-                child_context(context, *group.field(0), 0, 
column_schema->schema_node_id),
-                &key_value));
+        const auto& key_value_node = *group.field(0);
+        if (!key_value_node.is_repeated()) {
+            return Status::NotSupported("Unsupported parquet MAP encoding for 
column {}",
+                                        node.name());
+        }
+        auto key_value_context =
+                child_context(context, key_value_node, 0, 
column_schema->schema_node_id);
+        column_schema->repeated_repetition_level = 
key_value_context.repeated_repetition_level;
+        if (key_value_node.is_primitive()) {
+            return Status::NotSupported("Unsupported parquet MAP key_value 
layout for column {}",
+                                        node.name());
+        }
+        const auto& key_value_group =
+                static_cast<const 
::parquet::schema::GroupNode&>(key_value_node);
+        if (key_value_group.field_count() != 2) {
+            return Status::NotSupported("Unsupported parquet MAP key_value 
layout for column {}",
+                                        node.name());
+        }
+        auto key_value = std::make_unique<ParquetColumnSchema>();
+        inherit_common_schema_state(key_value_node, key_value_context, 
key_value.get());
+        key_value->kind = ParquetColumnSchemaKind::STRUCT;
+        DataTypes child_types;
+        Strings child_names;
+        child_types.reserve(key_value_group.field_count());
+        child_names.reserve(key_value_group.field_count());
+        for (int child_idx = 0; child_idx < key_value_group.field_count(); 
++child_idx) {
+            std::unique_ptr<ParquetColumnSchema> child;
+            RETURN_IF_ERROR(build_node_schema(
+                    schema, *key_value_group.field(child_idx),
+                    child_context(key_value_context, 
*key_value_group.field(child_idx), child_idx,
+                                  key_value->schema_node_id),
+                    &child));
+            child_types.push_back(child->type);
+            child_names.push_back(child->name);
+            key_value->children.push_back(std::move(child));
+        }
+        key_value->type = std::make_shared<DataTypeStruct>(child_types, 
child_names);
+        propagate_child_levels(key_value.get());
         if (key_value->children.size() != 2) {
             return Status::NotSupported("Unsupported parquet MAP key_value 
layout for column {}",
                                         node.name());
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp 
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index b85bbb80a6a..4a187f4f8e0 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -31,12 +31,14 @@
 #include "core/assert_cast.h"
 #include "core/column/column_array.h"
 #include "core/column/column_decimal.h"
+#include "core/column/column_map.h"
 #include "core/column/column_nullable.h"
 #include "core/column/column_string.h"
 #include "core/column/column_struct.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type.h"
 #include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
 #include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_struct.h"
 #include "core/types.h"
@@ -161,6 +163,26 @@ protected:
         return finish_array(&builder);
     }
 
+    std::shared_ptr<arrow::Array> build_required_int_string_map_array() {
+        auto key_builder = std::make_shared<arrow::Int32Builder>();
+        auto value_builder = std::make_shared<arrow::StringBuilder>();
+        auto map_type = arrow::map(arrow::int32(), arrow::field("value", 
arrow::utf8(), false));
+        arrow::MapBuilder builder(arrow::default_memory_pool(), key_builder, 
value_builder,
+                                  map_type);
+        const std::vector<std::vector<std::pair<int32_t, std::string>>> values 
= {
+                {{1, "a"}, {2, "b"}}, {{3, "c"}},           {{4, "d"}, {5, 
"e"}, {6, "f"}},
+                {{7, "g"}},           {{8, "h"}, {9, "i"}},
+        };
+        for (const auto& row : values) {
+            EXPECT_TRUE(builder.Append().ok());
+            for (const auto& [key, value] : row) {
+                EXPECT_TRUE(key_builder->Append(key).ok());
+                EXPECT_TRUE(value_builder->Append(value).ok());
+            }
+        }
+        return finish_array(&builder);
+    }
+
     std::shared_ptr<arrow::Array> build_time32_array(const 
std::shared_ptr<arrow::DataType>& type,
                                                      const 
std::vector<int32_t>& values) {
         arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -394,16 +416,55 @@ protected:
                               TYPE_INT);
                       const auto& array_column = assert_cast<const 
ColumnArray&>(column);
                       ASSERT_EQ(array_column.size(), ROW_COUNT);
-                      EXPECT_EQ(array_column.size_at(0), 2);
-                      EXPECT_EQ(array_column.size_at(1), 1);
-                      EXPECT_EQ(array_column.size_at(2), 3);
-                      EXPECT_EQ(array_column.size_at(4), 2);
+                      const auto array_size_at = [&array_column](size_t 
row_idx) {
+                          return array_column.get_offsets()[row_idx] -
+                                 (row_idx == 0 ? 0 : 
array_column.get_offsets()[row_idx - 1]);
+                      };
+                      EXPECT_EQ(array_size_at(0), 2);
+                      EXPECT_EQ(array_size_at(1), 1);
+                      EXPECT_EQ(array_size_at(2), 3);
+                      EXPECT_EQ(array_size_at(4), 2);
                       const auto& values = assert_cast<const 
ColumnInt32&>(array_column.get_data());
                       ASSERT_EQ(values.size(), 9);
                       EXPECT_EQ(values.get_element(0), 1);
                       EXPECT_EQ(values.get_element(5), 6);
                       EXPECT_EQ(values.get_element(8), 9);
                   });
+        add_field(arrow::field(
+                          "map_int_string_col",
+                          arrow::map(arrow::int32(), arrow::field("value", 
arrow::utf8(), false)),
+                          false),
+                  build_required_int_string_map_array(),
+                  [](const ParquetColumnSchema& schema, const IColumn& column) 
{
+                      
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_MAP);
+                      const auto* map_type =
+                              assert_cast<const 
DataTypeMap*>(remove_nullable(schema.type).get());
+                      
EXPECT_EQ(remove_nullable(map_type->get_key_type())->get_primitive_type(),
+                                TYPE_INT);
+                      
EXPECT_EQ(remove_nullable(map_type->get_value_type())->get_primitive_type(),
+                                TYPE_STRING);
+                      const auto& map_column = assert_cast<const 
ColumnMap&>(column);
+                      ASSERT_EQ(map_column.size(), ROW_COUNT);
+                      const auto map_size_at = [&map_column](size_t row_idx) {
+                          return map_column.get_offsets()[row_idx] -
+                                 (row_idx == 0 ? 0 : 
map_column.get_offsets()[row_idx - 1]);
+                      };
+                      EXPECT_EQ(map_size_at(0), 2);
+                      EXPECT_EQ(map_size_at(1), 1);
+                      EXPECT_EQ(map_size_at(2), 3);
+                      EXPECT_EQ(map_size_at(4), 2);
+                      const auto& keys = assert_cast<const 
ColumnInt32&>(map_column.get_keys());
+                      const auto& values =
+                              assert_cast<const 
ColumnString&>(map_column.get_values());
+                      ASSERT_EQ(keys.size(), 9);
+                      ASSERT_EQ(values.size(), 9);
+                      EXPECT_EQ(keys.get_element(0), 1);
+                      EXPECT_EQ(keys.get_element(5), 6);
+                      EXPECT_EQ(keys.get_element(8), 9);
+                      EXPECT_EQ(values.get_data_at(0).to_string(), "a");
+                      EXPECT_EQ(values.get_data_at(5).to_string(), "f");
+                      EXPECT_EQ(values.get_data_at(8).to_string(), "i");
+                  });
 
         auto schema = arrow::schema(_arrow_fields);
         auto table = arrow::Table::Make(schema, _arrays);
@@ -439,6 +500,16 @@ protected:
         _expected_by_field[field_idx](*_fields[field_idx], *column);
     }
 
+    size_t find_field_idx(const std::string& name) const {
+        for (size_t field_idx = 0; field_idx < _fields.size(); ++field_idx) {
+            if (_fields[field_idx]->name == name) {
+                return field_idx;
+            }
+        }
+        ADD_FAILURE() << "Cannot find parquet test field " << name;
+        return _fields.size();
+    }
+
     std::filesystem::path _test_dir;
     std::string _file_path;
     std::unique_ptr<::parquet::ParquetFileReader> _file_reader;
@@ -496,7 +567,8 @@ TEST_F(ParquetColumnReaderTest, 
SelectReadsOnlySelectedRanges) {
 }
 
 TEST_F(ParquetColumnReaderTest, ReadProjectedStructChildren) {
-    const auto field_idx = _fields.size() - 1;
+    const auto field_idx = find_field_idx("struct_col");
+    ASSERT_LT(field_idx, _fields.size());
     const auto& struct_schema = *_fields[field_idx];
     ASSERT_EQ(struct_schema.name, "struct_col");
     ASSERT_EQ(struct_schema.children.size(), 2);
@@ -533,7 +605,8 @@ TEST_F(ParquetColumnReaderTest, 
ReadProjectedStructChildren) {
 }
 
 TEST_F(ParquetColumnReaderTest, BuildComplexSchemaPathMetadata) {
-    const auto field_idx = _fields.size() - 1;
+    const auto field_idx = find_field_idx("struct_col");
+    ASSERT_LT(field_idx, _fields.size());
     const auto& struct_schema = *_fields[field_idx];
     ASSERT_EQ(struct_schema.name, "struct_col");
     ASSERT_EQ(struct_schema.children.size(), 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to