This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new 321134d931e Materialize Iceberg row lineage virtual columns  (#63787)
321134d931e is described below

commit 321134d931e1a9e00bfa535a51e34c8b76fb7d7d
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 13:45:11 2026 +0800

    Materialize Iceberg row lineage virtual columns  (#63787)
    
    1. ParquetReader reads a range of a parquet file
    2. ParquetReader supports virtual column reader (RowPosition)
    3. IcebergReader supports virtual columns
---
 be/src/format/new_parquet/column_reader.cpp        |  62 +++++
 be/src/format/new_parquet/column_reader.h          |   9 +
 be/src/format/new_parquet/parquet_reader.cpp       |  98 +++++++-
 be/src/format/new_parquet/parquet_reader.h         |   6 +
 be/src/format/reader/table_reader.cpp              |   2 +
 be/src/format/reader/table_reader.h                |   5 +
 be/src/format/table/iceberg_reader_v2.h            | 138 +++++++++--
 be/src/io/file_factory.h                           |   2 +
 be/test/format/new_parquet/parquet_reader_test.cpp | 166 ++++++++++++-
 be/test/format/reader/table_reader_test.cpp        | 271 +++++++++++++++++++++
 10 files changed, 730 insertions(+), 29 deletions(-)

diff --git a/be/src/format/new_parquet/column_reader.cpp 
b/be/src/format/new_parquet/column_reader.cpp
index 9952016832c..c427b38a970 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -33,8 +33,10 @@
 #include "core/column/column.h"
 #include "core/column/column_array.h"
 #include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
 #include "core/data_type/data_type_array.h"
 #include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
 #include "core/data_type/data_type_struct.h"
 #include "core/data_type_serde/decoded_column_view.h"
 #include "format/new_parquet/parquet_column_schema.h"
@@ -130,6 +132,52 @@ private:
     std::unique_ptr<ParquetColumnReader> _element_reader;
 };
 
+// Reader for the synthetic row position column. It decodes nothing from the Parquet file:
+// every value is the row group's first row index plus the number of rows this reader has
+// already returned or skipped.
+class RowPositionColumnReader final : public ParquetColumnReader {
+public:
+    explicit RowPositionColumnReader(int64_t row_group_first_row)
+            : _row_group_first_row(row_group_first_row) {}
+
+    // Identifies this reader by the reserved row position column id.
+    int file_column_id() const override {
+        return ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+    }
+    // No Parquet leaf column backs this reader, hence -1.
+    int parquet_leaf_column_id() const override { return -1; }
+    const DataTypePtr& type() const override { return _type; }
+    const std::string& name() const override { return _name; }
+
+    // Appends `rows` consecutive positions to `column` (asserted to be a ColumnInt64) and
+    // stores `rows` in `*rows_read`. Null outputs and negative row counts are rejected.
+    Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) override {
+        if (rows_read == nullptr || column.get() == nullptr) {
+            return Status::InvalidArgument("Invalid parquet row position read result pointer");
+        }
+        if (rows < 0) {
+            return Status::InvalidArgument("Invalid parquet row position read rows {}", rows);
+        }
+        auto& positions = assert_cast<ColumnInt64*>(column.get())->get_data();
+        const auto write_begin = positions.size();
+        // Position of the first row produced by this call.
+        const int64_t first_position = _row_group_first_row + _next_row_position;
+        positions.resize(write_begin + rows);
+        for (int64_t offset = 0; offset < rows; ++offset) {
+            positions[write_begin + offset] = first_position + offset;
+        }
+        _next_row_position += rows;
+        *rows_read = rows;
+        return Status::OK();
+    }
+
+    // Advances the position cursor without producing values; non-positive counts are ignored.
+    Status skip(int64_t rows) override {
+        if (rows > 0) {
+            _next_row_position += rows;
+        }
+        return Status::OK();
+    }
+
+private:
+    // Index of the row group's first row, as passed to the constructor.
+    int64_t _row_group_first_row = 0;
+    // Rows already read or skipped inside the current row group.
+    int64_t _next_row_position = 0;
+    DataTypePtr _type = std::make_shared<DataTypeInt64>();
+    std::string _name = ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME;
+};
+
 Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
                     ::parquet::internal::RecordReader** record_reader, 
