This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new c3c9d3b28ba [feature](be) Support parquet struct scalar assembly
c3c9d3b28ba is described below
commit c3c9d3b28ba02688071f5cc0c065c775eb3353bd
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 17:16:33 2026 +0800
[feature](be) Support parquet struct scalar assembly
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Support reading parquet STRUCT columns whose children are all
scalar by assembling each row from the children's definition levels. This covers
nullable parent structs and reads that project only some of the struct's children.
### Release note
None
### Check List (For Author)
- Test: Manual test (unit tests are also added in parquet_column_reader_test.cpp)
- Ran git diff --check locally.
- Ran BUILD_TYPE=DEBUG ./build.sh --be on Fedora.
- Behavior changed: Yes
- The new parquet reader now supports nullable STRUCT columns with scalar
children, as well as projected scalar struct children.
- Does this need documentation: No
---
be/src/format/new_parquet/column_reader.cpp | 133 +++++++++++++++++++--
.../new_parquet/parquet_column_reader_test.cpp | 113 +++++++++++++++++
2 files changed, 237 insertions(+), 9 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index 9d9ac98ced9..37d1efa322e 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -110,6 +110,7 @@ public:
StructColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
std::vector<std::unique_ptr<ParquetColumnReader>>
children)
: _field_id(schema.top_level_field_id),
+ _nullable_definition_level(schema.nullable_definition_level),
_type(std::move(type)),
_name(schema.name),
_children(std::move(children)) {}
@@ -124,6 +125,7 @@ public:
private:
int _field_id = -1;
+ int16_t _nullable_definition_level = 0;
DataTypePtr _type;
std::string _name;
std::vector<std::unique_ptr<ParquetColumnReader>> _children;
@@ -586,6 +588,13 @@ ColumnMap* map_column_from_output(MutableColumnPtr&
column) {
return assert_cast<ColumnMap*>(column.get());
}
+ColumnStruct* struct_column_from_output(MutableColumnPtr& column) {
+ if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column))
{
+ return
assert_cast<ColumnStruct*>(&nullable_column->get_nested_column());
+ }
+ return assert_cast<ColumnStruct*>(column.get());
+}
+
NullMap* null_map_from_nullable_output(MutableColumnPtr& column) {
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column))
{
return &nullable_column->get_null_map_data();
@@ -732,14 +741,120 @@ Status StructColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t*
return Status::OK();
}
+ auto* struct_column = struct_column_from_output(column);
+ DORIS_CHECK(struct_column != nullptr);
+ auto* parent_null_map = null_map_from_nullable_output(column);
+ DCHECK_EQ(struct_column->get_columns().size(), _children.size());
+
+ std::vector<ScalarColumnReader*> scalar_children;
+ scalar_children.reserve(_children.size());
+ bool all_scalar_children = true;
+ for (const auto& child_reader : _children) {
+ DORIS_CHECK(child_reader != nullptr);
+ auto* scalar_child =
dynamic_cast<ScalarColumnReader*>(child_reader.get());
+ if (scalar_child == nullptr) {
+ all_scalar_children = false;
+ break;
+ }
+ scalar_children.push_back(scalar_child);
+ }
+ if (all_scalar_children) {
+ std::vector<NestedScalarBatch> child_batches(scalar_children.size());
+ int64_t expected_rows = -1;
+ for (size_t child_idx = 0; child_idx < scalar_children.size();
++child_idx) {
+
RETURN_IF_ERROR(read_nested_scalar_batch(*scalar_children[child_idx], rows,
+
_nullable_definition_level,
+
&child_batches[child_idx]));
+ if (expected_rows < 0) {
+ expected_rows = child_batches[child_idx].records_read;
+ } else if (child_batches[child_idx].records_read != expected_rows)
{
+ return Status::Corruption(
+ "Parquet struct children returned different row counts
in column {}: {} "
+ "vs {}",
+ _name, expected_rows,
child_batches[child_idx].records_read);
+ }
+ if (child_batches[child_idx].levels_written !=
child_batches[child_idx].records_read) {
+ return Status::Corruption(
+ "Parquet struct child {} returned repeated levels in
column {}",
+ scalar_children[child_idx]->name(), _name);
+ }
+ }
+
+ if (expected_rows <= 0) {
+ *rows_read = 0;
+ return Status::OK();
+ }
+
+ std::vector<MutableColumnPtr> child_columns;
+ child_columns.reserve(scalar_children.size());
+ for (size_t child_idx = 0; child_idx < scalar_children.size();
++child_idx) {
+
child_columns.push_back(struct_column->get_column_ptr(child_idx)->assume_mutable());
+ }
+
+ NullMap parent_nulls;
+ parent_nulls.reserve(static_cast<size_t>(expected_rows));
+ for (int64_t row_idx = 0; row_idx < expected_rows; ++row_idx) {
+ const bool parent_is_null =
+ child_batches[0].def_levels[row_idx] <
_nullable_definition_level;
+ parent_nulls.push_back(parent_is_null);
+ for (size_t child_idx = 1; child_idx < child_batches.size();
++child_idx) {
+ const bool child_parent_is_null =
+ child_batches[child_idx].def_levels[row_idx] <
_nullable_definition_level;
+ if (child_parent_is_null != parent_is_null) {
+ return Status::Corruption(
+ "Parquet struct children returned different null
parent shape in "
+ "column {}",
+ _name);
+ }
+ }
+ for (size_t child_idx = 0; child_idx < scalar_children.size();
++child_idx) {
+ if (parent_is_null) {
+ child_columns[child_idx]->insert_default();
+ } else {
+ if (!scalar_children[child_idx]->type()->is_nullable() &&
+ child_batches[child_idx].def_levels[row_idx] !=
+
scalar_children[child_idx]->descriptor()->max_definition_level()) {
+ return Status::Corruption(
+ "Parquet STRUCT column {} contains null for
non-nullable child {}",
+ _name, scalar_children[child_idx]->name());
+ }
+
RETURN_IF_ERROR(append_scalar_batch_value(*scalar_children[child_idx],
+
child_batches[child_idx], row_idx,
+
child_columns[child_idx]));
+ }
+ }
+ }
+ for (size_t child_idx = 0; child_idx < child_columns.size();
++child_idx) {
+ struct_column->get_column_ptr(child_idx) =
std::move(child_columns[child_idx]);
+ }
+ if (parent_null_map == nullptr) {
+ for (const auto parent_is_null : parent_nulls) {
+ if (parent_is_null) {
+ return Status::Corruption(
+ "Parquet STRUCT column {} contains null for
non-nullable struct",
+ _name);
+ }
+ }
+ } else {
+ append_parent_nulls(parent_null_map, parent_nulls);
+ }
+ *rows_read = expected_rows;
+ return Status::OK();
+ }
+
+ if (parent_null_map != nullptr) {
+ return Status::NotSupported(
+ "Current parquet nullable STRUCT reader only supports scalar
children for column "
+ "{}",
+ _name);
+ }
+
int64_t expected_rows = -1;
size_t child_idx = 0;
- DCHECK_EQ(assert_cast<ColumnStruct&>(*column).get_columns().size(),
_children.size());
for (auto& child_reader : _children) {
DORIS_CHECK(child_reader != nullptr);
int64_t child_rows = 0;
- auto child_column =
-
assert_cast<ColumnStruct&>(*column).get_column_ptr(child_idx)->assume_mutable();
+ auto child_column =
struct_column->get_column_ptr(child_idx)->assume_mutable();
RETURN_IF_ERROR(child_reader->read(rows, child_column, &child_rows));
if (expected_rows < 0) {
expected_rows = child_rows;
@@ -748,6 +863,7 @@ Status StructColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t*
"Parquet struct children returned different row counts in
column {}: {} vs {}",
_name, expected_rows, child_rows);
}
+ struct_column->get_column_ptr(child_idx) = std::move(child_column);
child_idx++;
}
@@ -1288,11 +1404,6 @@ Status
ParquetColumnReaderFactory::create_struct_column_reader(
if (reader == nullptr) {
return Status::InvalidArgument("reader is null");
}
- if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
- return Status::NotSupported(
- "Nullable parquet STRUCT reader is not implemented for column
{}",
- column_schema.name);
- }
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(column_schema.children.size());
DataTypes projected_child_types;
@@ -1311,7 +1422,11 @@ Status
ParquetColumnReaderFactory::create_struct_column_reader(
child_projection = &*it;
}
std::unique_ptr<ParquetColumnReader> child_reader;
- RETURN_IF_ERROR(create(*child_schema, child_projection,
&child_reader));
+ if (child_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) {
+ RETURN_IF_ERROR(create_nested_scalar_column_reader(*child_schema,
&child_reader));
+ } else {
+ RETURN_IF_ERROR(create(*child_schema, child_projection,
&child_reader));
+ }
projected_child_types.push_back(child_reader->type());
projected_child_names.push_back(child_reader->name());
child_readers.push_back(std::move(child_reader));
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 50aa801f4c7..059e9b709aa 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -148,6 +148,33 @@ protected:
return finish_array(&builder);
}
+ std::shared_ptr<arrow::Array> build_nullable_struct_array() {
+ auto struct_type = arrow::struct_(
+ {arrow::field("a", arrow::int32(), false), arrow::field("b",
arrow::utf8(), true)});
+ std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;
+ auto a_array_builder = std::make_unique<arrow::Int32Builder>();
+
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder)));
+ auto b_array_builder = std::make_unique<arrow::StringBuilder>();
+
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder)));
+ arrow::StructBuilder builder(struct_type, arrow::default_memory_pool(),
+ std::move(field_builders));
+ auto* a_builder =
assert_cast<arrow::Int32Builder*>(builder.field_builder(0));
+ auto* b_builder =
assert_cast<arrow::StringBuilder*>(builder.field_builder(1));
+
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(a_builder->Append(201).ok());
+ EXPECT_TRUE(b_builder->Append("nsa").ok());
+ EXPECT_TRUE(builder.AppendNull().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(a_builder->Append(203).ok());
+ EXPECT_TRUE(b_builder->AppendNull().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(a_builder->Append(204).ok());
+ EXPECT_TRUE(b_builder->Append("nsd").ok());
+ EXPECT_TRUE(builder.AppendNull().ok());
+ return finish_array(&builder);
+ }
+
std::shared_ptr<arrow::Array> build_required_int_list_array() {
auto value_builder = std::make_shared<arrow::Int32Builder>();
arrow::ListBuilder builder(arrow::default_memory_pool(),
value_builder);
@@ -480,6 +507,41 @@ protected:
EXPECT_EQ(b_values.get_data_at(1).to_string(), "sb");
EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
});
+ add_field(arrow::field("nullable_struct_col",
+ arrow::struct_({
+ arrow::field("a", arrow::int32(),
false),
+ arrow::field("b", arrow::utf8(), true),
+ }),
+ true),
+ build_nullable_struct_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+ EXPECT_TRUE(schema.type->is_nullable());
+ const auto& nullable_column = assert_cast<const
ColumnNullable&>(column);
+ ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_TRUE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ EXPECT_FALSE(nullable_column.is_null_at(3));
+ EXPECT_TRUE(nullable_column.is_null_at(4));
+
+ const auto& struct_column =
+ assert_cast<const
ColumnStruct&>(nullable_column.get_nested_column());
+ ASSERT_EQ(struct_column.get_columns().size(), 2);
+ const auto& a_values =
+ assert_cast<const
ColumnInt32&>(struct_column.get_column(0));
+ const auto& b_values =
+ assert_cast<const
ColumnNullable&>(struct_column.get_column(1));
+ const auto& b_nested =
+ assert_cast<const
ColumnString&>(b_values.get_nested_column());
+ EXPECT_EQ(a_values.get_element(0), 201);
+ EXPECT_EQ(a_values.get_element(2), 203);
+ EXPECT_EQ(a_values.get_element(3), 204);
+ EXPECT_FALSE(b_values.is_null_at(0));
+ EXPECT_TRUE(b_values.is_null_at(2));
+ EXPECT_FALSE(b_values.is_null_at(3));
+ EXPECT_EQ(b_nested.get_data_at(0).to_string(), "nsa");
+ EXPECT_EQ(b_nested.get_data_at(3).to_string(), "nsd");
+ });
add_field(arrow::field("list_int_col",
arrow::list(arrow::field("element",
arrow::int32(), false)), false),
build_required_int_list_array(),
@@ -717,6 +779,7 @@ TEST_F(ParquetColumnReaderTest,
ReadAllSupportedPhysicalAndLogicalTypes) {
TEST_F(ParquetColumnReaderTest, ReadSupportedComplexTypes) {
read_and_validate(find_field_idx("struct_col"));
+ read_and_validate(find_field_idx("nullable_struct_col"));
read_and_validate(find_field_idx("list_int_col"));
read_and_validate(find_field_idx("nullable_list_int_col"));
read_and_validate(find_field_idx("required_nullable_list_int_col"));
@@ -798,6 +861,56 @@ TEST_F(ParquetColumnReaderTest,
ReadProjectedStructChildren) {
EXPECT_EQ(values.get_data_at(4).to_string(), "se");
}
+TEST_F(ParquetColumnReaderTest, ReadProjectedNullableStructChildren) {
+ const auto field_idx = find_field_idx("nullable_struct_col");
+ ASSERT_LT(field_idx, _fields.size());
+ const auto& struct_schema = *_fields[field_idx];
+ ASSERT_EQ(struct_schema.name, "nullable_struct_col");
+ ASSERT_EQ(struct_schema.children.size(), 2);
+
+ reader::FieldProjection projection;
+ projection.file_column_id = struct_schema.top_level_field_id;
+ projection.file_path = struct_schema.file_path;
+ projection.project_all_children = false;
+ reader::FieldProjection child_projection;
+ child_projection.file_column_id = struct_schema.top_level_field_id;
+ child_projection.file_path = struct_schema.children[1]->file_path;
+ projection.children.push_back(std::move(child_projection));
+
+ ParquetColumnReaderFactory factory(_row_group,
_file_reader->metadata()->num_columns());
+ std::unique_ptr<ParquetColumnReader> reader;
+ auto st = factory.create(struct_schema, &projection, &reader);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(reader->type()->is_nullable());
+ ASSERT_EQ(remove_nullable(reader->type())->get_primitive_type(),
TYPE_STRUCT);
+ const auto* projected_type =
+ assert_cast<const
DataTypeStruct*>(remove_nullable(reader->type()).get());
+ ASSERT_EQ(projected_type->get_elements().size(), 1);
+ EXPECT_EQ(projected_type->get_element_name(0), "b");
+
+ MutableColumnPtr column = reader->type()->create_column();
+ int64_t rows_read = 0;
+ st = reader->read(ROW_COUNT, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, ROW_COUNT);
+ const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_TRUE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ EXPECT_FALSE(nullable_column.is_null_at(3));
+ EXPECT_TRUE(nullable_column.is_null_at(4));
+ const auto& struct_column =
+ assert_cast<const
ColumnStruct&>(nullable_column.get_nested_column());
+ ASSERT_EQ(struct_column.get_columns().size(), 1);
+ const auto& values = assert_cast<const
ColumnNullable&>(struct_column.get_column(0));
+ const auto& nested_values = assert_cast<const
ColumnString&>(values.get_nested_column());
+ EXPECT_FALSE(values.is_null_at(0));
+ EXPECT_TRUE(values.is_null_at(2));
+ EXPECT_FALSE(values.is_null_at(3));
+ EXPECT_EQ(nested_values.get_data_at(0).to_string(), "nsa");
+ EXPECT_EQ(nested_values.get_data_at(3).to_string(), "nsd");
+}
+
TEST_F(ParquetColumnReaderTest, ReadListWithOverflowAcrossChunks) {
const auto field_idx = find_field_idx("nullable_list_int_col");
auto reader = create_reader(field_idx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]