This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 321134d931e Materialize Iceberg row lineage virtual columns (#63787)
321134d931e is described below
commit 321134d931e1a9e00bfa535a51e34c8b76fb7d7d
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 13:45:11 2026 +0800
Materialize Iceberg row lineage virtual columns (#63787)
1. ParquetReader reads a range of a parquet file
2. ParquetReader supports virtual column reader (RowPosition)
3. IcebergReader supports virtual columns
---
be/src/format/new_parquet/column_reader.cpp | 62 +++++
be/src/format/new_parquet/column_reader.h | 9 +
be/src/format/new_parquet/parquet_reader.cpp | 98 +++++++-
be/src/format/new_parquet/parquet_reader.h | 6 +
be/src/format/reader/table_reader.cpp | 2 +
be/src/format/reader/table_reader.h | 5 +
be/src/format/table/iceberg_reader_v2.h | 138 +++++++++--
be/src/io/file_factory.h | 2 +
be/test/format/new_parquet/parquet_reader_test.cpp | 166 ++++++++++++-
be/test/format/reader/table_reader_test.cpp | 271 +++++++++++++++++++++
10 files changed, 730 insertions(+), 29 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index 9952016832c..c427b38a970 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -33,8 +33,10 @@
#include "core/column/column.h"
#include "core/column/column_array.h"
#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_struct.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "format/new_parquet/parquet_column_schema.h"
@@ -130,6 +132,52 @@ private:
std::unique_ptr<ParquetColumnReader> _element_reader;
};
+// Virtual column reader that synthesizes row positions instead of decoding Parquet pages.
+// Every value it produces is `row_group_first_row` plus the number of rows already consumed
+// from the current row group through read() or skip().
+class RowPositionColumnReader final : public ParquetColumnReader {
+public:
+    explicit RowPositionColumnReader(int64_t row_group_first_row)
+            : _row_group_first_row(row_group_first_row) {}
+
+    int file_column_id() const override {
+        return ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+    }
+    // Row positions are not backed by a physical Parquet leaf column.
+    int parquet_leaf_column_id() const override { return -1; }
+    const DataTypePtr& type() const override { return _type; }
+    const std::string& name() const override { return _name; }
+
+    // Appends `rows` consecutive positions to `column` (cast to ColumnInt64) and reports the
+    // appended count through `rows_read`. Null outputs and negative counts are rejected.
+    Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) override {
+        if (rows_read == nullptr || column.get() == nullptr) {
+            return Status::InvalidArgument("Invalid parquet row position read result pointer");
+        }
+        if (rows < 0) {
+            return Status::InvalidArgument("Invalid parquet row position read rows {}", rows);
+        }
+        auto& positions = assert_cast<ColumnInt64*>(column.get())->get_data();
+        const auto write_begin = positions.size();
+        positions.resize(write_begin + rows);
+        const int64_t first_position = _row_group_first_row + _next_row_position;
+        for (int64_t offset = 0; offset < rows; ++offset) {
+            positions[write_begin + offset] = first_position + offset;
+        }
+        _next_row_position += rows;
+        *rows_read = rows;
+        return Status::OK();
+    }
+
+    // Advances the position cursor without producing values; non-positive counts are ignored.
+    Status skip(int64_t rows) override {
+        if (rows > 0) {
+            _next_row_position += rows;
+        }
+        return Status::OK();
+    }
+
+private:
+    // Position emitted for row 0 of the row group this reader was created for.
+    int64_t _row_group_first_row = 0;
+    // Rows of the row group already consumed via read() or skip().
+    int64_t _next_row_position = 0;
+    DataTypePtr _type = std::make_shared<DataTypeInt64>();
+    std::string _name = ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME;
+};
+
Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
::parquet::internal::RecordReader** record_reader,
int64_t* rows_read) {
auto reader = column_reader.record_reader();
@@ -558,6 +606,20 @@ ParquetColumnReaderFactory::ParquetColumnReaderFactory(
: _row_group(std::move(row_group)),
_record_readers(static_cast<size_t>(num_leaf_columns)) {}
+// Builds the schema entry for the virtual row position column: an Int64 field tagged as
+// ROW_NUMBER that carries the reserved ROW_POSITION_COLUMN_ID / ROW_POSITION_COLUMN_NAME.
+reader::SchemaField ParquetColumnReaderFactory::row_position_schema_field() {
+    reader::SchemaField row_position;
+    row_position.column_type = reader::ColumnType::ROW_NUMBER;
+    row_position.type = std::make_shared<DataTypeInt64>();
+    row_position.name = ROW_POSITION_COLUMN_NAME;
+    row_position.id = ROW_POSITION_COLUMN_ID;
+    return row_position;
+}
+
+// Creates a reader that synthesizes row positions starting at `row_group_first_row`.
+std::unique_ptr<ParquetColumnReader> ParquetColumnReaderFactory::create_row_position_column_reader(
+        int64_t row_group_first_row) const {
+    std::unique_ptr<ParquetColumnReader> row_position_reader =
+            std::make_unique<RowPositionColumnReader>(row_group_first_row);
+    return row_position_reader;
+}
+
Status ParquetColumnReaderFactory::create_scalar_reader(
int parquet_leaf_column_id, const ParquetTypeDescriptor&
type_descriptor,
const ::parquet::ColumnDescriptor* descriptor, DataTypePtr type,
std::string name,
diff --git a/be/src/format/new_parquet/column_reader.h
b/be/src/format/new_parquet/column_reader.h
index ec691a9743e..80f7060fa31 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -41,6 +41,7 @@ class IColumn;
namespace reader {
struct FieldProjection;
+struct SchemaField;
} // namespace reader
namespace parquet {
@@ -89,6 +90,11 @@ public:
ParquetColumnReaderFactory(std::shared_ptr<::parquet::RowGroupReader>
row_group,
int num_leaf_columns);
+ static constexpr int ROW_POSITION_COLUMN_ID = -10001;
+ static constexpr const char* ROW_POSITION_COLUMN_NAME =
"__parquet_row_position";
+
+ static reader::SchemaField row_position_schema_field();
+
// 根据 file-local schema tree 创建 column reader。复杂类型会在这里递归创建
// children。该入口只理解 Parquet file schema,不处理 table/global schema。
Status create(const ParquetColumnSchema& column_schema,
@@ -100,6 +106,9 @@ public:
return create(column_schema, nullptr, reader);
}
+ std::unique_ptr<ParquetColumnReader> create_row_position_column_reader(
+ int64_t row_group_first_row) const;
+
private:
Status create_scalar_column_reader(const ParquetColumnSchema&
column_schema,
std::unique_ptr<ParquetColumnReader>*
reader) const;
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 70902d936ee..043f155dd85 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -159,12 +159,16 @@ struct ParquetReaderScanState {
std::vector<int> predicate_fields;
std::vector<int> non_predicate_fields;
std::vector<int> selected_row_groups;
+ // We need this to quickly determine the first row of each row group,
which is needed for position delete and page index.
+ // TODO: this may be parsed by multiple ParquetReader with the same file
but different scan ranges, so we should cache it
+ std::vector<int64_t> row_group_first_rows;
size_t next_row_group_idx = 0;
std::shared_ptr<::parquet::RowGroupReader> current_row_group;
std::vector<std::unique_ptr<ParquetColumnReader>>
current_predicate_columns;
std::vector<std::unique_ptr<ParquetColumnReader>>
current_non_predicate_columns;
int64_t current_row_group_rows = 0;
int64_t current_row_group_rows_read = 0;
+ int64_t current_row_group_first_row = 0;
};
Status ParquetReader::_reset_reader_position() {
@@ -174,6 +178,7 @@ Status ParquetReader::_reset_reader_position() {
_state->current_non_predicate_columns.clear();
_state->current_row_group_rows = 0;
_state->current_row_group_rows_read = 0;
+ _state->current_row_group_first_row = 0;
return Status::OK();
}
@@ -183,6 +188,7 @@ void ParquetReader::_reset_current_row_group() {
_state->current_non_predicate_columns.clear();
_state->current_row_group_rows = 0;
_state->current_row_group_rows_read = 0;
+ _state->current_row_group_first_row = 0;
}
void ParquetReader::_fill_schema_field(const ParquetColumnSchema&
column_schema,
@@ -383,15 +389,24 @@ Status ParquetReader::_open_next_row_group(bool*
has_row_group) {
_reset_current_row_group();
continue;
}
+ DORIS_CHECK(row_group_idx >= 0 &&
+ row_group_idx <
static_cast<int>(_state->row_group_first_rows.size()));
+ _state->current_row_group_first_row =
_state->row_group_first_rows[row_group_idx];
_state->current_row_group_rows_read = 0;
_state->current_predicate_columns.clear();
_state->current_non_predicate_columns.clear();
ParquetColumnReaderFactory
column_reader_factory(_state->current_row_group,
_state->schema->num_columns());
- for (const auto file_field_id : _request->predicate_columns) {
- const auto& column_schema = _state->file_schema[file_field_id];
- const auto projection_it =
_request->complex_projections.find(file_field_id);
+ for (const auto file_column_id : _request->predicate_columns) {
+ if (file_column_id ==
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+ _state->current_predicate_columns.push_back(
+
column_reader_factory.create_row_position_column_reader(
+ _state->current_row_group_first_row));
+ continue;
+ }
+ const auto& column_schema = _state->file_schema[file_column_id];
+ const auto projection_it =
_request->complex_projections.find(file_column_id);
const auto* projection = projection_it ==
_request->complex_projections.end()
? nullptr
: &projection_it->second;
@@ -400,9 +415,15 @@ Status ParquetReader::_open_next_row_group(bool*
has_row_group) {
column_reader_factory.create(*column_schema, projection,
&column_reader));
_state->current_predicate_columns.push_back(std::move(column_reader));
}
- for (const auto file_field_id : _request->non_predicate_columns) {
- const auto& column_schema = _state->file_schema[file_field_id];
- const auto projection_it =
_request->complex_projections.find(file_field_id);
+ for (const auto file_column_id : _request->non_predicate_columns) {
+ if (file_column_id ==
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+ _state->current_non_predicate_columns.push_back(
+
column_reader_factory.create_row_position_column_reader(
+ _state->current_row_group_first_row));
+ continue;
+ }
+ const auto& column_schema = _state->file_schema[file_column_id];
+ const auto projection_it =
_request->complex_projections.find(file_column_id);
const auto* projection = projection_it ==
_request->complex_projections.end()
? nullptr
: &projection_it->second;
@@ -479,6 +500,48 @@ Status
ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
return Status::OK();
}
+// Returns the file offset at which a column chunk starts.
+//
+// A chunk starts at its dictionary page when it has one, otherwise at its first data page.
+// has_dictionary_page() alone is not sufficient evidence: a footer may carry a dictionary
+// offset that is a placeholder (0) or that does not precede the data pages. Trusting such a
+// value would move the row group start, and with it the midpoint used to decide which scan
+// range owns the row group. The dictionary offset is therefore used only when it is positive
+// and lies before the first data page, the same guard Arrow applies when it computes a column
+// chunk range.
+int64_t ParquetReader::_column_start_offset(
+        const ::parquet::ColumnChunkMetaData& column_metadata) const {
+    const auto data_page_offset = cast_set<int64_t>(column_metadata.data_page_offset());
+    if (!column_metadata.has_dictionary_page()) {
+        return data_page_offset;
+    }
+    const auto dictionary_page_offset =
+            cast_set<int64_t>(column_metadata.dictionary_page_offset());
+    const bool dictionary_offset_is_valid =
+            dictionary_page_offset > 0 && dictionary_page_offset < data_page_offset;
+    return dictionary_offset_is_valid ? dictionary_page_offset : data_page_offset;
+}
+
+// Reports whether the byte range assigned to this reader does NOT own row group
+// `row_group_idx`. Unsplit files (negative range size) and ranges that cover the whole file
+// own every row group.
+bool ParquetReader::_is_row_group_outside_range(int row_group_idx) const {
+    DORIS_CHECK(_file_description != nullptr);
+    const int64_t split_size = _file_description->range_size;
+    // This parquet file is not split
+    if (split_size < 0) {
+        return false;
+    }
+    const int64_t split_begin = _file_description->range_start_offset;
+    const int64_t split_end = split_begin + split_size;
+    DORIS_CHECK(split_begin >= 0);
+    DORIS_CHECK(split_end >= split_begin);
+    // Read the whole parquet file if the range covers the whole file, which is a common case
+    // when parquet files are not splittable. A negative (unset) file size is treated as covered.
+    if (split_begin == 0) {
+        const int64_t file_size = _file_description->file_size;
+        if (file_size < 0 || split_end >= file_size) {
+            return false;
+        }
+    }
+
+    const auto row_group = _state->metadata->RowGroup(row_group_idx);
+    DORIS_CHECK(row_group != nullptr);
+    const int column_count = row_group->num_columns();
+    DORIS_CHECK(column_count > 0);
+    const auto leading_chunk = row_group->ColumnChunk(0);
+    const auto trailing_chunk = row_group->ColumnChunk(column_count - 1);
+    DORIS_CHECK(leading_chunk != nullptr);
+    DORIS_CHECK(trailing_chunk != nullptr);
+    const int64_t group_begin = _column_start_offset(*leading_chunk);
+    const int64_t group_end =
+            _column_start_offset(*trailing_chunk) + trailing_chunk->total_compressed_size();
+    // A scan range is a byte split, while Parquet is read by row group. If a row group crosses
+    // split boundaries, using overlap would let adjacent ranges read the same row group. Keep
+    // the same ownership rule as the legacy vparquet reader: the range containing the row
+    // group's midpoint owns the whole row group.
+    const int64_t group_midpoint = group_begin + (group_end - group_begin) / 2;
+    const bool owned_by_range = split_begin <= group_midpoint && group_midpoint < split_end;
+    return !owned_by_range;
+}
+
ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
std::unique_ptr<io::FileDescription>&
file_description,
std::shared_ptr<io::IOContext> io_ctx,
RuntimeProfile* profile)
@@ -548,10 +611,16 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
const int num_fields = static_cast<int>(_state->file_schema.size());
for (const auto file_column_id : _request->predicate_columns) {
DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+ if (file_column_id ==
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+ continue;
+ }
DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
}
for (const auto file_column_id : _request->non_predicate_columns) {
DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+ if (file_column_id ==
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+ continue;
+ }
DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
}
for (const auto& column_filter : _request->column_predicate_filters) {
@@ -579,6 +648,23 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
}
RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata,
_state->file_schema,
*_request,
&_state->selected_row_groups));
+ std::vector<int> range_selected_row_groups;
+ range_selected_row_groups.reserve(_state->selected_row_groups.size());
+ for (const auto row_group_idx : _state->selected_row_groups) {
+ if (!_is_row_group_outside_range(row_group_idx)) {
+ range_selected_row_groups.push_back(row_group_idx);
+ }
+ }
+ _state->selected_row_groups = std::move(range_selected_row_groups);
+ _state->row_group_first_rows.resize(_state->metadata->num_row_groups());
+ int64_t next_row_group_first_row = 0;
+ for (int row_group_idx = 0; row_group_idx <
_state->metadata->num_row_groups();
+ ++row_group_idx) {
+ _state->row_group_first_rows[row_group_idx] = next_row_group_first_row;
+ auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
+ DORIS_CHECK(row_group_metadata != nullptr);
+ next_row_group_first_row += row_group_metadata->num_rows();
+ }
RETURN_IF_ERROR(_reset_reader_position());
_eof = _state->selected_row_groups.empty();
return Status::OK();
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index aa5cbfb5fcd..14a891c75e1 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -31,6 +31,10 @@ struct IOContext;
} // namespace io
} // namespace doris
+namespace parquet {
+class ColumnChunkMetaData;
+} // namespace parquet
+
namespace doris::parquet {
struct ParquetReaderScanState;
@@ -137,6 +141,8 @@ private:
uint16_t selected_rows);
Status _open_next_row_group(bool* has_row_group);
Status _read_current_row_group_batch(int64_t batch_rows, Block*
file_block, size_t* rows);
+ bool _is_row_group_outside_range(int row_group_idx) const;
+ int64_t _column_start_offset(const ::parquet::ColumnChunkMetaData&
column_metadata) const;
std::unique_ptr<ParquetReaderScanState> _state;
ParquetProfile _parquet_profile;
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index 58de8378589..86868b97b0b 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -154,6 +154,8 @@ std::unique_ptr<io::FileDescription>
create_file_description(const TFileRangeDes
auto file_description = std::make_unique<io::FileDescription>();
file_description->path = range.path;
file_description->file_size = range.__isset.file_size ? range.file_size :
-1;
+ file_description->range_start_offset = range.__isset.start_offset ?
range.start_offset : 0;
+ file_description->range_size = range.__isset.size ? range.size : -1;
if (range.__isset.fs_name) {
file_description->fs_name = range.fs_name;
}
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index ee252817d40..5441995e18c 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -230,6 +230,7 @@ protected:
auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
_table_filters, _table_column_predicates, _projected_columns,
file_request.get()));
+ RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
@@ -259,6 +260,10 @@ protected:
Status _build_table_filters_from_conjuncts();
Status _open_local_filter_exprs(const FileScanRequest& file_request);
+ // Extension point called on the file scan request right after the column mapper has
+ // created it, so that a subclass can adjust the request (for example request an extra
+ // column). The base implementation leaves the request unchanged and reports success.
+ virtual Status customize_file_scan_request(FileScanRequest* file_request) {
+ return Status::OK();
+ }
+
// 关闭当前具体 reader。
// 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
virtual Status close_current_reader() {
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index fc957eda124..6c6f4416717 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -25,8 +25,17 @@
#include <vector>
#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "format/new_parquet/column_reader.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"
+#include "gen_cpp/PlanNodes_types.h"
namespace doris {
class Block;
@@ -34,27 +43,6 @@ class Block;
namespace doris::iceberg {
-// Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。
-struct IcebergDataFile final : public reader::BaseDataFile {
- int64_t sequence_number = 0;
- int64_t first_row_id = -1;
-};
-
-// Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在
-// IcebergTableReader 实现阶段补齐。
-struct IcebergDeleteFile final : public reader::BaseDataFile {
- int64_t sequence_number = 0;
- std::vector<reader::ColumnId> equality_field_ids;
-};
-
-// 单个 Iceberg data file 的 scan 输入。
-// 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。
-struct IcebergScanTask final : public reader::ScanTask {
- std::vector<IcebergDeleteFile> positional_deletes;
- std::vector<IcebergDeleteFile> equality_deletes;
- std::vector<IcebergDeleteFile> deletion_vectors;
-};
-
// Iceberg table-level reader。
// 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
// FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
@@ -62,6 +50,22 @@ class IcebergTableReader : public reader::TableReader {
public:
~IcebergTableReader() override = default;
+    // Captures the Iceberg row lineage inputs of the split, then delegates to TableReader.
+    // Values the range does not carry stay at the RowLineageColumns defaults, so state from a
+    // previous split never leaks into this one.
+    Status prepare_split(const reader::SplitReadOptions& options) override {
+        RowLineageColumns lineage;
+        const auto& range = options.current_range;
+        const bool has_iceberg_params = range.__isset.table_format_params &&
+                                        range.table_format_params.__isset.iceberg_params;
+        if (has_iceberg_params) {
+            const auto& params = range.table_format_params.iceberg_params;
+            if (params.__isset.first_row_id) {
+                lineage.first_row_id = params.first_row_id;
+            }
+            if (params.__isset.last_updated_sequence_number) {
+                lineage.last_updated_sequence_number = params.last_updated_sequence_number;
+            }
+        }
+        _row_lineage_columns = lineage;
+        return TableReader::prepare_split(options);
+    }
+
protected:
// 将 file-local block 转换为 table/global schema block。
// 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
@@ -76,7 +80,21 @@ protected:
// 物化 Iceberg 虚拟列。
// 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
Status materialize_virtual_columns(Block* table_block) override {
- // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+ for (size_t column_idx = 0; column_idx <
_data_reader.column_mapper.mappings().size();
+ ++column_idx) {
+ const auto& mapping =
_data_reader.column_mapper.mappings()[column_idx];
+ switch (mapping.virtual_column_type) {
+ case reader::TableVirtualColumnType::ROW_ID:
+ RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block,
column_idx));
+ break;
+ case reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
+
RETURN_IF_ERROR(_materialize_row_lineage_last_updated_sequence_number(table_block,
+
column_idx));
+ break;
+ case reader::TableVirtualColumnType::INVALID:
+ break;
+ }
+ }
return Status::OK();
}
@@ -88,12 +106,88 @@ protected:
return Status::OK();
}
+    // Makes sure the file scan request reads the Parquet row position column whenever the
+    // ROW_ID virtual column has to be materialized and the split provides a first_row_id.
+    //
+    // `_row_position_block_position` is refreshed on every path that needs the column. If the
+    // column is already part of the request, its registered position is recorded instead of
+    // keeping whatever value was left from construction or from a previous file; otherwise
+    // _materialize_row_lineage_row_id would read row positions from the wrong block slot.
+    Status customize_file_scan_request(reader::FileScanRequest* file_request) override {
+        if (_row_lineage_columns.first_row_id < 0 || !_need_row_lineage_row_id()) {
+            return Status::OK();
+        }
+        DORIS_CHECK(file_request != nullptr);
+        const auto row_position_column_id =
+                doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+        const auto registered = file_request->column_positions.find(row_position_column_id);
+        if (registered != file_request->column_positions.end()) {
+            _row_position_block_position = static_cast<size_t>(registered->second);
+            return Status::OK();
+        }
+        // Append the virtual column behind every column that is already requested.
+        _row_position_block_position = file_request->column_positions.size();
+        file_request->non_predicate_columns.push_back(row_position_column_id);
+        file_request->column_positions.emplace(row_position_column_id,
+                                               _row_position_block_position);
+        _data_reader.block_schema.push_back(
+                doris::parquet::ParquetColumnReaderFactory::row_position_schema_field());
+        return Status::OK();
+    }
+
// 在 table block 上应用 equality delete。
// equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
Status apply_equality_deletes(Block* block) {
// 真实实现会在 table block 上应用 equality delete。
return Status::OK();
}
+
+private:
+ // Row lineage inputs copied from the Iceberg params of the current split.
+ // A negative value means the split did not provide the value; the materialize helper for
+ // the matching virtual column then returns without touching the block.
+ struct RowLineageColumns {
+ int64_t first_row_id = -1;
+ int64_t last_updated_sequence_number = -1;
+ };
+
+    // Fills the ROW_ID virtual column at `column_idx` with first_row_id + row position for
+    // every row and marks all rows non-null. Does nothing when the split carries no
+    // first_row_id. The target column is cast to a nullable Int64 column.
+    Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx) {
+        if (_row_lineage_columns.first_row_id < 0) {
+            return Status::OK();
+        }
+        DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns());
+        const auto& positions = assert_cast<const ColumnInt64&>(
+                *_data_reader.block_template.get_by_position(_row_position_block_position)
+                         .column);
+        const size_t row_count = positions.size();
+        DORIS_CHECK(row_count == table_block->rows());
+
+        // Work on a full, mutable version of the target column before overwriting it.
+        auto row_id_column = table_block->get_by_position(column_idx)
+                                     .column->convert_to_full_column_if_const()
+                                     ->assume_mutable();
+        auto* nullable_row_ids = assert_cast<ColumnNullable*>(row_id_column.get());
+        auto& null_flags = nullable_row_ids->get_null_map_data();
+        auto& row_ids =
+                assert_cast<ColumnInt64&>(*nullable_row_ids->get_nested_column_ptr()).get_data();
+
+        null_flags.resize(row_count);
+        std::fill(null_flags.begin(), null_flags.end(), 0);
+        row_ids.resize(row_count);
+        for (size_t row = 0; row < row_count; ++row) {
+            row_ids[row] = _row_lineage_columns.first_row_id + positions.get_element(row);
+        }
+        table_block->replace_by_position(column_idx, std::move(row_id_column));
+        return Status::OK();
+    }
+
+    // Replaces the column at `column_idx` with a constant column that repeats the split's
+    // last_updated_sequence_number for every row of `table_block`. Does nothing when the split
+    // carries no sequence number.
+    Status _materialize_row_lineage_last_updated_sequence_number(Block* table_block,
+                                                                 size_t column_idx) {
+        if (_row_lineage_columns.last_updated_sequence_number >= 0) {
+            const auto row_count = table_block->rows();
+            // Build a one-row column of the target type; ColumnConst repeats it row_count times.
+            auto single_value = table_block->get_by_position(column_idx).type->create_column();
+            single_value->insert(Field::create_field<TYPE_BIGINT>(
+                    _row_lineage_columns.last_updated_sequence_number));
+            auto constant_column = ColumnConst::create(std::move(single_value), row_count);
+            table_block->replace_by_position(column_idx, std::move(constant_column));
+        }
+        return Status::OK();
+    }
+
+ RowLineageColumns _row_lineage_columns;
+ size_t _row_position_block_position = 0;
+
+    // Reports whether any column mapping asks for the ROW_ID virtual column.
+    bool _need_row_lineage_row_id() const {
+        const auto& mappings = _data_reader.column_mapper.mappings();
+        return std::any_of(mappings.begin(), mappings.end(), [](const auto& mapping) {
+            return mapping.virtual_column_type == reader::TableVirtualColumnType::ROW_ID;
+        });
+    }
};
} // namespace doris::iceberg
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index a32c8077c48..33595313b92 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -65,6 +65,8 @@ struct FileDescription {
// -1 means unset.
// If the file length is not set, the file length will be fetched from the
file system.
int64_t file_size = -1;
+ int64_t range_start_offset = 0;
+ int64_t range_size = -1;
// modification time of this file.
// 0 means unset.
int64_t mtime = 0;
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index f393da6822c..00938482d6c 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -40,6 +40,7 @@
#include "core/field.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
+#include "format/new_parquet/column_reader.h"
#include "format/reader/column_mapper.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"
@@ -208,6 +209,35 @@ Block build_file_block(const
std::vector<reader::SchemaField>& schema) {
return block;
}
+// Builds the file block for `schema` and appends one empty column for the virtual row
+// position field behind the schema columns.
+Block build_file_block_with_row_position(const std::vector<reader::SchemaField>& schema) {
+    const auto position_field = parquet::ParquetColumnReaderFactory::row_position_schema_field();
+    Block file_block = build_file_block(schema);
+    file_block.insert(
+            {position_field.type->create_column(), position_field.type, position_field.name});
+    return file_block;
+}
+
+// Test-side start offset of a column chunk: the dictionary page offset when the chunk reports
+// a dictionary page, otherwise the offset of its first data page.
+int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
+    if (column_metadata.has_dictionary_page()) {
+        return static_cast<int64_t>(column_metadata.dictionary_page_offset());
+    }
+    return static_cast<int64_t>(column_metadata.data_page_offset());
+}
+
+// Returns a (start offset, size) scan range that is one byte long and starts exactly at the
+// midpoint of row group `row_group_idx` in `file_path`.
+std::pair<int64_t, int64_t> row_group_mid_range(const std::string& file_path, int row_group_idx) {
+    auto file_reader = ::parquet::ParquetFileReader::OpenFile(file_path, false);
+    auto file_metadata = file_reader->metadata();
+    auto row_group = file_metadata->RowGroup(row_group_idx);
+    auto leading_chunk = row_group->ColumnChunk(0);
+    auto trailing_chunk = row_group->ColumnChunk(row_group->num_columns() - 1);
+
+    const int64_t group_begin = parquet_column_start_offset(*leading_chunk);
+    const int64_t group_end =
+            parquet_column_start_offset(*trailing_chunk) + trailing_chunk->total_compressed_size();
+    const int64_t group_midpoint = group_begin + (group_end - group_begin) / 2;
+    return {group_midpoint, 1};
+}
+
class TestFileReader final : public reader::FileReader {
public:
TestFileReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
@@ -311,12 +341,15 @@ protected:
void TearDown() override { std::filesystem::remove_all(_test_dir); }
- std::unique_ptr<parquet::ParquetReader> create_reader() const {
+ std::unique_ptr<parquet::ParquetReader> create_reader(int64_t
range_start_offset = 0,
+ int64_t range_size =
-1) const {
auto system_properties = std::make_shared<io::FileSystemProperties>();
system_properties->system_type = TFileType::FILE_LOCAL;
auto file_description = std::make_unique<io::FileDescription>();
file_description->path = _file_path;
file_description->file_size =
static_cast<int64_t>(std::filesystem::file_size(_file_path));
+ file_description->range_start_offset = range_start_offset;
+ file_description->range_size = range_size;
return std::make_unique<parquet::ParquetReader>(system_properties,
file_description,
nullptr, nullptr);
}
@@ -541,5 +574,136 @@ TEST_F(NewParquetReaderTest,
PredicateFiltersRowGroupsByStatistics) {
EXPECT_EQ(values, std::vector<std::string>({"three", "four", "five"}));
}
+// Reads column 0 together with the virtual row position column from a file that is asserted
+// to contain three row groups, and checks that ids 1..5 come back with positions 0..4, i.e.
+// positions keep counting across row group boundaries.
+TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) {
+ // NOTE(review): the second argument presumably caps the rows per row group - confirm
+ // against write_parquet_file.
+ write_parquet_file(_file_path, 2);
+ auto parquet_file_reader =
::parquet::ParquetFileReader::OpenFile(_file_path, false);
+ ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 3);
+
+ auto reader = create_reader();
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ auto request = std::make_unique<reader::FileScanRequest>();
+ // The virtual column is requested like a regular non-predicate column and mapped to block
+ // position 2, where build_file_block_with_row_position is expected to have appended it.
+ request->non_predicate_columns =
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID,
+ 0};
+ request->column_positions = {
+ {0, 0},
+ {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+ };
+ ASSERT_TRUE(reader->open(request).ok());
+
+ std::vector<int64_t> row_positions;
+ std::vector<int32_t> ids;
+ bool eof = false;
+ // Drain the reader with a fresh block per call and collect both columns.
+ while (!eof) {
+ Block block = build_file_block_with_row_position(schema);
+ size_t rows = 0;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ if (rows == 0) {
+ continue;
+ }
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& row_position_column =
+ assert_cast<const
ColumnInt64&>(*block.get_by_position(2).column);
+ for (size_t row = 0; row < rows; ++row) {
+ ids.push_back(id_column.get_element(row));
+ row_positions.push_back(row_position_column.get_element(row));
+ }
+ }
+
+ EXPECT_EQ(ids, std::vector<int32_t>({1, 2, 3, 4, 5}));
+ EXPECT_EQ(row_positions, std::vector<int64_t>({0, 1, 2, 3, 4}));
+}
+
+// Combines an expression filter on column 0 with the virtual row position column and checks
+// that the surviving rows (ids 3, 4, 5) keep their original positions (2, 3, 4) instead of
+// being renumbered after filtering. Uses the file prepared by the fixture.
+TEST_F(NewParquetReaderTest, RowPositionReaderKeepsPositionsAfterSelection) {
+ auto reader = create_reader();
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ Block block = build_file_block_with_row_position(schema);
+
+ auto request = std::make_unique<reader::FileScanRequest>();
+ // Column 0 is read as a predicate column; the row position column is read afterwards as a
+ // non-predicate column.
+ request->predicate_columns = {0};
+ request->non_predicate_columns =
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID};
+ request->column_positions = {
+ {0, 0},
+ {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+ };
+ reader::FileExpressionFilter expression_filter;
+ // NOTE(review): presumably builds "column 0 > 2", which matches the expected ids below -
+ // confirm against create_int32_greater_than_conjunct.
+ expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+ request->expression_filters.push_back(std::move(expression_filter));
+ ASSERT_TRUE(reader->open(request).ok());
+
+ size_t rows = 0;
+ bool eof = false;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ EXPECT_FALSE(eof);
+ ASSERT_EQ(rows, 3);
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& row_position_column =
+ assert_cast<const ColumnInt64&>(*block.get_by_position(2).column);
+ EXPECT_EQ(id_column.get_element(0), 3);
+ EXPECT_EQ(id_column.get_element(1), 4);
+ EXPECT_EQ(id_column.get_element(2), 5);
+ EXPECT_EQ(row_position_column.get_element(0), 2);
+ EXPECT_EQ(row_position_column.get_element(1), 3);
+ EXPECT_EQ(row_position_column.get_element(2), 4);
+}
+
+// Opens one reader per row group, each restricted to a one-byte scan range located at that
+// row group's midpoint, and checks that every reader returns only the rows of its own row
+// group with positions counted from the start of the file, not from the start of the range.
+TEST_F(NewParquetReaderTest,
RowPositionReaderUsesFileLocalPositionsForScanRange) {
+ write_parquet_file(_file_path, 2);
+ auto parquet_file_reader =
::parquet::ParquetFileReader::OpenFile(_file_path, false);
+ ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 3);
+
+ // Expected content per row group, indexed by row group.
+ const std::vector<std::vector<int32_t>> expected_ids = {{1, 2}, {3, 4},
{5}};
+ const std::vector<std::vector<int64_t>> expected_row_positions = {{0, 1},
{2, 3}, {4}};
+ for (int row_group_idx = 0; row_group_idx < 3; ++row_group_idx) {
+ // The range contains only the midpoint byte of the row group under test.
+ const auto [range_start_offset, range_size] =
+ row_group_mid_range(_file_path, row_group_idx);
+ auto reader = create_reader(range_start_offset, range_size);
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ auto request = std::make_unique<reader::FileScanRequest>();
+ request->non_predicate_columns = {
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID,
0};
+ request->column_positions = {
+ {0, 0},
+ {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID,
2},
+ };
+ ASSERT_TRUE(reader->open(request).ok());
+
+ std::vector<int32_t> ids;
+ std::vector<int64_t> row_positions;
+ bool eof = false;
+ // Drain the reader with a fresh block per call and collect both columns.
+ while (!eof) {
+ Block block = build_file_block_with_row_position(schema);
+ size_t rows = 0;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ if (rows == 0) {
+ continue;
+ }
+ const auto& id_column =
+ assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& row_position_column =
+ assert_cast<const
ColumnInt64&>(*block.get_by_position(2).column);
+ for (size_t row = 0; row < rows; ++row) {
+ ids.push_back(id_column.get_element(row));
+ row_positions.push_back(row_position_column.get_element(row));
+ }
+ }
+
+ EXPECT_EQ(ids, expected_ids[row_group_idx]);
+ EXPECT_EQ(row_positions, expected_row_positions[row_group_idx]);
+ }
+}
+
} // namespace
} // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index f770fddb723..dc050976836 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -20,6 +20,7 @@
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gtest/gtest.h>
+#include <parquet/api/reader.h>
#include <parquet/arrow/writer.h>
#include <algorithm>
@@ -30,12 +31,15 @@
#include "core/assert_cast.h"
#include "core/block/block.h"
+#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
+#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "exprs/vexpr.h"
#include "format/reader/expr/slot_ref.h"
+#include "format/table/iceberg_reader_v2.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "storage/predicate/predicate_creator.h"
@@ -231,6 +235,19 @@ Block build_table_block(const std::vector<TableColumn>&
columns) {
return block;
}
+void expect_nullable_int64_column_values(const IColumn& column,
+ const std::vector<int64_t>&
expected_values) {  // Test helper: checks that `column` is a ColumnNullable wrapping a ColumnInt64 whose rows equal `expected_values`, with every null-map entry equal to 0.
+ const auto full_column = column.convert_to_full_column_if_const();  // The casts below use the converted column, not `column` itself (presumably so a const column is expanded first -- confirm).
+ const auto& nullable_column = assert_cast<const
ColumnNullable&>(*full_column);
+ const auto& values =
+ assert_cast<const
ColumnInt64&>(nullable_column.get_nested_column()).get_data();
+ ASSERT_EQ(nullable_column.size(), expected_values.size());  // Fatal check: on a size mismatch the helper returns here, so the loop below never indexes past the column.
+ for (size_t row = 0; row < expected_values.size(); ++row) {
+ EXPECT_EQ(nullable_column.get_null_map_data()[row], 0);  // Non-fatal per-row checks; a null-map entry of 0 is required (presumably meaning not-null -- confirm).
+ EXPECT_EQ(values[row], expected_values[row]);
+ }
+}
+
SplitReadOptions build_split_options(const std::string& file_path) {
SplitReadOptions options;
options.current_range.__set_path(file_path);
@@ -239,6 +256,40 @@ SplitReadOptions build_split_options(const std::string&
file_path) {
return options;
}
+void set_iceberg_row_lineage_params(SplitReadOptions* split_options, int64_t
first_row_id,
+ int64_t last_updated_sequence_number) {  // Test helper: stores first_row_id and last_updated_sequence_number in a TIcebergFileDesc and attaches it to split_options->current_range.
+ TTableFormatFileDesc table_format_params;
+ TIcebergFileDesc iceberg_params;
+ iceberg_params.__set_first_row_id(first_row_id);
+
iceberg_params.__set_last_updated_sequence_number(last_updated_sequence_number);
+ table_format_params.__set_iceberg_params(iceberg_params);  // Only the iceberg_params field of the table-format descriptor is populated here.
+
split_options->current_range.__set_table_format_params(table_format_params);  // split_options is dereferenced without a null check; callers must pass a valid pointer.
+}
+
+int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData&
column_metadata) {  // Returns the chunk's dictionary page offset when it reports a dictionary page, otherwise its data page offset.
+ return column_metadata.has_dictionary_page()
+ ?
static_cast<int64_t>(column_metadata.dictionary_page_offset())  // NOTE(review): assumes the dictionary page, when present, is the first page of the chunk -- confirm against the Parquet layout.
+ : static_cast<int64_t>(column_metadata.data_page_offset());
+}
+
+SplitReadOptions build_split_options_for_row_group_mid(const std::string&
file_path,
+ int row_group_idx) {  // Test helper: builds split options whose byte range is one byte located at the midpoint of row group `row_group_idx` in `file_path`.
+ auto options = build_split_options(file_path);
+ auto reader = ::parquet::ParquetFileReader::OpenFile(file_path, false);  // Opened only to read footer metadata; the second argument is presumably memory_map=false -- confirm against the Arrow API.
+ auto metadata = reader->metadata();
+ auto row_group_metadata = metadata->RowGroup(row_group_idx);
+ auto first_column = row_group_metadata->ColumnChunk(0);
+ auto last_column =
row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1);
+ const int64_t row_group_start_offset =
parquet_column_start_offset(*first_column);  // Row group start = start offset of its first column chunk.
+ const int64_t row_group_end_offset =
+ parquet_column_start_offset(*last_column) +
last_column->total_compressed_size();  // Row group end = start offset of its last column chunk plus that chunk's compressed size.
+ const int64_t row_group_mid_offset =
+ row_group_start_offset + (row_group_end_offset -
row_group_start_offset) / 2;  // Integer division: the midpoint rounds toward the start offset.
+ options.current_range.__set_start_offset(row_group_mid_offset);
+ options.current_range.__set_size(1);  // Size 1: the range covers only the midpoint byte, so it cannot span into another row group.
+ return options;
+}
+
TableColumn make_table_column(ColumnId id, const std::string& name, const
DataTypePtr& type) {
TableColumn column;
column.id = id;
@@ -716,6 +767,226 @@ TEST(TableReaderTest,
ProjectedPartitionColumnUsesSplitPartitionValue) {
std::filesystem::remove_all(test_dir);
}
+TEST(TableReaderTest, IcebergVirtualColumnsUseRowLineageMetadata) {  // Reads a 3-row parquet file through IcebergTableReader, projecting _row_id and _last_updated_sequence_number next to id, and checks the lineage values of the rows that pass the conjunct.
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_virtual_columns_test";
+ std::filesystem::remove_all(test_dir);  // Clear anything already at this path before recreating the directory.
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});  // Three rows; the first list ({1, 2, 3}) is the one later read back as column id.
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(100, "_row_id",
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 0: nullable Int64 lineage column.
+ projected_columns.push_back(
+ make_table_column(101, "_last_updated_sequence_number",
+
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 1: nullable Int64 lineage column.
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));  // Block position 2: the only projected column taken from the file's own data.
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),  // Presumably id > 1 (the assertions below expect ids 2 and 3) -- confirm the argument order of TableInt32GreaterThanExpr.
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ set_iceberg_row_lineage_params(&split_options, 1000, 77);  // first_row_id = 1000, last_updated_sequence_number = 77.
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);  // The first call must return data without reporting end-of-stream.
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(2).column);
+
+ ASSERT_EQ(block.rows(), 2);
+ EXPECT_EQ(id_column.get_element(0), 2);
+ EXPECT_EQ(id_column.get_element(1), 3);
+ expect_nullable_int64_column_values(*block.get_by_position(0).column,
{1001, 1002});  // 1000 + 1 and 1000 + 2: first_row_id plus the zero-based file positions of the rows with id 2 and 3.
+ expect_nullable_int64_column_values(*block.get_by_position(1).column, {77,
77});  // Every surviving row carries the split's last_updated_sequence_number.
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails; the remove_all at the top of the test clears leftovers on the next run.
+}
+
+TEST(TableReaderTest,
IcebergVirtualColumnsKeepRowLineageAfterConjunctFiltering) {  // Checks that rows surviving a conjunct filter keep lineage values derived from their original file positions. NOTE(review): setup matches IcebergVirtualColumnsUseRowLineageMetadata except for the directory name and the lineage values 3000/88.
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_virtual_columns_conjunct_test";
+ std::filesystem::remove_all(test_dir);  // Clear anything already at this path before recreating the directory.
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});  // Three rows; the first list ({1, 2, 3}) is the one later read back as column id.
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(100, "_row_id",
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 0.
+ projected_columns.push_back(
+ make_table_column(101, "_last_updated_sequence_number",
+
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 1.
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));  // Block position 2.
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),  // Presumably id > 1 (the assertions below expect ids 2 and 3) -- confirm the argument order of TableInt32GreaterThanExpr.
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ set_iceberg_row_lineage_params(&split_options, 3000, 88);  // first_row_id = 3000, last_updated_sequence_number = 88.
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);  // The first call must return data without reporting end-of-stream.
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(2).column);
+
+ ASSERT_EQ(block.rows(), 2);  // The row with id 1 is filtered out; two rows remain.
+ EXPECT_EQ(id_column.get_element(0), 2);
+ EXPECT_EQ(id_column.get_element(1), 3);
+ expect_nullable_int64_column_values(*block.get_by_position(0).column,
{3001, 3002});  // 3000 + 1 and 3000 + 2: the filtered-out first row still counts toward the positions of the rows after it.
+ expect_nullable_int64_column_values(*block.get_by_position(1).column, {88,
88});  // Every surviving row carries the split's last_updated_sequence_number.
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails; the remove_all at the top of the test clears leftovers on the next run.
+}
+
+TEST(TableReaderTest,
IcebergVirtualColumnsKeepRowLineageAfterRowGroupPredicatePruning) {  // Checks that _row_id still reflects the file-local row position when a column predicate leaves only the last of three rows.
+ const auto test_dir = std::filesystem::temp_directory_path() /
+
"doris_iceberg_virtual_columns_row_group_predicate_test";
+ std::filesystem::remove_all(test_dir);  // Clear anything already at this path before recreating the directory.
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ // ColumnPredicate is used for row-group/statistics pruning. Keep one row
per row group so
+ // id > 2 prunes the first two row groups and leaves only the third
file-local row.
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"}, 1);  // Trailing 1: presumably the rows-per-row-group setting described in the comment above -- confirm against write_int_pair_parquet_file.
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(100, "_row_id",
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 0.
+ projected_columns.push_back(
+ make_table_column(101, "_last_updated_sequence_number",
+
make_nullable(std::make_shared<DataTypeInt64>())));  // Block position 1.
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));  // Block position 2; column id 0 is the key used for the predicate below.
+
+ TableColumnPredicates column_predicates;
+
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
Field::create_field<TYPE_INT>(2), false));  // GT comparison against the literal 2 on the id column, stored under key 0.
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates =
std::move(column_predicates),
+ .conjuncts = VExprContext(nullptr),  // No conjunct expression: the only filter in this test is the column predicate.
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ set_iceberg_row_lineage_params(&split_options, 4000, 99);  // first_row_id = 4000, last_updated_sequence_number = 99.
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);  // The first call must return data without reporting end-of-stream.
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(2).column);
+
+ ASSERT_EQ(block.rows(), 1);  // Only the row with id 3 is expected to remain.
+ EXPECT_EQ(id_column.get_element(0), 3);
+ expect_nullable_int64_column_values(*block.get_by_position(0).column,
{4002});  // 4000 + 2: the two skipped rows still count toward the surviving row's file position.
+ expect_nullable_int64_column_values(*block.get_by_position(1).column,
{99});
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails; the remove_all at the top of the test clears leftovers on the next run.
+}
+
+TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) {  // Gives TableReader a one-byte split range inside row group 1 and checks that only that row group's row is returned.
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_file_range_test";
+ std::filesystem::remove_all(test_dir);  // Clear anything already at this path before recreating the directory.
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30},
+ {"range_group_one", "range_group_two",
"range_group_three"}, 1);  // Trailing 1: presumably one row per row group, which the single-row expectation below relies on -- confirm against write_int_pair_parquet_file.
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));  // Block position 0.
+ projected_columns.push_back(make_table_column(2, "value",
std::make_shared<DataTypeString>()));  // Block position 1; column id 2 is projected, nothing with id 1 is.
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),  // No predicates or conjuncts: any row exclusion in this test comes from the split's byte range.
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+
ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path,
1)).ok());  // Split range = one byte at the midpoint of row group index 1 (the second row group).
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);  // The first call must return data without reporting end-of-stream.
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& value_column = assert_cast<const
ColumnString&>(*block.get_by_position(1).column);
+ ASSERT_EQ(block.rows(), 1);  // Rows with ids 1 and 3 must not appear.
+ EXPECT_EQ(id_column.get_element(0), 2);
+ EXPECT_EQ(value_column.get_data_at(0).to_string(), "range_group_two");
+
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());  // Second call reuses the same block and is expected to report end-of-stream with zero rows.
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(block.rows(), 0);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails; the remove_all at the top of the test clears leftovers on the next run.
+}
+
TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_same_name_diff_id_test";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]