int64_t* rows_read) {
     auto reader = column_reader.record_reader();
@@ -558,6 +606,20 @@ ParquetColumnReaderFactory::ParquetColumnReaderFactory(
         : _row_group(std::move(row_group)),
           _record_readers(static_cast<size_t>(num_leaf_columns)) {}
 
+// Builds the schema entry describing the row position virtual column: reserved id and name,
+// Int64 data type, and the ROW_NUMBER column type.
+reader::SchemaField ParquetColumnReaderFactory::row_position_schema_field() {
+    reader::SchemaField row_position;
+    row_position.column_type = reader::ColumnType::ROW_NUMBER;
+    row_position.type = std::make_shared<DataTypeInt64>();
+    row_position.name = ROW_POSITION_COLUMN_NAME;
+    row_position.id = ROW_POSITION_COLUMN_ID;
+    return row_position;
+}
+
+// Creates a reader whose emitted row positions start at `row_group_first_row`.
+std::unique_ptr<ParquetColumnReader> ParquetColumnReaderFactory::create_row_position_column_reader(
+        int64_t row_group_first_row) const {
+    std::unique_ptr<ParquetColumnReader> row_position_reader =
+            std::make_unique<RowPositionColumnReader>(row_group_first_row);
+    return row_position_reader;
+}
+
 Status ParquetColumnReaderFactory::create_scalar_reader(
         int parquet_leaf_column_id, const ParquetTypeDescriptor& 
type_descriptor,
         const ::parquet::ColumnDescriptor* descriptor, DataTypePtr type, 
std::string name,
diff --git a/be/src/format/new_parquet/column_reader.h 
b/be/src/format/new_parquet/column_reader.h
index ec691a9743e..80f7060fa31 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -41,6 +41,7 @@ class IColumn;
 
 namespace reader {
 struct FieldProjection;
+struct SchemaField;
 } // namespace reader
 
 namespace parquet {
@@ -89,6 +90,11 @@ public:
     ParquetColumnReaderFactory(std::shared_ptr<::parquet::RowGroupReader> 
row_group,
                                int num_leaf_columns);
 
+    // Reserved file column id of the row position virtual column. It is negative, whereas
+    // real file column ids are checked by ParquetReader::open to be >= 0.
+    static constexpr int ROW_POSITION_COLUMN_ID = -10001;
+    // Name reported for the row position virtual column.
+    static constexpr const char* ROW_POSITION_COLUMN_NAME = 
"__parquet_row_position";
+
+    // Returns the schema field (id, name, Int64 type) of the row position virtual column.
+    static reader::SchemaField row_position_schema_field();
+
     // 根据 file-local schema tree 创建 column reader。复杂类型会在这里递归创建
     // children。该入口只理解 Parquet file schema,不处理 table/global schema。
     Status create(const ParquetColumnSchema& column_schema,
@@ -100,6 +106,9 @@ public:
         return create(column_schema, nullptr, reader);
     }
 
+    // Creates the reader for the row position virtual column. `row_group_first_row` is the
+    // base value added to every position the reader produces.
+    std::unique_ptr<ParquetColumnReader> create_row_position_column_reader(
+            int64_t row_group_first_row) const;
+
 private:
     Status create_scalar_column_reader(const ParquetColumnSchema& 
column_schema,
                                        std::unique_ptr<ParquetColumnReader>* 
reader) const;
diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 70902d936ee..043f155dd85 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -159,12 +159,16 @@ struct ParquetReaderScanState {
     std::vector<int> predicate_fields;
     std::vector<int> non_predicate_fields;
     std::vector<int> selected_row_groups;
+    // We need this to quickly determine the first row of each row group, 
which is needed for position delete and page index.
+    // TODO: this may be parsed by multiple ParquetReader with the same file 
but different scan ranges, so we should cache it
+    std::vector<int64_t> row_group_first_rows;
     size_t next_row_group_idx = 0;
     std::shared_ptr<::parquet::RowGroupReader> current_row_group;
     std::vector<std::unique_ptr<ParquetColumnReader>> 
current_predicate_columns;
     std::vector<std::unique_ptr<ParquetColumnReader>> 
current_non_predicate_columns;
     int64_t current_row_group_rows = 0;
     int64_t current_row_group_rows_read = 0;
+    int64_t current_row_group_first_row = 0;
 };
 
 Status ParquetReader::_reset_reader_position() {
@@ -174,6 +178,7 @@ Status ParquetReader::_reset_reader_position() {
     _state->current_non_predicate_columns.clear();
     _state->current_row_group_rows = 0;
     _state->current_row_group_rows_read = 0;
+    _state->current_row_group_first_row = 0;
     return Status::OK();
 }
 
@@ -183,6 +188,7 @@ void ParquetReader::_reset_current_row_group() {
     _state->current_non_predicate_columns.clear();
     _state->current_row_group_rows = 0;
     _state->current_row_group_rows_read = 0;
+    _state->current_row_group_first_row = 0;
 }
 
 void ParquetReader::_fill_schema_field(const ParquetColumnSchema& 
column_schema,
@@ -383,15 +389,24 @@ Status ParquetReader::_open_next_row_group(bool* 
has_row_group) {
             _reset_current_row_group();
             continue;
         }
+        DORIS_CHECK(row_group_idx >= 0 &&
+                    row_group_idx < 
static_cast<int>(_state->row_group_first_rows.size()));
+        _state->current_row_group_first_row = 
_state->row_group_first_rows[row_group_idx];
         _state->current_row_group_rows_read = 0;
         _state->current_predicate_columns.clear();
         _state->current_non_predicate_columns.clear();
 
         ParquetColumnReaderFactory 
column_reader_factory(_state->current_row_group,
                                                          
_state->schema->num_columns());
-        for (const auto file_field_id : _request->predicate_columns) {
-            const auto& column_schema = _state->file_schema[file_field_id];
-            const auto projection_it = 
_request->complex_projections.find(file_field_id);
+        for (const auto file_column_id : _request->predicate_columns) {
+            if (file_column_id == 
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+                _state->current_predicate_columns.push_back(
+                        
column_reader_factory.create_row_position_column_reader(
+                                _state->current_row_group_first_row));
+                continue;
+            }
+            const auto& column_schema = _state->file_schema[file_column_id];
+            const auto projection_it = 
_request->complex_projections.find(file_column_id);
             const auto* projection = projection_it == 
_request->complex_projections.end()
                                              ? nullptr
                                              : &projection_it->second;
@@ -400,9 +415,15 @@ Status ParquetReader::_open_next_row_group(bool* 
has_row_group) {
                     column_reader_factory.create(*column_schema, projection, 
&column_reader));
             
_state->current_predicate_columns.push_back(std::move(column_reader));
         }
-        for (const auto file_field_id : _request->non_predicate_columns) {
-            const auto& column_schema = _state->file_schema[file_field_id];
-            const auto projection_it = 
_request->complex_projections.find(file_field_id);
+        for (const auto file_column_id : _request->non_predicate_columns) {
+            if (file_column_id == 
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+                _state->current_non_predicate_columns.push_back(
+                        
column_reader_factory.create_row_position_column_reader(
+                                _state->current_row_group_first_row));
+                continue;
+            }
+            const auto& column_schema = _state->file_schema[file_column_id];
+            const auto projection_it = 
_request->complex_projections.find(file_column_id);
             const auto* projection = projection_it == 
_request->complex_projections.end()
                                              ? nullptr
                                              : &projection_it->second;
@@ -479,6 +500,48 @@ Status 
ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
     return Status::OK();
 }
 
+// Returns the file offset at which the pages of a column chunk begin.
+//
+// The dictionary page, when present, precedes the data pages, so its offset is the chunk
+// start. Having the field set is not enough to trust it: some writers set
+// dictionary_page_offset to 0 even though the chunk has no dictionary page. Trusting such a
+// value would put the chunk start at 0 and distort the row group midpoint used for split
+// ownership. The dictionary offset is therefore used only when it is positive and lies
+// before the first data page; otherwise the data page offset is returned.
+int64_t ParquetReader::_column_start_offset(
+        const ::parquet::ColumnChunkMetaData& column_metadata) const {
+    const auto data_page_offset = cast_set<int64_t>(column_metadata.data_page_offset());
+    if (!column_metadata.has_dictionary_page()) {
+        return data_page_offset;
+    }
+    const auto dictionary_page_offset =
+            cast_set<int64_t>(column_metadata.dictionary_page_offset());
+    const bool dictionary_offset_usable =
+            dictionary_page_offset > 0 && dictionary_page_offset < data_page_offset;
+    return dictionary_offset_usable ? dictionary_page_offset : data_page_offset;
+}
+
+// Tells whether the byte range assigned to this reader does NOT own the given row group.
+// Returns false (the row group is read) when the file is not split or when the range
+// covers the whole file; otherwise ownership is decided by the row group's midpoint.
+bool ParquetReader::_is_row_group_outside_range(int row_group_idx) const {
+    DORIS_CHECK(_file_description != nullptr);
+    const int64_t split_size = _file_description->range_size;
+    if (split_size < 0) {
+        // A negative range size marks an unsplit file: nothing is outside the range.
+        return false;
+    }
+    const int64_t split_begin = _file_description->range_start_offset;
+    const int64_t split_end = split_begin + split_size;
+    DORIS_CHECK(split_begin >= 0);
+    DORIS_CHECK(split_end >= split_begin);
+    // Common case for unsplittable files: the range starts at 0 and reaches the end of the
+    // file (or the file size is unknown), so the whole file is read.
+    const int64_t file_size = _file_description->file_size;
+    const bool reaches_file_end = file_size < 0 || split_end >= file_size;
+    if (split_begin == 0 && reaches_file_end) {
+        return false;
+    }
+
+    auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
+    DORIS_CHECK(row_group_metadata != nullptr);
+    const int column_count = row_group_metadata->num_columns();
+    DORIS_CHECK(column_count > 0);
+    const auto head_chunk = row_group_metadata->ColumnChunk(0);
+    const auto tail_chunk = row_group_metadata->ColumnChunk(column_count - 1);
+    DORIS_CHECK(head_chunk != nullptr);
+    DORIS_CHECK(tail_chunk != nullptr);
+    // The row group spans from the start of its first column chunk to the end of its last.
+    const int64_t group_begin = _column_start_offset(*head_chunk);
+    const int64_t group_end =
+            _column_start_offset(*tail_chunk) + tail_chunk->total_compressed_size();
+    // A scan range is a byte split while Parquet is read per row group. Testing for overlap
+    // would let adjacent ranges read the same row group twice, so, as in the legacy vparquet
+    // reader, the range that contains the row group's midpoint owns the whole row group.
+    const int64_t group_midpoint = group_begin + (group_end - group_begin) / 2;
+    const bool owned_by_split = split_begin <= group_midpoint && group_midpoint < split_end;
+    return !owned_by_split;
+}
+
 ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& 
system_properties,
                              std::unique_ptr<io::FileDescription>& 
file_description,
                              std::shared_ptr<io::IOContext> io_ctx, 
RuntimeProfile* profile)
@@ -548,10 +611,16 @@ Status 
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
     const int num_fields = static_cast<int>(_state->file_schema.size());
     for (const auto file_column_id : _request->predicate_columns) {
         DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+        if (file_column_id == 
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+            continue;
+        }
         DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
     }
     for (const auto file_column_id : _request->non_predicate_columns) {
         DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+        if (file_column_id == 
ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
+            continue;
+        }
         DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
     }
     for (const auto& column_filter : _request->column_predicate_filters) {
@@ -579,6 +648,23 @@ Status 
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
     }
     RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, 
_state->file_schema,
                                                     *_request, 
&_state->selected_row_groups));
+    std::vector<int> range_selected_row_groups;
+    range_selected_row_groups.reserve(_state->selected_row_groups.size());
+    for (const auto row_group_idx : _state->selected_row_groups) {
+        if (!_is_row_group_outside_range(row_group_idx)) {
+            range_selected_row_groups.push_back(row_group_idx);
+        }
+    }
+    _state->selected_row_groups = std::move(range_selected_row_groups);
+    _state->row_group_first_rows.resize(_state->metadata->num_row_groups());
+    int64_t next_row_group_first_row = 0;
+    for (int row_group_idx = 0; row_group_idx < 
_state->metadata->num_row_groups();
+         ++row_group_idx) {
+        _state->row_group_first_rows[row_group_idx] = next_row_group_first_row;
+        auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
+        DORIS_CHECK(row_group_metadata != nullptr);
+        next_row_group_first_row += row_group_metadata->num_rows();
+    }
     RETURN_IF_ERROR(_reset_reader_position());
     _eof = _state->selected_row_groups.empty();
     return Status::OK();
diff --git a/be/src/format/new_parquet/parquet_reader.h 
b/be/src/format/new_parquet/parquet_reader.h
index aa5cbfb5fcd..14a891c75e1 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -31,6 +31,10 @@ struct IOContext;
 } // namespace io
 } // namespace doris
 
+namespace parquet {
+class ColumnChunkMetaData;
+} // namespace parquet
+
 namespace doris::parquet {
 
 struct ParquetReaderScanState;
@@ -137,6 +141,8 @@ private:
                                         uint16_t selected_rows);
     Status _open_next_row_group(bool* has_row_group);
     Status _read_current_row_group_batch(int64_t batch_rows, Block* 
file_block, size_t* rows);
+    // Returns true when the row group at `row_group_idx` is not owned by this reader's byte
+    // range and must not be read by it.
+    bool _is_row_group_outside_range(int row_group_idx) const;
+    // Returns the file offset at which the given column chunk starts.
+    int64_t _column_start_offset(const ::parquet::ColumnChunkMetaData& 
column_metadata) const;
 
     std::unique_ptr<ParquetReaderScanState> _state;
     ParquetProfile _parquet_profile;
diff --git a/be/src/format/reader/table_reader.cpp 
b/be/src/format/reader/table_reader.cpp
index 58de8378589..86868b97b0b 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -154,6 +154,8 @@ std::unique_ptr<io::FileDescription> 
create_file_description(const TFileRangeDes
     auto file_description = std::make_unique<io::FileDescription>();
     file_description->path = range.path;
     file_description->file_size = range.__isset.file_size ? range.file_size : 
-1;
+    file_description->range_start_offset = range.__isset.start_offset ? 
range.start_offset : 0;
+    file_description->range_size = range.__isset.size ? range.size : -1;
     if (range.__isset.fs_name) {
         file_description->fs_name = range.fs_name;
     }
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index ee252817d40..5441995e18c 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -230,6 +230,7 @@ protected:
         auto file_request = std::make_unique<FileScanRequest>();
         RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
                 _table_filters, _table_column_predicates, _projected_columns, 
file_request.get()));
+        RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
         RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
         _data_reader.scan_schema.clear();
         _data_reader.block_template.clear();
@@ -259,6 +260,10 @@ protected:
     Status _build_table_filters_from_conjuncts();
     Status _open_local_filter_exprs(const FileScanRequest& file_request);
 
+    // Hook that lets a subclass adjust the file scan request. It is called right after the
+    // column mapper has created the request and before the local filter exprs are opened.
+    // The default implementation leaves the request untouched.
+    virtual Status customize_file_scan_request(FileScanRequest* file_request) {
+        return Status::OK();
+    }
+
     // 关闭当前具体 reader。
     // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
     virtual Status close_current_reader() {
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index fc957eda124..6c6f4416717 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -25,8 +25,17 @@
 #include <vector>
 
 #include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "format/new_parquet/column_reader.h"
 #include "format/reader/file_reader.h"
 #include "format/reader/table_reader.h"
+#include "gen_cpp/PlanNodes_types.h"
 
 namespace doris {
 class Block;
@@ -34,27 +43,6 @@ class Block;
 
 namespace doris::iceberg {
 
-// Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。
-struct IcebergDataFile final : public reader::BaseDataFile {
-    int64_t sequence_number = 0;
-    int64_t first_row_id = -1;
-};
-
-// Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在
-// IcebergTableReader 实现阶段补齐。
-struct IcebergDeleteFile final : public reader::BaseDataFile {
-    int64_t sequence_number = 0;
-    std::vector<reader::ColumnId> equality_field_ids;
-};
-
-// 单个 Iceberg data file 的 scan 输入。
-// 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。
-struct IcebergScanTask final : public reader::ScanTask {
-    std::vector<IcebergDeleteFile> positional_deletes;
-    std::vector<IcebergDeleteFile> equality_deletes;
-    std::vector<IcebergDeleteFile> deletion_vectors;
-};
-
 // Iceberg table-level reader。
 // 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
 // FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
@@ -62,6 +50,22 @@ class IcebergTableReader : public reader::TableReader {
 public:
     ~IcebergTableReader() override = default;
 
+    // Captures the row lineage inputs (first_row_id, last_updated_sequence_number) carried by
+    // the split's Iceberg params, then delegates to TableReader::prepare_split. Values the
+    // split does not provide fall back to the RowLineageColumns defaults.
+    Status prepare_split(const reader::SplitReadOptions& options) override {
+        RowLineageColumns lineage;
+        const auto& range = options.current_range;
+        const bool has_iceberg_params = range.__isset.table_format_params &&
+                                        range.table_format_params.__isset.iceberg_params;
+        if (has_iceberg_params) {
+            const auto& params = range.table_format_params.iceberg_params;
+            if (params.__isset.first_row_id) {
+                lineage.first_row_id = params.first_row_id;
+            }
+            if (params.__isset.last_updated_sequence_number) {
+                lineage.last_updated_sequence_number = params.last_updated_sequence_number;
+            }
+        }
+        _row_lineage_columns = lineage;
+        return TableReader::prepare_split(options);
+    }
+
 protected:
     // 将 file-local block 转换为 table/global schema block。
     // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
@@ -76,7 +80,21 @@ protected:
     // 物化 Iceberg 虚拟列。
     // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
     Status materialize_virtual_columns(Block* table_block) override {
-        // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+        // Walk every column mapping and fill the ones flagged as Iceberg row lineage virtual
+        // columns; mappings with the INVALID virtual column type are left untouched.
+        // NOTE(review): the mapping index is used directly as the table block position,
+        // confirm that mappings and table block columns are ordered identically.
+        for (size_t column_idx = 0; column_idx < 
_data_reader.column_mapper.mappings().size();
+             ++column_idx) {
+            const auto& mapping = 
_data_reader.column_mapper.mappings()[column_idx];
+            switch (mapping.virtual_column_type) {
+            case reader::TableVirtualColumnType::ROW_ID:
+                RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, 
column_idx));
+                break;
+            case reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
+                
RETURN_IF_ERROR(_materialize_row_lineage_last_updated_sequence_number(table_block,
+                                                                               
       column_idx));
+                break;
+            case reader::TableVirtualColumnType::INVALID:
+                break;
+            }
+        }
         return Status::OK();
     }
 
@@ -88,12 +106,88 @@ protected:
         return Status::OK();
     }
 
+    // Makes sure the file scan request reads the Parquet row position virtual column when
+    // _row_id has to be derived as first_row_id + row position. Nothing is changed when the
+    // split has no first_row_id or no mapping asks for ROW_ID.
+    //
+    // If the row position column is already in the request, its existing block position is
+    // recorded. Otherwise the column is appended as a non-predicate column at the next block
+    // position and added to the block schema.
+    Status customize_file_scan_request(reader::FileScanRequest* file_request) override {
+        if (_row_lineage_columns.first_row_id < 0 || !_need_row_lineage_row_id()) {
+            return Status::OK();
+        }
+        DORIS_CHECK(file_request != nullptr);
+        const auto row_position_column_id =
+                doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+        const auto existing_position = file_request->column_positions.find(row_position_column_id);
+        if (existing_position != file_request->column_positions.end()) {
+            // Remember where the already requested column lives. Without this,
+            // _materialize_row_lineage_row_id would read a stale position: the default 0 or
+            // the one recorded for a previously opened file.
+            _row_position_block_position = static_cast<size_t>(existing_position->second);
+            return Status::OK();
+        }
+        _row_position_block_position = file_request->column_positions.size();
+        file_request->non_predicate_columns.push_back(row_position_column_id);
+        file_request->column_positions.emplace(row_position_column_id,
+                                               _row_position_block_position);
+        _data_reader.block_schema.push_back(
+                doris::parquet::ParquetColumnReaderFactory::row_position_schema_field());
+        return Status::OK();
+    }
+
     // 在 table block 上应用 equality delete。
     // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
     Status apply_equality_deletes(Block* block) {
         // 真实实现会在 table block 上应用 equality delete。
         return Status::OK();
     }
+
+private:
+    // Row lineage inputs of the current split, filled by prepare_split from the split's
+    // Iceberg params. A negative value means the split did not provide it, and the matching
+    // virtual column is then not materialized.
+    struct RowLineageColumns {
+        // Added to the file-local row position to form _row_id.
+        int64_t first_row_id = -1;
+        // Constant written into the _last_updated_sequence_number column.
+        int64_t last_updated_sequence_number = -1;
+    };
+
+    // Fills the table block column at `column_idx` with first_row_id + row position for every
+    // row, marking all rows non-null. Does nothing when the split carries no first_row_id.
+    // The row positions are taken from the block template column recorded in
+    // _row_position_block_position.
+    Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx) {
+        const int64_t first_row_id = _row_lineage_columns.first_row_id;
+        if (first_row_id < 0) {
+            return Status::OK();
+        }
+        DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns());
+        const auto& position_entry =
+                _data_reader.block_template.get_by_position(_row_position_block_position);
+        const auto& positions = assert_cast<const ColumnInt64&>(*position_entry.column);
+        const size_t row_count = positions.size();
+        DORIS_CHECK(row_count == table_block->rows());
+
+        // The target column is asserted to be Nullable(Int64); a const column is expanded first.
+        auto row_id_column = table_block->get_by_position(column_idx)
+                                     .column->convert_to_full_column_if_const()
+                                     ->assume_mutable();
+        auto* nullable_row_ids = assert_cast<ColumnNullable*>(row_id_column.get());
+        auto& null_flags = nullable_row_ids->get_null_map_data();
+        auto& row_ids =
+                assert_cast<ColumnInt64&>(*nullable_row_ids->get_nested_column_ptr()).get_data();
+        // Every row receives a value, so all null flags are cleared.
+        null_flags.resize(row_count);
+        std::fill(null_flags.begin(), null_flags.end(), 0);
+        row_ids.resize(row_count);
+        for (size_t row = 0; row < row_count; ++row) {
+            row_ids[row] = first_row_id + positions.get_element(row);
+        }
+        table_block->replace_by_position(column_idx, std::move(row_id_column));
+        return Status::OK();
+    }
+
+    // Replaces the table block column at `column_idx` with a constant column that repeats the
+    // split's last_updated_sequence_number for every row. Does nothing when the split did
+    // not provide that value.
+    Status _materialize_row_lineage_last_updated_sequence_number(Block* table_block,
+                                                                 size_t column_idx) {
+        if (_row_lineage_columns.last_updated_sequence_number < 0) {
+            return Status::OK();
+        }
+        const auto row_count = table_block->rows();
+        // A one-row column of the target type holds the value wrapped by the const column.
+        auto single_value = table_block->get_by_position(column_idx).type->create_column();
+        single_value->insert(Field::create_field<TYPE_BIGINT>(
+                _row_lineage_columns.last_updated_sequence_number));
+        table_block->replace_by_position(
+                column_idx, ColumnConst::create(std::move(single_value), row_count));
+        return Status::OK();
+    }
+
+    RowLineageColumns _row_lineage_columns;
+    size_t _row_position_block_position = 0;
+
+    // Reports whether any column mapping asks for the ROW_ID virtual column.
+    bool _need_row_lineage_row_id() const {
+        const auto& mappings = _data_reader.column_mapper.mappings();
+        for (size_t mapping_idx = 0; mapping_idx < mappings.size(); ++mapping_idx) {
+            const bool is_row_id = mappings[mapping_idx].virtual_column_type ==
+                                   reader::TableVirtualColumnType::ROW_ID;
+            if (is_row_id) {
+                return true;
+            }
+        }
+        return false;
+    }
 };
 
 } // namespace doris::iceberg
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index a32c8077c48..33595313b92 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -65,6 +65,8 @@ struct FileDescription {
     // -1 means unset.
     // If the file length is not set, the file length will be fetched from the 
file system.
     int64_t file_size = -1;
+    int64_t range_start_offset = 0;
+    int64_t range_size = -1;
     // modification time of this file.
     // 0 means unset.
     int64_t mtime = 0;
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index f393da6822c..00938482d6c 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -40,6 +40,7 @@
 #include "core/field.h"
 #include "exprs/vexpr.h"
 #include "exprs/vexpr_context.h"
+#include "format/new_parquet/column_reader.h"
 #include "format/reader/column_mapper.h"
 #include "format/reader/file_reader.h"
 #include "format/reader/table_reader.h"
@@ -208,6 +209,35 @@ Block build_file_block(const 
std::vector<reader::SchemaField>& schema) {
     return block;
 }
 
+// Builds the file block for `schema` and appends one empty column for the row position
+// virtual column.
+Block build_file_block_with_row_position(const std::vector<reader::SchemaField>& schema) {
+    const auto position_field = parquet::ParquetColumnReaderFactory::row_position_schema_field();
+    Block file_block = build_file_block(schema);
+    file_block.insert({position_field.type->create_column(), position_field.type,
+                       position_field.name});
+    return file_block;
+}
+
+// Returns the start offset of a column chunk: the dictionary page offset when the chunk
+// reports a dictionary page, the data page offset otherwise.
+int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
+    if (column_metadata.has_dictionary_page()) {
+        return static_cast<int64_t>(column_metadata.dictionary_page_offset());
+    }
+    return static_cast<int64_t>(column_metadata.data_page_offset());
+}
+
+// Returns a one-byte scan range {offset, size} placed on the midpoint of the given row group.
+// The midpoint is computed from the start of the first column chunk and the end of the last.
+std::pair<int64_t, int64_t> row_group_mid_range(const std::string& file_path, int row_group_idx) {
+    auto file_reader = ::parquet::ParquetFileReader::OpenFile(file_path, false);
+    auto file_metadata = file_reader->metadata();
+    auto row_group = file_metadata->RowGroup(row_group_idx);
+    const int last_column_idx = row_group->num_columns() - 1;
+    auto leading_chunk = row_group->ColumnChunk(0);
+    auto trailing_chunk = row_group->ColumnChunk(last_column_idx);
+    const int64_t begin_offset = parquet_column_start_offset(*leading_chunk);
+    const int64_t end_offset =
+            parquet_column_start_offset(*trailing_chunk) + trailing_chunk->total_compressed_size();
+    const int64_t half_length = (end_offset - begin_offset) / 2;
+    return {begin_offset + half_length, 1};
+}
+
 class TestFileReader final : public reader::FileReader {
 public:
     TestFileReader(std::shared_ptr<io::FileSystemProperties>& 
system_properties,
@@ -311,12 +341,15 @@ protected:
 
     void TearDown() override { std::filesystem::remove_all(_test_dir); }
 
-    std::unique_ptr<parquet::ParquetReader> create_reader() const {
+    // Creates a ParquetReader over the local test file. The optional scan range is copied
+    // into the file description; the defaults (offset 0, size -1) leave the range size unset.
+    std::unique_ptr<parquet::ParquetReader> create_reader(int64_t 
range_start_offset = 0,
+                                                          int64_t range_size = 
-1) const {
         auto system_properties = std::make_shared<io::FileSystemProperties>();
         system_properties->system_type = TFileType::FILE_LOCAL;
         auto file_description = std::make_unique<io::FileDescription>();
         file_description->path = _file_path;
         file_description->file_size = 
static_cast<int64_t>(std::filesystem::file_size(_file_path));
+        file_description->range_start_offset = range_start_offset;
+        file_description->range_size = range_size;
         return std::make_unique<parquet::ParquetReader>(system_properties, 
file_description,
                                                         nullptr, nullptr);
     }
@@ -541,5 +574,136 @@ TEST_F(NewParquetReaderTest, 
PredicateFiltersRowGroupsByStatistics) {
     EXPECT_EQ(values, std::vector<std::string>({"three", "four", "five"}));
 }
 
+TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) { // Reads column 0 plus the row-position column with the default range and expects positions 0..4.
+    write_parquet_file(_file_path, 2);
+    auto parquet_file_reader = 
::parquet::ParquetFileReader::OpenFile(_file_path, false);
+    ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 3); // Precondition: the file just written has three row groups.
+
+    auto reader = create_reader();
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector<reader::SchemaField> schema;
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->non_predicate_columns = 
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID,
+                                      0}; // The row-position column id is listed before column 0.
+    request->column_positions = {
+            {0, 0},
+            {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+    }; // Column 0 maps to block position 0, the row position to block position 2 (both read back below).
+    ASSERT_TRUE(reader->open(request).ok());
+
+    std::vector<int64_t> row_positions;
+    std::vector<int32_t> ids;
+    bool eof = false;
+    while (!eof) { // Drain the reader, accumulating every returned row.
+        Block block = build_file_block_with_row_position(schema);
+        size_t rows = 0;
+        ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+        if (rows == 0) {
+            continue; // Nothing to collect from an empty batch; the loop condition re-checks eof.
+        }
+        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+        const auto& row_position_column =
+                assert_cast<const 
ColumnInt64&>(*block.get_by_position(2).column);
+        for (size_t row = 0; row < rows; ++row) {
+            ids.push_back(id_column.get_element(row));
+            row_positions.push_back(row_position_column.get_element(row));
+        }
+    }
+
+    EXPECT_EQ(ids, std::vector<int32_t>({1, 2, 3, 4, 5})); // All five ids come back in order.
+    EXPECT_EQ(row_positions, std::vector<int64_t>({0, 1, 2, 3, 4})); // Positions keep counting across row groups instead of restarting.
+}
+
+TEST_F(NewParquetReaderTest, RowPositionReaderKeepsPositionsAfterSelection) { // Applies an expression filter on column 0 and checks that surviving rows report positions 2, 3, 4.
+    auto reader = create_reader(); // No write_parquet_file call here; presumably the fixture's SetUp wrote _file_path — confirm.
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector<reader::SchemaField> schema;
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    Block block = build_file_block_with_row_position(schema);
+
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->predicate_columns = {0}; // Column 0 is read as a predicate column.
+    request->non_predicate_columns = 
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID}; // The row position is requested as a non-predicate column.
+    request->column_positions = {
+            {0, 0},
+            {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+    };
+    reader::FileExpressionFilter expression_filter;
+    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); // Presumably "column 0 > 2"; the expectations below (ids 3, 4, 5) are consistent with that.
+    request->expression_filters.push_back(std::move(expression_filter));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    size_t rows = 0;
+    bool eof = false;
+    ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+    EXPECT_FALSE(eof);
+    ASSERT_EQ(rows, 3); // All three surviving rows are expected from the first get_block call.
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    const auto& row_position_column =
+            assert_cast<const ColumnInt64&>(*block.get_by_position(2).column);
+    EXPECT_EQ(id_column.get_element(0), 3);
+    EXPECT_EQ(id_column.get_element(1), 4);
+    EXPECT_EQ(id_column.get_element(2), 5);
+    EXPECT_EQ(row_position_column.get_element(0), 2); // Positions are those of the rows in the file, not indexes into the filtered output.
+    EXPECT_EQ(row_position_column.get_element(1), 3);
+    EXPECT_EQ(row_position_column.get_element(2), 4);
+}
+
+TEST_F(NewParquetReaderTest, 
RowPositionReaderUsesFileLocalPositionsForScanRange) { // Opens one reader per row group via a midpoint scan range and checks ids and row positions for each.
+    write_parquet_file(_file_path, 2);
+    auto parquet_file_reader = 
::parquet::ParquetFileReader::OpenFile(_file_path, false);
+    ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 3); // Precondition: the file just written has three row groups.
+
+    const std::vector<std::vector<int32_t>> expected_ids = {{1, 2}, {3, 4}, 
{5}}; // Expected ids, indexed by row group.
+    const std::vector<std::vector<int64_t>> expected_row_positions = {{0, 1}, 
{2, 3}, {4}}; // Expected positions, indexed by row group; they do not restart at 0 for later groups.
+    for (int row_group_idx = 0; row_group_idx < 3; ++row_group_idx) {
+        const auto [range_start_offset, range_size] =
+                row_group_mid_range(_file_path, row_group_idx); // 1-byte range at the byte midpoint of this row group.
+        auto reader = create_reader(range_start_offset, range_size);
+        RuntimeState state {TQueryOptions(), TQueryGlobals()};
+        ASSERT_TRUE(reader->init(&state).ok());
+
+        std::vector<reader::SchemaField> schema;
+        ASSERT_TRUE(reader->get_schema(&schema).ok());
+        auto request = std::make_unique<reader::FileScanRequest>();
+        request->non_predicate_columns = {
+                parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 
0};
+        request->column_positions = {
+                {0, 0},
+                {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 
2},
+        }; // Column 0 maps to block position 0, the row position to block position 2.
+        ASSERT_TRUE(reader->open(request).ok());
+
+        std::vector<int32_t> ids;
+        std::vector<int64_t> row_positions;
+        bool eof = false;
+        while (!eof) { // Drain this reader, accumulating every returned row.
+            Block block = build_file_block_with_row_position(schema);
+            size_t rows = 0;
+            ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+            if (rows == 0) {
+                continue; // Nothing to collect from an empty batch; the loop condition re-checks eof.
+            }
+            const auto& id_column =
+                    assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+            const auto& row_position_column =
+                    assert_cast<const 
ColumnInt64&>(*block.get_by_position(2).column);
+            for (size_t row = 0; row < rows; ++row) {
+                ids.push_back(id_column.get_element(row));
+                row_positions.push_back(row_position_column.get_element(row));
+            }
+        }
+
+        EXPECT_EQ(ids, expected_ids[row_group_idx]); // Only the rows of the targeted row group are returned.
+        EXPECT_EQ(row_positions, expected_row_positions[row_group_idx]);
+    }
+}
+
 } // namespace
 } // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp 
