This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new b8a40bd22a1 [feature](be) Support parquet repeated level assembly
b8a40bd22a1 is described below
commit b8a40bd22a1bc56ddce95b11795f05c81234e077
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 15:48:59 2026 +0800
[feature](be) Support parquet repeated level assembly
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Add shared LIST/MAP level assembly for scalar nested
Parquet children, including null parent rows, empty collections, nullable
element/value slots, overflow buffering, and parent-row skip/select semantics.
### Release note
None
### Check List (For Author)
- Test: Manual test
- Ran git diff --check. BE DEBUG build will be run on Fedora after push.
- Behavior changed: Yes. New Parquet reader can read LIST/MAP scalar
children with null/empty/nullable-child cases.
- Does this need documentation: No
---
be/src/format/new_parquet/column_reader.cpp | 689 ++++++++++++++++-----
.../new_parquet/parquet_column_reader_test.cpp | 322 ++++++++++
2 files changed, 854 insertions(+), 157 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index 4d4545f5d80..b4cc00caa98 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -33,6 +33,7 @@
#include "core/column/column.h"
#include "core/column/column_array.h"
#include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_array.h"
@@ -46,6 +47,27 @@
namespace doris::parquet {
namespace {
+// Lower bound on the number of parent records requested per nested read; rows beyond what
+// the caller asked for are parked in a NestedScalarOverflow until the next call.
+constexpr int64_t NESTED_READ_BATCH_ROWS = 4096;
+
+// Private copy of the levels and decoded values of one scalar leaf column.
+// value_indices[i] is the row of values_column holding level i's value, or -1 when level i
+// has no value slot (its definition level stops at an ancestor).
+struct NestedScalarBatch {
+    int64_t records_read {0};
+    int64_t levels_written {0};
+    int64_t values_written {0};
+    std::vector<int16_t> def_levels;
+    std::vector<int16_t> rep_levels;
+    std::vector<int64_t> value_indices;
+    MutableColumnPtr values_column;
+
+    // A batch is empty when it carries no levels at all.
+    bool empty() const { return levels_written == 0; }
+};
+
+// Levels/values already decoded by a previous read but not yet handed to the caller.
+struct NestedScalarOverflow {
+    NestedScalarBatch batch;
+
+    bool empty() const { return batch.empty(); }
+    void clear() { batch = NestedScalarBatch {}; }
+};
+
class ScalarColumnReader final : public ParquetColumnReader {
public:
ScalarColumnReader(int parquet_leaf_column_id, const
::parquet::ColumnDescriptor* descriptor,
@@ -112,6 +134,7 @@ public:
ListColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
std::unique_ptr<ParquetColumnReader> element_reader)
: _field_id(schema.top_level_field_id),
+ _nullable_definition_level(schema.nullable_definition_level),
_repeated_repetition_level(schema.repeated_repetition_level),
_type(std::move(type)),
_name(schema.name),
@@ -127,10 +150,12 @@ public:
private:
int _field_id = -1;
+ int16_t _nullable_definition_level = 0;
int16_t _repeated_repetition_level = 0;
DataTypePtr _type;
std::string _name;
std::unique_ptr<ParquetColumnReader> _element_reader;
+ NestedScalarOverflow _element_overflow;
};
class RowPositionColumnReader final : public ParquetColumnReader {
@@ -185,6 +210,7 @@ public:
std::unique_ptr<ParquetColumnReader> key_reader,
std::unique_ptr<ParquetColumnReader> value_reader)
: _field_id(schema.top_level_field_id),
+ _nullable_definition_level(schema.nullable_definition_level),
_repeated_repetition_level(schema.repeated_repetition_level),
_type(std::move(type)),
_name(schema.name),
@@ -201,11 +227,14 @@ public:
private:
int _field_id = -1;
+ int16_t _nullable_definition_level = 0;
int16_t _repeated_repetition_level = 0;
DataTypePtr _type;
std::string _name;
std::unique_ptr<ParquetColumnReader> _key_reader;
std::unique_ptr<ParquetColumnReader> _value_reader;
+ NestedScalarOverflow _key_overflow;
+ NestedScalarOverflow _value_overflow;
};
Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
@@ -410,6 +439,231 @@ Status append_scalar_values(const ScalarColumnReader&
column_reader,
return Status::OK();
}
+// Reads up to `batch_rows` records of a nested scalar leaf and copies what those records
+// consumed into `batch`: the definition/repetition levels plus the decoded values. Every
+// level whose definition level reaches `value_slot_definition_level` owns a value slot; its
+// row in `batch->values_column` is stored in `batch->value_indices`, and all other levels
+// get -1. For a nullable leaf type, a slot whose definition level is below the leaf's max
+// definition level is appended as null.
+//
+// Only the first levels_position() buffered levels belong to the records returned by this
+// read. For repeated columns the parquet RecordReader decodes levels ahead to find record
+// boundaries, so levels_written() may be larger, and no values are decoded for those extra
+// levels. Treating them as part of this batch would misalign levels and values.
+// NOTE(review): assumes read_records() resets the record reader so that def_levels() and
+// rep_levels() start at this batch's first level — confirm against read_records().
+//
+// Returns InvalidArgument for a null `batch`, NotSupported for dense nullable reads and
+// Corruption when levels and values are inconsistent.
+Status read_nested_scalar_batch(ScalarColumnReader& column_reader, int64_t batch_rows,
+                                int16_t value_slot_definition_level, NestedScalarBatch* batch) {
+    if (batch == nullptr) {
+        return Status::InvalidArgument("Nested scalar batch is null for column {}",
+                                       column_reader.name());
+    }
+    *batch = NestedScalarBatch();
+
+    ::parquet::internal::RecordReader* record_reader = nullptr;
+    RETURN_IF_ERROR(read_records(column_reader, batch_rows, &record_reader, &batch->records_read));
+    const bool is_nullable = column_reader.type()->is_nullable();
+    if (is_nullable && record_reader->read_dense_for_nullable()) {
+        return Status::NotSupported(
+                "Dense nullable parquet nested reader is not supported for column {}",
+                column_reader.name());
+    }
+    // Consumed levels only; levels_written() may include read-ahead levels of later records.
+    batch->levels_written = record_reader->levels_position();
+    batch->values_written = record_reader->values_written();
+    if (batch->levels_written < batch->records_read || batch->values_written < 0 ||
+        batch->values_written > batch->levels_written) {
+        return Status::Corruption(
+                "Invalid nested parquet read result for column {}: rows={}, levels={}, values={}",
+                column_reader.name(), batch->records_read, batch->levels_written,
+                batch->values_written);
+    }
+    if (batch->levels_written == 0) {
+        return Status::OK();
+    }
+
+    const auto level_count = static_cast<size_t>(batch->levels_written);
+    const int16_t max_definition_level = column_reader.descriptor()->max_definition_level();
+    const int16_t max_repetition_level = column_reader.descriptor()->max_repetition_level();
+
+    const int16_t* def_levels = record_reader->def_levels();
+    if (def_levels == nullptr && max_definition_level > 0) {
+        return Status::Corruption(
+                "Nested parquet reader returned null definition levels for column {}",
+                column_reader.name());
+    }
+    if (def_levels == nullptr) {
+        batch->def_levels.assign(level_count, max_definition_level);
+    } else {
+        batch->def_levels.assign(def_levels, def_levels + level_count);
+    }
+
+    const int16_t* rep_levels = record_reader->rep_levels();
+    if (rep_levels == nullptr && max_repetition_level > 0) {
+        return Status::Corruption(
+                "Nested parquet reader returned null repetition levels for column {}",
+                column_reader.name());
+    }
+    if (rep_levels == nullptr) {
+        batch->rep_levels.assign(level_count, static_cast<int16_t>(0));
+    } else {
+        batch->rep_levels.assign(rep_levels, rep_levels + level_count);
+    }
+
+    // Map every level that reaches the value slot to its position among the decoded values.
+    batch->value_indices.assign(level_count, -1);
+    NullMap value_null_map;
+    if (is_nullable) {
+        value_null_map.reserve(static_cast<size_t>(batch->values_written));
+    }
+    int64_t value_idx = 0;
+    for (size_t level_idx = 0; level_idx < level_count; ++level_idx) {
+        const int16_t def_level = batch->def_levels[level_idx];
+        if (def_level < value_slot_definition_level) {
+            continue;
+        }
+        if (value_idx >= batch->values_written) {
+            return Status::Corruption(
+                    "Nested parquet reader returned fewer values than definition levels for "
+                    "column {}",
+                    column_reader.name());
+        }
+        batch->value_indices[level_idx] = value_idx++;
+        if (is_nullable) {
+            // A slot below the leaf's max definition level is a null element/value.
+            value_null_map.push_back(def_level != max_definition_level);
+        }
+    }
+    if (value_idx != batch->values_written) {
+        return Status::Corruption(
+                "Nested parquet reader returned extra values for column {}: consumed={}, "
+                "values={}",
+                column_reader.name(), value_idx, batch->values_written);
+    }
+    if (is_nullable && value_null_map.size() != static_cast<size_t>(batch->values_written)) {
+        return Status::Corruption("Invalid nested parquet null map for column {}",
+                                  column_reader.name());
+    }
+
+    batch->values_column = column_reader.type()->create_column();
+    if (batch->values_written > 0) {
+        const NullMap* null_map = value_null_map.empty() ? nullptr : &value_null_map;
+        RETURN_IF_ERROR(append_scalar_values(column_reader, *record_reader, batch->values_written,
+                                             null_map, batch->values_column));
+    }
+    return Status::OK();
+}
+
+// Stores levels [start_level, src.levels_written) of `src` into `overflow`, re-basing the
+// value indices onto a compact values column owned by the tail. Clears `overflow` when the
+// tail would be empty. `src` may alias `overflow->batch`: the tail is built completely before
+// it replaces the stored batch.
+void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_level,
+                             NestedScalarOverflow* overflow) {
+    DORIS_CHECK(overflow != nullptr);
+    const int64_t tail_levels = src.levels_written - start_level;
+    if (tail_levels <= 0) {
+        overflow->clear();
+        return;
+    }
+
+    NestedScalarBatch tail;
+    tail.records_read = 0;
+    tail.levels_written = tail_levels;
+    tail.def_levels.assign(src.def_levels.begin() + start_level, src.def_levels.end());
+    tail.rep_levels.assign(src.rep_levels.begin() + start_level, src.rep_levels.end());
+    tail.value_indices.resize(static_cast<size_t>(tail_levels), -1);
+    tail.values_column = src.values_column->clone_empty();
+
+    for (int64_t offset = 0; offset < tail_levels; ++offset) {
+        const int64_t src_value = src.value_indices[static_cast<size_t>(start_level + offset)];
+        if (src_value >= 0) {
+            // Post-increment: the level points at the row about to be appended.
+            tail.value_indices[static_cast<size_t>(offset)] = tail.values_written++;
+            tail.values_column->insert_from(*src.values_column, static_cast<size_t>(src_value));
+        }
+    }
+    overflow->batch = std::move(tail);
+}
+
+// Appends the decoded value referenced by level `level_idx` of `batch` to `column`.
+// Returns Corruption when that level has no value slot (value index -1).
+Status append_scalar_batch_value(const ScalarColumnReader& column_reader,
+                                 const NestedScalarBatch& batch, int64_t level_idx,
+                                 MutableColumnPtr& column) {
+    if (const int64_t value_idx = batch.value_indices[level_idx]; value_idx >= 0) {
+        column->insert_from(*batch.values_column, static_cast<size_t>(value_idx));
+        return Status::OK();
+    }
+    return Status::Corruption("Nested parquet value is absent for column {}",
+                              column_reader.name());
+}
+
+// Returns the ColumnArray held by `column`, looking through a ColumnNullable wrapper.
+ColumnArray* array_column_from_output(MutableColumnPtr& column) {
+    auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
+    if (nullable_column == nullptr) {
+        return assert_cast<ColumnArray*>(column.get());
+    }
+    return assert_cast<ColumnArray*>(&nullable_column->get_nested_column());
+}
+
+// Returns the ColumnMap held by `column`, looking through a ColumnNullable wrapper.
+ColumnMap* map_column_from_output(MutableColumnPtr& column) {
+    auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
+    if (nullable_column == nullptr) {
+        return assert_cast<ColumnMap*>(column.get());
+    }
+    return assert_cast<ColumnMap*>(&nullable_column->get_nested_column());
+}
+
+// Returns the null map of `column` when it is a ColumnNullable, otherwise nullptr.
+NullMap* null_map_from_nullable_output(MutableColumnPtr& column) {
+    auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
+    return nullable_column == nullptr ? nullptr : &nullable_column->get_null_map_data();
+}
+
+// Appends one cumulative offset per entry count, continuing from the last existing offset
+// (or 0 when `offsets` is empty).
+void append_offsets(ColumnArray::Offsets64& offsets, const std::vector<uint64_t>& entry_counts) {
+    offsets.reserve(offsets.size() + entry_counts.size());
+    uint64_t running = offsets.empty() ? 0 : offsets.back();
+    for (size_t i = 0; i < entry_counts.size(); ++i) {
+        running += entry_counts[i];
+        offsets.push_back(running);
+    }
+}
+
+// Appends `src` to `dst`; a null `dst` (output column is not nullable) makes this a no-op.
+void append_parent_nulls(NullMap* dst, const NullMap& src) {
+    if (dst != nullptr) {
+        dst->insert(src.begin(), src.end());
+    }
+}
+
+// Feeds the levels of `driver_reader` to `sink` until `rows` parent rows have been started or
+// the reader yields an empty batch. sink.start_batch() is called once per batch before its
+// levels; a level whose repetition level is below `repeated_level` starts a parent row
+// (sink.start_parent()), any other level continues the current one (sink.append_repeated()).
+// Levels parked in `overflow` are consumed before new reads. When the row budget is reached
+// in the middle of a batch, the remaining levels (starting at the next parent) are moved into
+// `overflow` for the next call. `*rows_read` receives the number of parent rows started.
+template <typename Sink>
+Status assemble_repeated_levels(ScalarColumnReader& driver_reader, int16_t repeated_level,
+                                int16_t value_slot_definition_level, int64_t rows,
+                                NestedScalarOverflow* overflow, Sink& sink, int64_t* rows_read) {
+    if (overflow == nullptr || rows_read == nullptr) {
+        return Status::InvalidArgument("Invalid repeated level assembler arguments for column {}",
+                                       driver_reader.name());
+    }
+    int64_t& parents = *rows_read;
+    parents = 0;
+    while (parents < rows) {
+        const bool use_overflow = !overflow->empty();
+        NestedScalarBatch fresh;
+        if (!use_overflow) {
+            // Request at least NESTED_READ_BATCH_ROWS records; surplus rows end in `overflow`.
+            const int64_t request = std::max<int64_t>(rows - parents, NESTED_READ_BATCH_ROWS);
+            RETURN_IF_ERROR(read_nested_scalar_batch(driver_reader, request,
+                                                     value_slot_definition_level, &fresh));
+            if (fresh.empty()) {
+                break;
+            }
+        }
+        NestedScalarBatch& batch = use_overflow ? overflow->batch : fresh;
+        RETURN_IF_ERROR(sink.start_batch(batch));
+
+        for (int64_t level_idx = 0; level_idx < batch.levels_written; ++level_idx) {
+            if (batch.rep_levels[level_idx] >= repeated_level) {
+                // Continuation of the current parent row; there must be one.
+                if (parents == 0) {
+                    return Status::Corruption(
+                            "Repeated parquet stream starts with repeated level for column {}",
+                            driver_reader.name());
+                }
+                RETURN_IF_ERROR(sink.append_repeated(batch, level_idx));
+                continue;
+            }
+            if (parents >= rows) {
+                // Row budget reached: keep this parent and everything after it for later.
+                move_nested_scalar_tail(batch, level_idx, overflow);
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(sink.start_parent(batch, level_idx));
+            ++parents;
+        }
+
+        if (use_overflow) {
+            overflow->clear();
+        }
+    }
+    return Status::OK();
+}
+
} // namespace
Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column,
int64_t* rows_read) {
@@ -525,67 +779,80 @@ Status ListColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t* r
return Status::NotSupported(
"Current parquet LIST reader only supports scalar elements for
column {}", _name);
}
- if (element_reader->descriptor()->max_definition_level() != 1) {
- return Status::NotSupported(
- "Current parquet LIST reader only supports required elements
for column {}", _name);
- }
-
- ::parquet::internal::RecordReader* record_reader = nullptr;
- int64_t records_read = 0;
- RETURN_IF_ERROR(read_records(*element_reader, rows, &record_reader,
&records_read));
- const int64_t levels_written = record_reader->levels_written();
- if (records_read != rows || levels_written < records_read) {
- return Status::Corruption(
- "Invalid parquet LIST read result for column {}: rows={},
levels={}", _name,
- records_read, levels_written);
- }
- if (record_reader->values_written() != levels_written) {
- return Status::NotSupported(
- "Current parquet LIST reader only supports non-empty lists
with required "
- "elements for column {}",
- _name);
- }
- const int16_t max_definition_level =
element_reader->descriptor()->max_definition_level();
- if (auto* def_levels = record_reader->def_levels(); def_levels != nullptr)
{
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (def_levels[level_idx] != max_definition_level) {
- return Status::NotSupported(
- "Current parquet LIST reader only supports non-empty
lists with required "
- "elements for column {}",
- _name);
+ auto* array_column = array_column_from_output(column);
+ DORIS_CHECK(array_column != nullptr);
+ auto* parent_null_map = null_map_from_nullable_output(column);
+ auto nested_column = array_column->get_data_ptr()->assume_mutable();
+ std::vector<uint64_t> entry_counts;
+ NullMap parent_nulls;
+ const int16_t element_slot_definition_level = _nullable_definition_level +
1;
+ const int16_t element_max_definition_level =
+ element_reader->descriptor()->max_definition_level();
+
+ struct ListSink {
+ ListColumnReader* self = nullptr;
+ ScalarColumnReader* element_reader = nullptr;
+ MutableColumnPtr* nested_column = nullptr;
+ std::vector<uint64_t>* entry_counts = nullptr;
+ NullMap* parent_nulls = nullptr;
+ int16_t element_max_definition_level = 0;
+
+ Status start_batch(const NestedScalarBatch&) { return Status::OK(); }
+
+ Status start_parent(const NestedScalarBatch& batch, int64_t level_idx)
{
+ const int16_t def_level = batch.def_levels[level_idx];
+ if (def_level < self->_nullable_definition_level) {
+ if (!self->_type->is_nullable()) {
+ return Status::Corruption(
+ "Parquet LIST column {} contains null for
non-nullable list",
+ self->_name);
+ }
+ entry_counts->push_back(0);
+ parent_nulls->push_back(1);
+ return Status::OK();
+ }
+ entry_counts->push_back(0);
+ parent_nulls->push_back(0);
+ if (def_level == self->_nullable_definition_level) {
+ return Status::OK();
}
+ return append_element(batch, level_idx);
}
- }
- auto& array_column = assert_cast<ColumnArray&>(*column);
- auto nested_column = array_column.get_data_ptr()->assume_mutable();
- RETURN_IF_ERROR(append_scalar_values(*element_reader, *record_reader,
levels_written, nullptr,
- nested_column));
- array_column.get_data_ptr() = std::move(nested_column);
+ Status append_repeated(const NestedScalarBatch& batch, int64_t
level_idx) {
+ if (entry_counts->empty()) {
+ return Status::Corruption("Invalid repeated LIST level for
column {}", self->_name);
+ }
+ return append_element(batch, level_idx);
+ }
- auto* rep_levels = record_reader->rep_levels();
- if (rep_levels == nullptr && levels_written > 0) {
- return Status::Corruption(
- "Parquet LIST reader returned null repetition levels for
column {}", _name);
- }
- auto& offsets = array_column.get_offsets();
- offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
- size_t current_offset = offsets.empty() ? 0 : offsets.back();
- int64_t current_record = 0;
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (level_idx == 0 || rep_levels[level_idx] <
_repeated_repetition_level) {
- if (level_idx != 0) {
- offsets.push_back(current_offset);
- current_record++;
+ Status append_element(const NestedScalarBatch& batch, int64_t
level_idx) {
+ const int16_t def_level = batch.def_levels[level_idx];
+ if (def_level == element_max_definition_level) {
+ RETURN_IF_ERROR(append_scalar_batch_value(*element_reader,
batch, level_idx,
+ *nested_column));
+ } else {
+ if (!element_reader->type()->is_nullable()) {
+ return Status::Corruption(
+ "Parquet LIST column {} contains null for
non-nullable element",
+ self->_name);
+ }
+ (*nested_column)->insert_default();
}
+ ++entry_counts->back();
+ return Status::OK();
}
- current_offset++;
- }
- while (current_record < records_read) {
- offsets.push_back(current_offset);
- current_record++;
- }
- *rows_read = records_read;
+ };
+
+ ListSink sink {this, element_reader, &nested_column,
+ &entry_counts, &parent_nulls,
element_max_definition_level};
+ RETURN_IF_ERROR(assemble_repeated_levels(*element_reader,
_repeated_repetition_level,
+ element_slot_definition_level,
rows,
+ &_element_overflow, sink,
rows_read));
+
+ array_column->get_data_ptr() = std::move(nested_column);
+ append_offsets(array_column->get_offsets(), entry_counts);
+ append_parent_nulls(parent_null_map, parent_nulls);
return Status::OK();
}
@@ -593,8 +860,26 @@ Status ListColumnReader::skip(int64_t rows) {
if (rows <= 0) {
return Status::OK();
}
- DORIS_CHECK(_element_reader != nullptr);
- return _element_reader->skip(rows);
+ auto* element_reader =
dynamic_cast<ScalarColumnReader*>(_element_reader.get());
+ if (element_reader == nullptr) {
+ return Status::NotSupported(
+ "Current parquet LIST reader only supports scalar elements for
column {}", _name);
+ }
+ struct SkipSink {
+ Status start_batch(const NestedScalarBatch&) { return Status::OK(); }
+ Status start_parent(const NestedScalarBatch&, int64_t) { return
Status::OK(); }
+ Status append_repeated(const NestedScalarBatch&, int64_t) { return
Status::OK(); }
+ };
+ SkipSink sink;
+ int64_t rows_read = 0;
+ RETURN_IF_ERROR(assemble_repeated_levels(*element_reader,
_repeated_repetition_level,
+ _nullable_definition_level + 1,
rows,
+ &_element_overflow, sink,
&rows_read));
+ if (rows_read != rows) {
+ return Status::Corruption("Failed to skip parquet LIST column {}:
skipped {} of {} rows",
+ _name, rows_read, rows);
+ }
+ return Status::OK();
}
Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
rows_read) {
@@ -612,107 +897,140 @@ Status MapColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t* ro
return Status::NotSupported(
"Current parquet MAP reader only supports scalar key/value for
column {}", _name);
}
- if (key_reader->descriptor()->max_definition_level() != 1 ||
- value_reader->descriptor()->max_definition_level() != 1) {
- return Status::NotSupported(
- "Current parquet MAP reader only supports required key/value
entries for column {}",
- _name);
- }
- ::parquet::internal::RecordReader* key_record_reader = nullptr;
- int64_t records_read = 0;
- RETURN_IF_ERROR(read_records(*key_reader, rows, &key_record_reader,
&records_read));
- const int64_t levels_written = key_record_reader->levels_written();
- if (records_read != rows || levels_written < records_read) {
- return Status::Corruption(
- "Invalid parquet MAP key read result for column {}: rows={},
levels={}", _name,
- records_read, levels_written);
- }
- if (key_record_reader->values_written() != levels_written) {
- return Status::NotSupported(
- "Current parquet MAP reader only supports non-empty maps with
required entries "
- "for column {}",
- _name);
- }
-
- const int16_t max_definition_level =
key_reader->descriptor()->max_definition_level();
- if (auto* def_levels = key_record_reader->def_levels(); def_levels !=
nullptr) {
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (def_levels[level_idx] != max_definition_level) {
- return Status::NotSupported(
- "Current parquet MAP reader only supports non-empty
maps with required "
- "entries for column {}",
- _name);
+ auto* map_column = map_column_from_output(column);
+ DORIS_CHECK(map_column != nullptr);
+ auto* parent_null_map = null_map_from_nullable_output(column);
+ auto key_column = map_column->get_keys_ptr()->assume_mutable();
+ auto value_column = map_column->get_values_ptr()->assume_mutable();
+ std::vector<uint64_t> entry_counts;
+ NullMap parent_nulls;
+ const int16_t entry_definition_level = _nullable_definition_level + 1;
+ const int16_t key_max_definition_level =
key_reader->descriptor()->max_definition_level();
+ const int16_t value_max_definition_level =
value_reader->descriptor()->max_definition_level();
+
+ struct MapSink {
+ MapColumnReader* self = nullptr;
+ ScalarColumnReader* key_reader = nullptr;
+ ScalarColumnReader* value_reader = nullptr;
+ MutableColumnPtr* key_column = nullptr;
+ MutableColumnPtr* value_column = nullptr;
+ std::vector<uint64_t>* entry_counts = nullptr;
+ NullMap* parent_nulls = nullptr;
+ int16_t key_max_definition_level = 0;
+ int16_t value_max_definition_level = 0;
+
+ Status read_value_batch(int64_t batch_rows, NestedScalarBatch*
value_batch) {
+ if (!self->_value_overflow.empty()) {
+ *value_batch = std::move(self->_value_overflow.batch);
+ self->_value_overflow.clear();
+ return Status::OK();
}
+ return read_nested_scalar_batch(*value_reader, batch_rows,
+ self->_nullable_definition_level +
1, value_batch);
}
- }
- ::parquet::internal::RecordReader* value_record_reader = nullptr;
- int64_t value_records_read = 0;
- RETURN_IF_ERROR(
- read_records(*value_reader, records_read, &value_record_reader,
&value_records_read));
- if (value_records_read != records_read ||
- value_record_reader->levels_written() != levels_written ||
- value_record_reader->values_written() != levels_written) {
- return Status::Corruption(
- "Invalid parquet MAP value read result for column {}: rows={},
levels={}, "
- "values={}, expected={}",
- _name, value_records_read,
value_record_reader->levels_written(),
- value_record_reader->values_written(), levels_written);
- }
- if (auto* def_levels = value_record_reader->def_levels(); def_levels !=
nullptr) {
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (def_levels[level_idx] != max_definition_level) {
- return Status::NotSupported(
- "Current parquet MAP reader only supports non-empty
maps with required "
- "entries for column {}",
- _name);
+ Status validate_value_alignment(const NestedScalarBatch& key_batch,
+ const NestedScalarBatch& value_batch) {
+ if (value_batch.records_read != key_batch.records_read ||
+ value_batch.levels_written != key_batch.levels_written) {
+ return Status::Corruption(
+ "Parquet MAP key/value levels are not aligned for
column {}: key rows={}, "
+ "key levels={}, value rows={}, value levels={}",
+ self->_name, key_batch.records_read,
key_batch.levels_written,
+ value_batch.records_read, value_batch.levels_written);
+ }
+ for (int64_t level_idx = 0; level_idx < key_batch.levels_written;
++level_idx) {
+ if (value_batch.rep_levels[level_idx] !=
key_batch.rep_levels[level_idx]) {
+ return Status::Corruption(
+ "Parquet MAP key/value repetition levels are not
aligned for column {}",
+ self->_name);
+ }
}
+ return Status::OK();
}
- }
- const auto* key_rep_levels = key_record_reader->rep_levels();
- const auto* value_rep_levels = value_record_reader->rep_levels();
- if ((key_rep_levels == nullptr || value_rep_levels == nullptr) &&
levels_written > 0) {
- return Status::Corruption(
- "Parquet MAP reader returned null repetition levels for column
{}", _name);
- }
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (key_rep_levels[level_idx] != value_rep_levels[level_idx]) {
- return Status::Corruption(
- "Parquet MAP key/value repetition levels are not aligned
for column {}", _name);
+ Status start_batch(const NestedScalarBatch& key_batch) {
+ RETURN_IF_ERROR(read_value_batch(key_batch.records_read,
&value_batch));
+ RETURN_IF_ERROR(validate_value_alignment(key_batch, value_batch));
+ return Status::OK();
}
- }
- auto& map_column = assert_cast<ColumnMap&>(*column);
- auto key_column = map_column.get_keys_ptr()->assume_mutable();
- RETURN_IF_ERROR(append_scalar_values(*key_reader, *key_record_reader,
levels_written, nullptr,
- key_column));
- map_column.get_keys_ptr() = std::move(key_column);
-
- auto value_column = map_column.get_values_ptr()->assume_mutable();
- RETURN_IF_ERROR(append_scalar_values(*value_reader, *value_record_reader,
levels_written,
- nullptr, value_column));
- map_column.get_values_ptr() = std::move(value_column);
-
- auto& offsets = map_column.get_offsets();
- offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
- size_t current_offset = offsets.empty() ? 0 : offsets.back();
- int64_t current_record = 0;
- for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
- if (level_idx == 0 || key_rep_levels[level_idx] <
_repeated_repetition_level) {
- if (level_idx != 0) {
- offsets.push_back(current_offset);
- current_record++;
+ Status start_parent(const NestedScalarBatch& key_batch, int64_t
level_idx) {
+ const int16_t def_level = key_batch.def_levels[level_idx];
+ if (def_level < self->_nullable_definition_level) {
+ if (!self->_type->is_nullable()) {
+ return Status::Corruption(
+ "Parquet MAP column {} contains null for
non-nullable map",
+ self->_name);
+ }
+ entry_counts->push_back(0);
+ parent_nulls->push_back(1);
+ return Status::OK();
}
+ entry_counts->push_back(0);
+ parent_nulls->push_back(0);
+ if (def_level == self->_nullable_definition_level) {
+ return Status::OK();
+ }
+ return append_entry(key_batch, level_idx);
}
- current_offset++;
- }
- while (current_record < records_read) {
- offsets.push_back(current_offset);
- current_record++;
- }
- *rows_read = records_read;
+
+ Status append_repeated(const NestedScalarBatch& key_batch, int64_t
level_idx) {
+ if (entry_counts->empty()) {
+ return Status::Corruption("Invalid repeated MAP level for
column {}", self->_name);
+ }
+ return append_entry(key_batch, level_idx);
+ }
+
+ Status append_entry(const NestedScalarBatch& key_batch, int64_t
level_idx) {
+ if (key_batch.def_levels[level_idx] != key_max_definition_level) {
+ return Status::Corruption("Parquet MAP column {} contains null
map key",
+ self->_name);
+ }
+ RETURN_IF_ERROR(
+ append_scalar_batch_value(*key_reader, key_batch,
level_idx, *key_column));
+ if (value_batch.def_levels[level_idx] ==
value_max_definition_level) {
+ RETURN_IF_ERROR(append_scalar_batch_value(*value_reader,
value_batch, level_idx,
+ *value_column));
+ } else {
+ if (!value_reader->type()->is_nullable()) {
+ return Status::Corruption(
+ "Parquet MAP column {} contains null for
non-nullable value",
+ self->_name);
+ }
+ (*value_column)->insert_default();
+ }
+ ++entry_counts->back();
+ return Status::OK();
+ }
+
+ NestedScalarBatch value_batch;
+ };
+
+ MapSink sink {this,
+ key_reader,
+ value_reader,
+ &key_column,
+ &value_column,
+ &entry_counts,
+ &parent_nulls,
+ key_max_definition_level,
+ value_max_definition_level};
+ RETURN_IF_ERROR(assemble_repeated_levels(*key_reader,
_repeated_repetition_level,
+ entry_definition_level, rows,
&_key_overflow, sink,
+ rows_read));
+ if (!_key_overflow.empty()) {
+ move_nested_scalar_tail(
+ sink.value_batch,
+ sink.value_batch.levels_written -
_key_overflow.batch.levels_written,
+ &_value_overflow);
+ }
+
+ map_column->get_keys_ptr() = std::move(key_column);
+ map_column->get_values_ptr() = std::move(value_column);
+ append_offsets(map_column->get_offsets(), entry_counts);
+ append_parent_nulls(parent_null_map, parent_nulls);
return Status::OK();
}
@@ -722,8 +1040,73 @@ Status MapColumnReader::skip(int64_t rows) {
}
DORIS_CHECK(_key_reader != nullptr);
DORIS_CHECK(_value_reader != nullptr);
- RETURN_IF_ERROR(_key_reader->skip(rows));
- return _value_reader->skip(rows);
+ auto* key_reader = dynamic_cast<ScalarColumnReader*>(_key_reader.get());
+ auto* value_reader =
dynamic_cast<ScalarColumnReader*>(_value_reader.get());
+ if (key_reader == nullptr || value_reader == nullptr) {
+ return Status::NotSupported(
+ "Current parquet MAP reader only supports scalar key/value for
column {}", _name);
+ }
+ struct SkipSink {
+ MapColumnReader* self = nullptr;
+ ScalarColumnReader* value_reader = nullptr;
+
+ Status read_value_batch(int64_t batch_rows, NestedScalarBatch*
value_batch) {
+ if (!self->_value_overflow.empty()) {
+ *value_batch = std::move(self->_value_overflow.batch);
+ self->_value_overflow.clear();
+ return Status::OK();
+ }
+ return read_nested_scalar_batch(*value_reader, batch_rows,
+ self->_nullable_definition_level +
1, value_batch);
+ }
+
+ Status validate_value_alignment(const NestedScalarBatch& key_batch,
+ const NestedScalarBatch& value_batch) {
+ if (value_batch.records_read != key_batch.records_read ||
+ value_batch.levels_written != key_batch.levels_written) {
+ return Status::Corruption(
+ "Parquet MAP key/value levels are not aligned for
column {} while "
+ "skipping",
+ self->_name);
+ }
+ for (int64_t level_idx = 0; level_idx < key_batch.levels_written;
++level_idx) {
+ if (value_batch.rep_levels[level_idx] !=
key_batch.rep_levels[level_idx]) {
+ return Status::Corruption(
+ "Parquet MAP key/value repetition levels are not
aligned for column {}",
+ self->_name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status start_batch(const NestedScalarBatch& key_batch) {
+ RETURN_IF_ERROR(read_value_batch(key_batch.records_read,
&value_batch));
+ RETURN_IF_ERROR(validate_value_alignment(key_batch, value_batch));
+ return Status::OK();
+ }
+
+ Status start_parent(const NestedScalarBatch&, int64_t) { return
Status::OK(); }
+
+ Status append_repeated(const NestedScalarBatch&, int64_t) { return
Status::OK(); }
+
+ NestedScalarBatch value_batch;
+ };
+ SkipSink sink {this, value_reader};
+ int64_t rows_read = 0;
+ RETURN_IF_ERROR(assemble_repeated_levels(*key_reader,
_repeated_repetition_level,
+ _nullable_definition_level + 1,
rows, &_key_overflow,
+ sink, &rows_read));
+ if (!_key_overflow.empty()) {
+ move_nested_scalar_tail(
+ sink.value_batch,
+ sink.value_batch.levels_written -
_key_overflow.batch.levels_written,
+ &_value_overflow);
+ }
+ if (rows_read != rows) {
+ return Status::Corruption("Failed to skip parquet MAP column {}:
skipped {} of {} rows",
+ _name, rows_read, rows);
+ }
+ return Status::OK();
}
Status ParquetColumnReader::skip(int64_t rows) {
@@ -954,10 +1337,6 @@ Status
ParquetColumnReaderFactory::create_list_column_reader(
return Status::NotSupported("Parquet LIST projection is not
implemented for column {}",
column_schema.name);
}
- if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
- return Status::NotSupported("Nullable parquet LIST reader is not
implemented for column {}",
- column_schema.name);
- }
if (column_schema.children.size() != 1) {
return Status::NotSupported("Unsupported parquet LIST layout for
column {}",
column_schema.name);
@@ -980,10 +1359,6 @@ Status
ParquetColumnReaderFactory::create_map_column_reader(
return Status::NotSupported("Parquet MAP projection is not implemented
for column {}",
column_schema.name);
}
- if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
- return Status::NotSupported("Nullable parquet MAP reader is not
implemented for column {}",
- column_schema.name);
- }
if (column_schema.children.size() != 1 ||
column_schema.children[0]->children.size() != 2) {
return Status::NotSupported("Unsupported parquet MAP layout for column
{}",
column_schema.name);
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 4a187f4f8e0..50aa801f4c7 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -163,6 +163,38 @@ protected:
return finish_array(&builder);
}
+ // Builds a nullable LIST<int32> fixture with five rows:
+ //   row 0: [10, 20]
+ //   row 1: NULL            (null parent row)
+ //   row 2: []              (empty list)
+ //   row 3: [NULL, 30]      (null element inside a non-null list)
+ //   row 4: [40]
+ // The flattened element column is therefore 10, 20, NULL, 30, 40.
+ std::shared_ptr<arrow::Array> build_nullable_int_list_array() {
+ auto value_builder = std::make_shared<arrow::Int32Builder>();
+ arrow::ListBuilder builder(arrow::default_memory_pool(),
value_builder);
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->Append(10).ok());
+ EXPECT_TRUE(value_builder->Append(20).ok());
+ EXPECT_TRUE(builder.AppendNull().ok());
+ EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ EXPECT_TRUE(value_builder->Append(30).ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->Append(40).ok());
+ return finish_array(&builder);
+ }
+
+ // Builds a LIST<int32> fixture with no null parent rows but with null
+ // elements. It is paired with the non-nullable "required_nullable_list_int_col" field:
+ //   row 0: []
+ //   row 1: [NULL, 110]
+ //   row 2: [120]
+ //   row 3: [130, NULL]
+ //   row 4: []              (Append() with no values, i.e. an empty list)
+ // The flattened element column is therefore NULL, 110, 120, 130, NULL.
+ std::shared_ptr<arrow::Array> build_required_nullable_int_list_array() {
+ auto value_builder = std::make_shared<arrow::Int32Builder>();
+ arrow::ListBuilder builder(arrow::default_memory_pool(),
value_builder);
+ EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ EXPECT_TRUE(value_builder->Append(110).ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->Append(120).ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(value_builder->Append(130).ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ return finish_array(&builder);
+ }
+
std::shared_ptr<arrow::Array> build_required_int_string_map_array() {
auto key_builder = std::make_shared<arrow::Int32Builder>();
auto value_builder = std::make_shared<arrow::StringBuilder>();
@@ -183,6 +215,50 @@ protected:
return finish_array(&builder);
}
+ // Builds a nullable MAP<int32, utf8> fixture whose value field is declared
+ // nullable. It has five rows:
+ //   row 0: {10: "aa", 20: NULL}
+ //   row 1: NULL            (null parent row)
+ //   row 2: {}              (empty map)
+ //   row 3: {30: "cc"}
+ //   row 4: {40: NULL}
+ // The flattened keys are 10, 20, 30, 40 and the values are "aa", NULL, "cc", NULL.
+ std::shared_ptr<arrow::Array> build_nullable_int_string_map_array() {
+ auto key_builder = std::make_shared<arrow::Int32Builder>();
+ auto value_builder = std::make_shared<arrow::StringBuilder>();
+ auto map_type = arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), true));
+ arrow::MapBuilder builder(arrow::default_memory_pool(), key_builder,
value_builder,
+ map_type);
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(10).ok());
+ EXPECT_TRUE(value_builder->Append("aa").ok());
+ EXPECT_TRUE(key_builder->Append(20).ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ EXPECT_TRUE(builder.AppendNull().ok());
+ EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(30).ok());
+ EXPECT_TRUE(value_builder->Append("cc").ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(40).ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ return finish_array(&builder);
+ }
+
+ // Builds a MAP<int32, utf8> fixture with no null parent rows but with null
+ // values. It is paired with the non-nullable "required_nullable_map_int_string_col" field:
+ //   row 0: {}
+ //   row 1: {101: NULL, 102: "bb"}
+ //   row 2: {103: "cc"}
+ //   row 3: {}
+ //   row 4: {104: NULL}
+ // The flattened values are therefore NULL, "bb", "cc", NULL.
+ std::shared_ptr<arrow::Array> build_required_nullable_string_map_array() {
+ auto key_builder = std::make_shared<arrow::Int32Builder>();
+ auto value_builder = std::make_shared<arrow::StringBuilder>();
+ auto map_type = arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), true));
+ arrow::MapBuilder builder(arrow::default_memory_pool(), key_builder,
value_builder,
+ map_type);
+ EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(101).ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ EXPECT_TRUE(key_builder->Append(102).ok());
+ EXPECT_TRUE(value_builder->Append("bb").ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(103).ok());
+ EXPECT_TRUE(value_builder->Append("cc").ok());
+ EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(key_builder->Append(104).ok());
+ EXPECT_TRUE(value_builder->AppendNull().ok());
+ return finish_array(&builder);
+ }
+
std::shared_ptr<arrow::Array> build_time32_array(const
std::shared_ptr<arrow::DataType>& type,
const
std::vector<int32_t>& values) {
arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -430,6 +506,57 @@ protected:
EXPECT_EQ(values.get_element(5), 6);
EXPECT_EQ(values.get_element(8), 9);
});
+ add_field(arrow::field("nullable_list_int_col",
+ arrow::list(arrow::field("element",
arrow::int32(), true)), true),
+ build_nullable_int_list_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+ EXPECT_TRUE(schema.type->is_nullable());
+ const auto& nullable_column = assert_cast<const
ColumnNullable&>(column);
+ ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_TRUE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ EXPECT_FALSE(nullable_column.is_null_at(3));
+ const auto& array_column =
+ assert_cast<const
ColumnArray&>(nullable_column.get_nested_column());
+ const auto& offsets = array_column.get_offsets();
+ ASSERT_EQ(offsets.size(), ROW_COUNT);
+ EXPECT_EQ(offsets[0], 2);
+ EXPECT_EQ(offsets[1], 2);
+ EXPECT_EQ(offsets[2], 2);
+ EXPECT_EQ(offsets[3], 4);
+ EXPECT_EQ(offsets[4], 5);
+ const auto& elements =
+ assert_cast<const
ColumnNullable&>(array_column.get_data());
+ const auto& values =
+ assert_cast<const
ColumnInt32&>(elements.get_nested_column());
+ ASSERT_EQ(elements.size(), 5);
+ EXPECT_EQ(values.get_element(0), 10);
+ EXPECT_EQ(values.get_element(1), 20);
+ EXPECT_TRUE(elements.is_null_at(2));
+ EXPECT_EQ(values.get_element(3), 30);
+ EXPECT_EQ(values.get_element(4), 40);
+ });
+ add_field(arrow::field("required_nullable_list_int_col",
+ arrow::list(arrow::field("element",
arrow::int32(), true)), false),
+ build_required_nullable_int_list_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+ EXPECT_FALSE(schema.type->is_nullable());
+ const auto& array_column = assert_cast<const
ColumnArray&>(column);
+ const auto& offsets = array_column.get_offsets();
+ ASSERT_EQ(offsets.size(), ROW_COUNT);
+ EXPECT_EQ(offsets[0], 0);
+ EXPECT_EQ(offsets[1], 2);
+ EXPECT_EQ(offsets[2], 3);
+ EXPECT_EQ(offsets[3], 5);
+ EXPECT_EQ(offsets[4], 5);
+ const auto& elements =
+ assert_cast<const
ColumnNullable&>(array_column.get_data());
+ ASSERT_EQ(elements.size(), 5);
+ EXPECT_TRUE(elements.is_null_at(0));
+ EXPECT_FALSE(elements.is_null_at(1));
+ EXPECT_TRUE(elements.is_null_at(4));
+ });
add_field(arrow::field(
"map_int_string_col",
arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), false)),
@@ -465,6 +592,63 @@ protected:
EXPECT_EQ(values.get_data_at(5).to_string(), "f");
EXPECT_EQ(values.get_data_at(8).to_string(), "i");
});
+ add_field(
+ arrow::field("nullable_map_int_string_col",
+ arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), true)),
+ true),
+ build_nullable_int_string_map_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column) {
+ EXPECT_TRUE(schema.type->is_nullable());
+ const auto& nullable_column = assert_cast<const
ColumnNullable&>(column);
+ ASSERT_EQ(nullable_column.size(), ROW_COUNT);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_TRUE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ const auto& map_column =
+ assert_cast<const
ColumnMap&>(nullable_column.get_nested_column());
+ const auto& offsets = map_column.get_offsets();
+ ASSERT_EQ(offsets.size(), ROW_COUNT);
+ EXPECT_EQ(offsets[0], 2);
+ EXPECT_EQ(offsets[1], 2);
+ EXPECT_EQ(offsets[2], 2);
+ EXPECT_EQ(offsets[3], 3);
+ EXPECT_EQ(offsets[4], 4);
+ const auto& keys = assert_cast<const
ColumnInt32&>(map_column.get_keys());
+ const auto& values =
+ assert_cast<const
ColumnNullable&>(map_column.get_values());
+ const auto& value_data =
+ assert_cast<const
ColumnString&>(values.get_nested_column());
+ ASSERT_EQ(keys.size(), 4);
+ EXPECT_EQ(keys.get_element(0), 10);
+ EXPECT_EQ(keys.get_element(1), 20);
+ EXPECT_EQ(keys.get_element(3), 40);
+ EXPECT_EQ(value_data.get_data_at(0).to_string(), "aa");
+ EXPECT_TRUE(values.is_null_at(1));
+ EXPECT_EQ(value_data.get_data_at(2).to_string(), "cc");
+ EXPECT_TRUE(values.is_null_at(3));
+ });
+ add_field(
+ arrow::field("required_nullable_map_int_string_col",
+ arrow::map(arrow::int32(), arrow::field("value",
arrow::utf8(), true)),
+ false),
+ build_required_nullable_string_map_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column) {
+ EXPECT_FALSE(schema.type->is_nullable());
+ const auto& map_column = assert_cast<const
ColumnMap&>(column);
+ const auto& offsets = map_column.get_offsets();
+ ASSERT_EQ(offsets.size(), ROW_COUNT);
+ EXPECT_EQ(offsets[0], 0);
+ EXPECT_EQ(offsets[1], 2);
+ EXPECT_EQ(offsets[2], 3);
+ EXPECT_EQ(offsets[3], 3);
+ EXPECT_EQ(offsets[4], 4);
+ const auto& values =
+ assert_cast<const
ColumnNullable&>(map_column.get_values());
+ ASSERT_EQ(values.size(), 4);
+ EXPECT_TRUE(values.is_null_at(0));
+ EXPECT_FALSE(values.is_null_at(1));
+ EXPECT_TRUE(values.is_null_at(3));
+ });
auto schema = arrow::schema(_arrow_fields);
auto table = arrow::Table::Make(schema, _arrays);
@@ -531,6 +715,16 @@ TEST_F(ParquetColumnReaderTest,
ReadAllSupportedPhysicalAndLogicalTypes) {
}
}
+// Runs read_and_validate over every complex fixture column: struct_col, the
+// plain/nullable/required-parent LIST columns and the plain/nullable/
+// required-parent MAP columns.
+// NOTE(review): read_and_validate is defined outside this hunk. Presumably it
+// reads the whole column and applies the expectation registered via add_field.
+TEST_F(ParquetColumnReaderTest, ReadSupportedComplexTypes) {
+ read_and_validate(find_field_idx("struct_col"));
+ read_and_validate(find_field_idx("list_int_col"));
+ read_and_validate(find_field_idx("nullable_list_int_col"));
+ read_and_validate(find_field_idx("required_nullable_list_int_col"));
+ read_and_validate(find_field_idx("map_int_string_col"));
+ read_and_validate(find_field_idx("nullable_map_int_string_col"));
+ read_and_validate(find_field_idx("required_nullable_map_int_string_col"));
+}
+
TEST_F(ParquetColumnReaderTest, SkipThenRead) {
auto reader = create_reader(1);
auto st = reader->skip(2);
@@ -604,6 +798,134 @@ TEST_F(ParquetColumnReaderTest,
ReadProjectedStructChildren) {
EXPECT_EQ(values.get_data_at(4).to_string(), "se");
}
+// Reads nullable_list_int_col in two calls (2 rows, then 3 rows) into the same
+// output column. It then checks the combined column against the field's registered
+// expectation, so the result must match a single full read.
+// The split presumably forces the second read to resume from levels buffered
+// (overflowed) by the first read.
+TEST_F(ParquetColumnReaderTest, ReadListWithOverflowAcrossChunks) {
+ const auto field_idx = find_field_idx("nullable_list_int_col");
+ auto reader = create_reader(field_idx);
+ MutableColumnPtr column = reader->type()->create_column();
+
+ int64_t rows_read = 0;
+ auto st = reader->read(2, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 2);
+ st = reader->read(3, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 3);
+
+ _expected_by_field[field_idx](*_fields[field_idx], *column);
+}
+
+// Skips row 0 ([10, 20]) of nullable_list_int_col, then reads the next 3 rows:
+// NULL, [], [NULL, 30]. Offsets are relative to the fresh output column, so the
+// expected offsets are 0, 0, 2.
+// NOTE(review): only parent nullness of row 0 and the offsets are asserted. The
+// element values (NULL, 30) are not checked here.
+TEST_F(ParquetColumnReaderTest, SkipListWithOverflowThenRead) {
+ const auto field_idx = find_field_idx("nullable_list_int_col");
+ auto reader = create_reader(field_idx);
+ auto st = reader->skip(1);
+ ASSERT_TRUE(st.ok()) << st;
+
+ MutableColumnPtr column = reader->type()->create_column();
+ int64_t rows_read = 0;
+ st = reader->read(3, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 3);
+
+ const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+ ASSERT_EQ(nullable_column.size(), 3);
+ EXPECT_TRUE(nullable_column.is_null_at(0));
+ const auto& array_column = assert_cast<const
ColumnArray&>(nullable_column.get_nested_column());
+ const auto& offsets = array_column.get_offsets();
+ ASSERT_EQ(offsets.size(), 3);
+ EXPECT_EQ(offsets[0], 0);
+ EXPECT_EQ(offsets[1], 0);
+ EXPECT_EQ(offsets[2], 2);
+}
+
+// Selects parent rows 0, 3 and 4 of nullable_list_int_col, which are
+// [10, 20], [NULL, 30] and [40]. Rows 1 (NULL) and 2 ([]) are dropped, so all
+// selected rows are non-null and the offsets are 2, 4, 5.
+// NOTE(review): the arguments to select() are presumably (selection, selected
+// count, rows consumed from the reader). Confirm against the reader API.
+// Element values are not asserted here.
+TEST_F(ParquetColumnReaderTest, SelectListWithOverflow) {
+ const auto field_idx = find_field_idx("nullable_list_int_col");
+ auto reader = create_reader(field_idx);
+ SelectionVector selection(3);
+ selection.set_index(0, 0);
+ selection.set_index(1, 3);
+ selection.set_index(2, 4);
+
+ MutableColumnPtr column = reader->type()->create_column();
+ auto st = reader->select(selection, 3, ROW_COUNT, column);
+ ASSERT_TRUE(st.ok()) << st;
+
+ const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+ ASSERT_EQ(nullable_column.size(), 3);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_FALSE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ const auto& array_column = assert_cast<const
ColumnArray&>(nullable_column.get_nested_column());
+ const auto& offsets = array_column.get_offsets();
+ ASSERT_EQ(offsets.size(), 3);
+ EXPECT_EQ(offsets[0], 2);
+ EXPECT_EQ(offsets[1], 4);
+ EXPECT_EQ(offsets[2], 5);
+}
+
+// MAP counterpart of ReadListWithOverflowAcrossChunks. It reads
+// nullable_map_int_string_col as 2 rows and then 3 rows into one column. It then
+// checks the result against the field's registered expectation for a full read.
+TEST_F(ParquetColumnReaderTest, ReadMapWithOverflowAcrossChunks) {
+ const auto field_idx = find_field_idx("nullable_map_int_string_col");
+ auto reader = create_reader(field_idx);
+ MutableColumnPtr column = reader->type()->create_column();
+
+ int64_t rows_read = 0;
+ auto st = reader->read(2, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 2);
+ st = reader->read(3, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 3);
+
+ _expected_by_field[field_idx](*_fields[field_idx], *column);
+}
+
+// Skips row 0 ({10: "aa", 20: NULL}) of nullable_map_int_string_col, then reads
+// the next 3 rows: NULL, {}, {30: "cc"}. The expected offsets are 0, 0, 1. This
+// verifies that skipped key and value entries are dropped together.
+// NOTE(review): keys and values of the surviving entry are not asserted.
+TEST_F(ParquetColumnReaderTest, SkipMapWithOverflowThenRead) {
+ const auto field_idx = find_field_idx("nullable_map_int_string_col");
+ auto reader = create_reader(field_idx);
+ auto st = reader->skip(1);
+ ASSERT_TRUE(st.ok()) << st;
+
+ MutableColumnPtr column = reader->type()->create_column();
+ int64_t rows_read = 0;
+ st = reader->read(3, column, &rows_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows_read, 3);
+
+ const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+ ASSERT_EQ(nullable_column.size(), 3);
+ EXPECT_TRUE(nullable_column.is_null_at(0));
+ const auto& map_column = assert_cast<const
ColumnMap&>(nullable_column.get_nested_column());
+ const auto& offsets = map_column.get_offsets();
+ ASSERT_EQ(offsets.size(), 3);
+ EXPECT_EQ(offsets[0], 0);
+ EXPECT_EQ(offsets[1], 0);
+ EXPECT_EQ(offsets[2], 1);
+}
+
+// Selects parent rows 0, 3 and 4 of nullable_map_int_string_col:
+// {10: "aa", 20: NULL}, {30: "cc"} and {40: NULL}. All selected rows are non-null
+// and the offsets are 2, 3, 4.
+// NOTE(review): the select() argument meaning is assumed to mirror
+// SelectListWithOverflow. Keys and values are not asserted here.
+TEST_F(ParquetColumnReaderTest, SelectMapWithOverflow) {
+ const auto field_idx = find_field_idx("nullable_map_int_string_col");
+ auto reader = create_reader(field_idx);
+ SelectionVector selection(3);
+ selection.set_index(0, 0);
+ selection.set_index(1, 3);
+ selection.set_index(2, 4);
+
+ MutableColumnPtr column = reader->type()->create_column();
+ auto st = reader->select(selection, 3, ROW_COUNT, column);
+ ASSERT_TRUE(st.ok()) << st;
+
+ const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
+ ASSERT_EQ(nullable_column.size(), 3);
+ EXPECT_FALSE(nullable_column.is_null_at(0));
+ EXPECT_FALSE(nullable_column.is_null_at(1));
+ EXPECT_FALSE(nullable_column.is_null_at(2));
+ const auto& map_column = assert_cast<const
ColumnMap&>(nullable_column.get_nested_column());
+ const auto& offsets = map_column.get_offsets();
+ ASSERT_EQ(offsets.size(), 3);
+ EXPECT_EQ(offsets[0], 2);
+ EXPECT_EQ(offsets[1], 3);
+ EXPECT_EQ(offsets[2], 4);
+}
+
TEST_F(ParquetColumnReaderTest, BuildComplexSchemaPathMetadata) {
const auto field_idx = find_field_idx("struct_col");
ASSERT_LT(field_idx, _fields.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]