This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 6cb5719cbd6 [feature](be) Add basic parquet map reader
6cb5719cbd6 is described below
commit 6cb5719cbd67bc64d6986ceb4ac9eaa4e15b2243
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 14:54:27 2026 +0800
[feature](be) Add basic parquet map reader
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Add initial MAP support to the new Parquet reader, limited to
required scalar key/value entries, and normalize the MAP key_value schema
metadata in preparation for future complex projection work.
Release note: None
- Test: Unit Test / Manual test
- Added parquet column reader unit test coverage for required
Map(Int32, String).
- Ran git diff --check.
- BE build will be verified on Fedora after push.
- Behavior changed: No
- Does this need documentation: No
---
be/src/format/new_parquet/column_reader.cpp | 191 ++++++++++++++++++++-
be/src/format/new_parquet/column_reader.h | 4 +
.../format/new_parquet/parquet_column_schema.cpp | 43 ++++-
.../new_parquet/parquet_column_reader_test.cpp | 85 ++++++++-
4 files changed, 310 insertions(+), 13 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index c427b38a970..4d4545f5d80 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -32,6 +32,7 @@
#include "core/column/column.h"
#include "core/column/column_array.h"
+#include "core/column/column_map.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_array.h"
@@ -178,6 +179,35 @@ private:
std::string _name = ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME;
};
+class MapColumnReader final : public ParquetColumnReader {
+public:
+ MapColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
+ std::unique_ptr<ParquetColumnReader> key_reader,
+ std::unique_ptr<ParquetColumnReader> value_reader)
+ : _field_id(schema.top_level_field_id),
+ _repeated_repetition_level(schema.repeated_repetition_level),
+ _type(std::move(type)),
+ _name(schema.name),
+ _key_reader(std::move(key_reader)),
+ _value_reader(std::move(value_reader)) {}
+
+ int file_column_id() const override { return _field_id; }
+ int parquet_leaf_column_id() const override { return -1; }
+ const DataTypePtr& type() const override { return _type; }
+ const std::string& name() const override { return _name; }
+
+ Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read)
override;
+ Status skip(int64_t rows) override;
+
+private:
+ int _field_id = -1;
+ int16_t _repeated_repetition_level = 0;
+ DataTypePtr _type;
+ std::string _name;
+ std::unique_ptr<ParquetColumnReader> _key_reader;
+ std::unique_ptr<ParquetColumnReader> _value_reader;
+};
+
Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
::parquet::internal::RecordReader** record_reader,
int64_t* rows_read) {
auto reader = column_reader.record_reader();
@@ -567,6 +597,135 @@ Status ListColumnReader::skip(int64_t rows) {
return _element_reader->skip(rows);
}
+Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
rows_read) {
+ if (column.get() == nullptr || rows_read == nullptr) {
+ return Status::InvalidArgument("Invalid parquet map read result
pointer for column {}",
+ _name);
+ }
+ if (_key_reader == nullptr || _value_reader == nullptr) {
+ return Status::InternalError("Parquet map child reader is not
initialized for column {}",
+ _name);
+ }
+ auto* key_reader = dynamic_cast<ScalarColumnReader*>(_key_reader.get());
+ auto* value_reader =
dynamic_cast<ScalarColumnReader*>(_value_reader.get());
+ if (key_reader == nullptr || value_reader == nullptr) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports scalar key/value for
column {}", _name);
+ }
+ if (key_reader->descriptor()->max_definition_level() != 1 ||
+ value_reader->descriptor()->max_definition_level() != 1) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports required key/value
entries for column {}",
+ _name);
+ }
+
+ ::parquet::internal::RecordReader* key_record_reader = nullptr;
+ int64_t records_read = 0;
+ RETURN_IF_ERROR(read_records(*key_reader, rows, &key_record_reader,
&records_read));
+ const int64_t levels_written = key_record_reader->levels_written();
+ if (records_read != rows || levels_written < records_read) {
+ return Status::Corruption(
+ "Invalid parquet MAP key read result for column {}: rows={},
levels={}", _name,
+ records_read, levels_written);
+ }
+ if (key_record_reader->values_written() != levels_written) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports non-empty maps with
required entries "
+ "for column {}",
+ _name);
+ }
+
+ const int16_t max_definition_level =
key_reader->descriptor()->max_definition_level();
+ if (auto* def_levels = key_record_reader->def_levels(); def_levels !=
nullptr) {
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (def_levels[level_idx] != max_definition_level) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports non-empty
maps with required "
+ "entries for column {}",
+ _name);
+ }
+ }
+ }
+
+ ::parquet::internal::RecordReader* value_record_reader = nullptr;
+ int64_t value_records_read = 0;
+ RETURN_IF_ERROR(
+ read_records(*value_reader, records_read, &value_record_reader,
&value_records_read));
+ if (value_records_read != records_read ||
+ value_record_reader->levels_written() != levels_written ||
+ value_record_reader->values_written() != levels_written) {
+ return Status::Corruption(
+ "Invalid parquet MAP value read result for column {}: rows={},
levels={}, "
+ "values={}, expected={}",
+ _name, value_records_read,
value_record_reader->levels_written(),
+ value_record_reader->values_written(), levels_written);
+ }
+ if (auto* def_levels = value_record_reader->def_levels(); def_levels !=
nullptr) {
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (def_levels[level_idx] != max_definition_level) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports non-empty
maps with required "
+ "entries for column {}",
+ _name);
+ }
+ }
+ }
+
+ const auto* key_rep_levels = key_record_reader->rep_levels();
+ const auto* value_rep_levels = value_record_reader->rep_levels();
+ if ((key_rep_levels == nullptr || value_rep_levels == nullptr) &&
levels_written > 0) {
+ return Status::Corruption(
+ "Parquet MAP reader returned null repetition levels for column
{}", _name);
+ }
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (key_rep_levels[level_idx] != value_rep_levels[level_idx]) {
+ return Status::Corruption(
+ "Parquet MAP key/value repetition levels are not aligned
for column {}", _name);
+ }
+ }
+
+ auto& map_column = assert_cast<ColumnMap&>(*column);
+ auto key_column = map_column.get_keys_ptr()->assume_mutable();
+ RETURN_IF_ERROR(append_scalar_values(*key_reader, *key_record_reader,
levels_written, nullptr,
+ key_column));
+ map_column.get_keys_ptr() = std::move(key_column);
+
+ auto value_column = map_column.get_values_ptr()->assume_mutable();
+ RETURN_IF_ERROR(append_scalar_values(*value_reader, *value_record_reader,
levels_written,
+ nullptr, value_column));
+ map_column.get_values_ptr() = std::move(value_column);
+
+ auto& offsets = map_column.get_offsets();
+ offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
+ size_t current_offset = offsets.empty() ? 0 : offsets.back();
+ int64_t current_record = 0;
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (level_idx == 0 || key_rep_levels[level_idx] <
_repeated_repetition_level) {
+ if (level_idx != 0) {
+ offsets.push_back(current_offset);
+ current_record++;
+ }
+ }
+ current_offset++;
+ }
+ while (current_record < records_read) {
+ offsets.push_back(current_offset);
+ current_record++;
+ }
+ *rows_read = records_read;
+ return Status::OK();
+}
+
+Status MapColumnReader::skip(int64_t rows) {
+ if (rows <= 0) {
+ return Status::OK();
+ }
+ DORIS_CHECK(_key_reader != nullptr);
+ DORIS_CHECK(_value_reader != nullptr);
+ RETURN_IF_ERROR(_key_reader->skip(rows));
+ return _value_reader->skip(rows);
+}
+
Status ParquetColumnReader::skip(int64_t rows) {
return Status::NotSupported("Parquet column skip is not implemented,
rows={}", rows);
}
@@ -811,6 +970,35 @@ Status
ParquetColumnReaderFactory::create_list_column_reader(
return Status::OK();
}
+Status ParquetColumnReaderFactory::create_map_column_reader(
+ const ParquetColumnSchema& column_schema, const
reader::FieldProjection* projection,
+ std::unique_ptr<ParquetColumnReader>* reader) const {
+ if (reader == nullptr) {
+ return Status::InvalidArgument("reader is null");
+ }
+ if (projection != nullptr && !projection->project_all_children) {
+ return Status::NotSupported("Parquet MAP projection is not implemented
for column {}",
+ column_schema.name);
+ }
+ if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
+ return Status::NotSupported("Nullable parquet MAP reader is not
implemented for column {}",
+ column_schema.name);
+ }
+ if (column_schema.children.size() != 1 ||
column_schema.children[0]->children.size() != 2) {
+ return Status::NotSupported("Unsupported parquet MAP layout for column
{}",
+ column_schema.name);
+ }
+ const auto& key_value_schema = *column_schema.children[0];
+ std::unique_ptr<ParquetColumnReader> key_reader;
+
RETURN_IF_ERROR(create_nested_scalar_column_reader(*key_value_schema.children[0],
&key_reader));
+ std::unique_ptr<ParquetColumnReader> value_reader;
+ RETURN_IF_ERROR(
+ create_nested_scalar_column_reader(*key_value_schema.children[1],
&value_reader));
+ *reader = std::make_unique<MapColumnReader>(column_schema,
column_schema.type,
+ std::move(key_reader),
std::move(value_reader));
+ return Status::OK();
+}
+
Status ParquetColumnReaderFactory::create(const ParquetColumnSchema&
column_schema,
const reader::FieldProjection*
projection,
std::unique_ptr<ParquetColumnReader>* reader) const {
@@ -825,8 +1013,7 @@ Status ParquetColumnReaderFactory::create(const
ParquetColumnSchema& column_sche
case ParquetColumnSchemaKind::LIST:
return create_list_column_reader(column_schema, projection, reader);
case ParquetColumnSchemaKind::MAP:
- return Status::NotSupported("Parquet MAP reader is not implemented for
column {}",
- column_schema.name);
+ return create_map_column_reader(column_schema, projection, reader);
}
return Status::NotSupported("Unsupported parquet column schema kind for
column {}",
column_schema.name);
diff --git a/be/src/format/new_parquet/column_reader.h
b/be/src/format/new_parquet/column_reader.h
index 80f7060fa31..62400d739ca 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -124,6 +124,10 @@ private:
const reader::FieldProjection* projection,
std::unique_ptr<ParquetColumnReader>*
reader) const;
+ Status create_map_column_reader(const ParquetColumnSchema& column_schema,
+ const reader::FieldProjection* projection,
+ std::unique_ptr<ParquetColumnReader>*
reader) const;
+
Status get_record_reader(int leaf_column_id, const
::parquet::ColumnDescriptor* descriptor,
const std::string& name,
std::shared_ptr<::parquet::internal::RecordReader>* reader) const;
diff --git a/be/src/format/new_parquet/parquet_column_schema.cpp
b/be/src/format/new_parquet/parquet_column_schema.cpp
index 8541769c1d2..cbca53c7f72 100644
--- a/be/src/format/new_parquet/parquet_column_schema.cpp
+++ b/be/src/format/new_parquet/parquet_column_schema.cpp
@@ -187,11 +187,44 @@ Status build_node_schema(const
::parquet::SchemaDescriptor& schema,
return Status::NotSupported("Unsupported parquet MAP encoding for
column {}",
node.name());
}
- std::unique_ptr<ParquetColumnSchema> key_value;
- RETURN_IF_ERROR(build_node_schema(
- schema, *group.field(0),
- child_context(context, *group.field(0), 0,
column_schema->schema_node_id),
- &key_value));
+ const auto& key_value_node = *group.field(0);
+ if (!key_value_node.is_repeated()) {
+ return Status::NotSupported("Unsupported parquet MAP encoding for
column {}",
+ node.name());
+ }
+ auto key_value_context =
+ child_context(context, key_value_node, 0,
column_schema->schema_node_id);
+ column_schema->repeated_repetition_level =
key_value_context.repeated_repetition_level;
+ if (key_value_node.is_primitive()) {
+ return Status::NotSupported("Unsupported parquet MAP key_value
layout for column {}",
+ node.name());
+ }
+ const auto& key_value_group =
+ static_cast<const
::parquet::schema::GroupNode&>(key_value_node);
+ if (key_value_group.field_count() != 2) {
+ return Status::NotSupported("Unsupported parquet MAP key_value
layout for column {}",
+ node.name());
+ }
+ auto key_value = std::make_unique<ParquetColumnSchema>();
+ inherit_common_schema_state(key_value_node, key_value_context,
key_value.get());
+ key_value->kind = ParquetColumnSchemaKind::STRUCT;
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(key_value_group.field_count());
+ child_names.reserve(key_value_group.field_count());
+ for (int child_idx = 0; child_idx < key_value_group.field_count();
++child_idx) {
+ std::unique_ptr<ParquetColumnSchema> child;
+ RETURN_IF_ERROR(build_node_schema(
+ schema, *key_value_group.field(child_idx),
+ child_context(key_value_context,
*key_value_group.field(child_idx), child_idx,
+ key_value->schema_node_id),
+ &child));
+ child_types.push_back(child->type);
+ child_names.push_back(child->name);
+ key_value->children.push_back(std::move(child));
+ }
+ key_value->type = std::make_shared<DataTypeStruct>(child_types,
child_names);
+ propagate_child_levels(key_value.get());
if (key_value->children.size() != 2) {
return Status::NotSupported("Unsupported parquet MAP key_value
layout for column {}",
node.name());
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index b85bbb80a6a..4a187f4f8e0 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -31,12 +31,14 @@
#include "core/assert_cast.h"
#include "core/column/column_array.h"
#include "core/column/column_decimal.h"
+#include "core/column/column_map.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_struct.h"
#include "core/types.h"
@@ -161,6 +163,26 @@ protected:
return finish_array(&builder);
}
+ std::shared_ptr<arrow::Array> build_required_int_string_map_array() {
+ auto key_builder = std::make_shared<arrow::Int32Builder>();
+ auto value_builder = std::make_shared<arrow::StringBuilder>();
+ auto map_type = arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), false));
+ arrow::MapBuilder builder(arrow::default_memory_pool(), key_builder,
value_builder,
+ map_type);
+ const std::vector<std::vector<std::pair<int32_t, std::string>>> values
= {
+ {{1, "a"}, {2, "b"}}, {{3, "c"}}, {{4, "d"}, {5,
"e"}, {6, "f"}},
+ {{7, "g"}}, {{8, "h"}, {9, "i"}},
+ };
+ for (const auto& row : values) {
+ EXPECT_TRUE(builder.Append().ok());
+ for (const auto& [key, value] : row) {
+ EXPECT_TRUE(key_builder->Append(key).ok());
+ EXPECT_TRUE(value_builder->Append(value).ok());
+ }
+ }
+ return finish_array(&builder);
+ }
+
std::shared_ptr<arrow::Array> build_time32_array(const
std::shared_ptr<arrow::DataType>& type,
const
std::vector<int32_t>& values) {
arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -394,16 +416,55 @@ protected:
TYPE_INT);
const auto& array_column = assert_cast<const
ColumnArray&>(column);
ASSERT_EQ(array_column.size(), ROW_COUNT);
- EXPECT_EQ(array_column.size_at(0), 2);
- EXPECT_EQ(array_column.size_at(1), 1);
- EXPECT_EQ(array_column.size_at(2), 3);
- EXPECT_EQ(array_column.size_at(4), 2);
+ const auto array_size_at = [&array_column](size_t
row_idx) {
+ return array_column.get_offsets()[row_idx] -
+ (row_idx == 0 ? 0 :
array_column.get_offsets()[row_idx - 1]);
+ };
+ EXPECT_EQ(array_size_at(0), 2);
+ EXPECT_EQ(array_size_at(1), 1);
+ EXPECT_EQ(array_size_at(2), 3);
+ EXPECT_EQ(array_size_at(4), 2);
const auto& values = assert_cast<const
ColumnInt32&>(array_column.get_data());
ASSERT_EQ(values.size(), 9);
EXPECT_EQ(values.get_element(0), 1);
EXPECT_EQ(values.get_element(5), 6);
EXPECT_EQ(values.get_element(8), 9);
});
+ add_field(arrow::field(
+ "map_int_string_col",
+ arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), false)),
+ false),
+ build_required_int_string_map_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_MAP);
+ const auto* map_type =
+ assert_cast<const
DataTypeMap*>(remove_nullable(schema.type).get());
+
EXPECT_EQ(remove_nullable(map_type->get_key_type())->get_primitive_type(),
+ TYPE_INT);
+
EXPECT_EQ(remove_nullable(map_type->get_value_type())->get_primitive_type(),
+ TYPE_STRING);
+ const auto& map_column = assert_cast<const
ColumnMap&>(column);
+ ASSERT_EQ(map_column.size(), ROW_COUNT);
+ const auto map_size_at = [&map_column](size_t row_idx) {
+ return map_column.get_offsets()[row_idx] -
+ (row_idx == 0 ? 0 :
map_column.get_offsets()[row_idx - 1]);
+ };
+ EXPECT_EQ(map_size_at(0), 2);
+ EXPECT_EQ(map_size_at(1), 1);
+ EXPECT_EQ(map_size_at(2), 3);
+ EXPECT_EQ(map_size_at(4), 2);
+ const auto& keys = assert_cast<const
ColumnInt32&>(map_column.get_keys());
+ const auto& values =
+ assert_cast<const
ColumnString&>(map_column.get_values());
+ ASSERT_EQ(keys.size(), 9);
+ ASSERT_EQ(values.size(), 9);
+ EXPECT_EQ(keys.get_element(0), 1);
+ EXPECT_EQ(keys.get_element(5), 6);
+ EXPECT_EQ(keys.get_element(8), 9);
+ EXPECT_EQ(values.get_data_at(0).to_string(), "a");
+ EXPECT_EQ(values.get_data_at(5).to_string(), "f");
+ EXPECT_EQ(values.get_data_at(8).to_string(), "i");
+ });
auto schema = arrow::schema(_arrow_fields);
auto table = arrow::Table::Make(schema, _arrays);
@@ -439,6 +500,16 @@ protected:
_expected_by_field[field_idx](*_fields[field_idx], *column);
}
+ size_t find_field_idx(const std::string& name) const {
+ for (size_t field_idx = 0; field_idx < _fields.size(); ++field_idx) {
+ if (_fields[field_idx]->name == name) {
+ return field_idx;
+ }
+ }
+ ADD_FAILURE() << "Cannot find parquet test field " << name;
+ return _fields.size();
+ }
+
std::filesystem::path _test_dir;
std::string _file_path;
std::unique_ptr<::parquet::ParquetFileReader> _file_reader;
@@ -496,7 +567,8 @@ TEST_F(ParquetColumnReaderTest,
SelectReadsOnlySelectedRanges) {
}
TEST_F(ParquetColumnReaderTest, ReadProjectedStructChildren) {
- const auto field_idx = _fields.size() - 1;
+ const auto field_idx = find_field_idx("struct_col");
+ ASSERT_LT(field_idx, _fields.size());
const auto& struct_schema = *_fields[field_idx];
ASSERT_EQ(struct_schema.name, "struct_col");
ASSERT_EQ(struct_schema.children.size(), 2);
@@ -533,7 +605,8 @@ TEST_F(ParquetColumnReaderTest,
ReadProjectedStructChildren) {
}
TEST_F(ParquetColumnReaderTest, BuildComplexSchemaPathMetadata) {
- const auto field_idx = _fields.size() - 1;
+ const auto field_idx = find_field_idx("struct_col");
+ ASSERT_LT(field_idx, _fields.size());
const auto& struct_schema = *_fields[field_idx];
ASSERT_EQ(struct_schema.name, "struct_col");
ASSERT_EQ(struct_schema.children.size(), 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]