b/be/test/format/reader/table_reader_test.cpp
index f770fddb723..dc050976836 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -20,6 +20,7 @@
 #include <arrow/api.h>
 #include <arrow/io/api.h>
 #include <gtest/gtest.h>
+#include <parquet/api/reader.h>
 #include <parquet/arrow/writer.h>
 
 #include <algorithm>
@@ -30,12 +31,15 @@
 
 #include "core/assert_cast.h"
 #include "core/block/block.h"
+#include "core/column/column_nullable.h"
 #include "core/column/column_string.h"
 #include "core/column/column_vector.h"
+#include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/data_type_string.h"
 #include "exprs/vexpr.h"
 #include "format/reader/expr/slot_ref.h"
+#include "format/table/iceberg_reader_v2.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
 #include "storage/predicate/predicate_creator.h"
@@ -231,6 +235,19 @@ Block build_table_block(const std::vector<TableColumn>& 
columns) {
     return block;
 }
 
+void expect_nullable_int64_column_values(const IColumn& column,
+                                         const std::vector<int64_t>& 
expected_values) { // Checks that `column` is a nullable Int64 column whose null-map entries are all 0 and whose values equal expected_values.
+    const auto full_column = column.convert_to_full_column_if_const(); // Work on the full (non-const) form of the column.
+    const auto& nullable_column = assert_cast<const 
ColumnNullable&>(*full_column);
+    const auto& values =
+            assert_cast<const 
ColumnInt64&>(nullable_column.get_nested_column()).get_data();
+    ASSERT_EQ(nullable_column.size(), expected_values.size()); // Sizes must match before rows are indexed below.
+    for (size_t row = 0; row < expected_values.size(); ++row) {
+        EXPECT_EQ(nullable_column.get_null_map_data()[row], 0); // Every row's null-map entry is expected to be 0.
+        EXPECT_EQ(values[row], expected_values[row]);
+    }
+}
+
 SplitReadOptions build_split_options(const std::string& file_path) {
     SplitReadOptions options;
     options.current_range.__set_path(file_path);
@@ -239,6 +256,40 @@ SplitReadOptions build_split_options(const std::string& 
file_path) {
     return options;
 }
 
+void set_iceberg_row_lineage_params(SplitReadOptions* split_options, int64_t 
first_row_id,
+                                    int64_t last_updated_sequence_number) { // Stores first_row_id and last_updated_sequence_number in the split's Iceberg table-format params.
+    TTableFormatFileDesc table_format_params;
+    TIcebergFileDesc iceberg_params;
+    iceberg_params.__set_first_row_id(first_row_id);
+    
iceberg_params.__set_last_updated_sequence_number(last_updated_sequence_number);
+    table_format_params.__set_iceberg_params(iceberg_params); // Nest the Iceberg descriptor inside the table-format descriptor.
+    
split_options->current_range.__set_table_format_params(table_format_params); // Replaces whatever table-format params current_range had before.
+}
+
+int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& 
column_metadata) { // Returns the dictionary page offset when the chunk reports a dictionary page, otherwise the data page offset.
+    return column_metadata.has_dictionary_page()
+                   ? 
static_cast<int64_t>(column_metadata.dictionary_page_offset())
+                   : static_cast<int64_t>(column_metadata.data_page_offset());
+}
+
+SplitReadOptions build_split_options_for_row_group_mid(const std::string& 
file_path,
+                                                       int row_group_idx) { // Builds split options whose range is 1 byte long and starts at the byte midpoint of row group `row_group_idx`.
+    auto options = build_split_options(file_path);
+    auto reader = ::parquet::ParquetFileReader::OpenFile(file_path, false);
+    auto metadata = reader->metadata();
+    auto row_group_metadata = metadata->RowGroup(row_group_idx);
+    auto first_column = row_group_metadata->ColumnChunk(0);
+    auto last_column = 
row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1); // NOTE(review): assumes the highest-index column chunk is also the last one on disk — confirm.
+    const int64_t row_group_start_offset = 
parquet_column_start_offset(*first_column); // Start offset of the first column chunk.
+    const int64_t row_group_end_offset =
+            parquet_column_start_offset(*last_column) + 
last_column->total_compressed_size(); // Start of the last column chunk plus its compressed size.
+    const int64_t row_group_mid_offset =
+            row_group_start_offset + (row_group_end_offset - 
row_group_start_offset) / 2; // Halfway between the two offsets (integer division).
+    options.current_range.__set_start_offset(row_group_mid_offset);
+    options.current_range.__set_size(1); // The range size is fixed at 1.
+    return options;
+}
+
 TableColumn make_table_column(ColumnId id, const std::string& name, const 
DataTypePtr& type) {
     TableColumn column;
     column.id = id;
@@ -716,6 +767,226 @@ TEST(TableReaderTest, 
ProjectedPartitionColumnUsesSplitPartitionValue) {
     std::filesystem::remove_all(test_dir);
 }
 
+TEST(TableReaderTest, IcebergVirtualColumnsUseRowLineageMetadata) { // Projects _row_id, _last_updated_sequence_number and id through IcebergTableReader and checks the lineage values.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_virtual_columns_test";
+    std::filesystem::remove_all(test_dir); // Start from a clean directory in case an earlier run left one behind.
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            make_table_column(100, "_row_id", 
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(
+            make_table_column(101, "_last_updated_sequence_number",
+                              
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>())); // Block layout: position 0 = _row_id, 1 = _last_updated_sequence_number, 2 = id.
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)), // Presumably "id > 1"; the expectations below (ids 2 and 3) are consistent with that.
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    set_iceberg_row_lineage_params(&split_options, 1000, 77); // first_row_id = 1000, last_updated_sequence_number = 77.
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(2).column);
+
+    ASSERT_EQ(block.rows(), 2);
+    EXPECT_EQ(id_column.get_element(0), 2);
+    EXPECT_EQ(id_column.get_element(1), 3);
+    expect_nullable_int64_column_values(*block.get_by_position(0).column, 
{1001, 1002}); // Matches first_row_id (1000) plus each row's position in the file (1 and 2).
+    expect_nullable_int64_column_values(*block.get_by_position(1).column, {77, 
77}); // Every row carries the split's last_updated_sequence_number.
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir); // NOTE(review): not reached if an ASSERT_* above fails. The setup also matches IcebergVirtualColumnsKeepRowLineageAfterConjunctFiltering apart from the directory name and lineage values.
+}
+
+TEST(TableReaderTest, 
IcebergVirtualColumnsKeepRowLineageAfterConjunctFiltering) { // Filters rows with a conjunct and checks that the surviving rows still report lineage values tied to their file positions.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_virtual_columns_conjunct_test";
+    std::filesystem::remove_all(test_dir); // Start from a clean directory in case an earlier run left one behind.
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            make_table_column(100, "_row_id", 
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(
+            make_table_column(101, "_last_updated_sequence_number",
+                              
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>())); // Block layout: position 0 = _row_id, 1 = _last_updated_sequence_number, 2 = id.
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)), // Presumably "id > 1"; the expectations below (ids 2 and 3) are consistent with that.
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    set_iceberg_row_lineage_params(&split_options, 3000, 88); // first_row_id = 3000, last_updated_sequence_number = 88.
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(2).column);
+
+    ASSERT_EQ(block.rows(), 2); // The row with id 1 is filtered out.
+    EXPECT_EQ(id_column.get_element(0), 2);
+    EXPECT_EQ(id_column.get_element(1), 3);
+    expect_nullable_int64_column_values(*block.get_by_position(0).column, 
{3001, 3002}); // Matches first_row_id (3000) plus each row's position in the file (1 and 2), not its index in the filtered output.
+    expect_nullable_int64_column_values(*block.get_by_position(1).column, {88, 
88}); // Every row carries the split's last_updated_sequence_number.
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir); // NOTE(review): not reached if an ASSERT_* above fails, so the temp directory can be left behind.
+}
+
+TEST(TableReaderTest, 
IcebergVirtualColumnsKeepRowLineageAfterRowGroupPredicatePruning) { // Uses a column predicate (no conjunct) and checks the single remaining row still reports _row_id 4002.
+    const auto test_dir = std::filesystem::temp_directory_path() /
+                          
"doris_iceberg_virtual_columns_row_group_predicate_test";
+    std::filesystem::remove_all(test_dir); // Start from a clean directory in case an earlier run left one behind.
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    // ColumnPredicate is used for row-group/statistics pruning. Keep one row 
per row group so
+    // id > 2 prunes the first two row groups and leaves only the third 
file-local row.
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"}, 1);
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            make_table_column(100, "_row_id", 
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(
+            make_table_column(101, "_last_updated_sequence_number",
+                              
make_nullable(std::make_shared<DataTypeInt64>())));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>())); // Block layout: position 0 = _row_id, 1 = _last_updated_sequence_number, 2 = id.
+
+    TableColumnPredicates column_predicates;
+    
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+            0, "id", std::make_shared<DataTypeInt32>(), 
Field::create_field<TYPE_INT>(2), false)); // GT predicate on "id" with literal 2, registered under key 0.
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = 
std::move(column_predicates),
+                                    .conjuncts = VExprContext(nullptr), // No conjunct: only the column predicate can remove rows here.
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    set_iceberg_row_lineage_params(&split_options, 4000, 99); // first_row_id = 4000, last_updated_sequence_number = 99.
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(2).column);
+
+    ASSERT_EQ(block.rows(), 1);
+    EXPECT_EQ(id_column.get_element(0), 3);
+    expect_nullable_int64_column_values(*block.get_by_position(0).column, 
{4002}); // Matches first_row_id (4000) plus the row's position in the file (2), even though earlier rows were not returned.
+    expect_nullable_int64_column_values(*block.get_by_position(1).column, 
{99});
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir); // NOTE(review): not reached if an ASSERT_* above fails, so the temp directory can be left behind.
+}
+
+TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) { // Points the split at the midpoint of row group 1 and expects only that group's row to be returned.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_file_range_test";
+    std::filesystem::remove_all(test_dir); // Start from a clean directory in case an earlier run left one behind.
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30},
+                                {"range_group_one", "range_group_two", 
"range_group_three"}, 1); // Trailing 1 is presumably the row-group size, i.e. one row per row group — confirm against the helper.
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+    projected_columns.push_back(make_table_column(2, "value", 
std::make_shared<DataTypeString>())); // Block layout: position 0 = id, position 1 = value.
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr), // No predicates or conjuncts: only the file range limits what is read.
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    
ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path,
 1)).ok()); // 1-byte range at the byte midpoint of row group index 1.
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    const auto& value_column = assert_cast<const 
ColumnString&>(*block.get_by_position(1).column);
+    ASSERT_EQ(block.rows(), 1);
+    EXPECT_EQ(id_column.get_element(0), 2); // The second of the three written rows.
+    EXPECT_EQ(value_column.get_data_at(0).to_string(), "range_group_two");
+
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok()); // The second call is expected to signal end of stream with an empty block.
+    EXPECT_TRUE(eos);
+    EXPECT_EQ(block.rows(), 0);
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir); // NOTE(review): not reached if an ASSERT_* above fails, so the temp directory can be left behind.
+}
+
 TEST(TableReaderTest, 
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_same_name_diff_id_test";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to