This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new 6b3ce8c9185 [feature](be) Support Iceberg equality deletes in reader 
(#63852)
6b3ce8c9185 is described below

commit 6b3ce8c9185f7f837e2f26645f444e316fbd2c5b
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 23:16:41 2026 +0800

    [feature](be) Support Iceberg equality deletes in reader (#63852)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Implement Iceberg equality delete filtering in the v2
    Iceberg reader by materializing equality delete keys as delete predicate
    expressions and applying them through the file reader filter path.
    
    ### Release note
    
    Support reading Iceberg equality delete files in the BE Iceberg reader.
    
    ### Check List (For Author)
    
    - Test: Unit Test / Manual test
    - Added EqualityDeletePredicateTest for single-column, multi-column,
    null matching, and error handling.
        - Manual test: git diff --check.
    - Not run: run-be-ut.sh failed because this environment only has JDK 11
    and requires JDK 17; clang-format script failed because llvm@16 is not
    installed.
    - Behavior changed: Yes, Iceberg reader now filters equality-deleted
    rows.
    - Does this need documentation: No
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/format/new_parquet/parquet_reader.cpp       |   8 +-
 .../reader/expr/equality_delete_predicate.cpp      | 158 ++++++++++++++
 .../format/reader/expr/equality_delete_predicate.h |  71 +++++++
 be/src/format/reader/table_reader.cpp              |   2 +-
 be/src/format/reader/table_reader.h                |   4 +-
 be/src/format/table/iceberg_reader_v2.cpp          | 229 ++++++++++++++++-----
 be/src/format/table/iceberg_reader_v2.h            |  51 ++---
 .../reader/expr/equality_delete_predicate_test.cpp | 181 ++++++++++++++++
 be/test/format/reader/table_reader_test.cpp        |  19 +-
 9 files changed, 634 insertions(+), 89 deletions(-)

diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 489e184cd2b..5e4107d727d 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -339,9 +339,9 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t 
batch_rows, Block* file_
             RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
                     file_block, filter.data(), 
static_cast<size_t>(batch_rows), false,
                     &can_filter_all));
-            *selected_rows = can_filter_all ? 0
-                                            : 
_apply_filter_to_selection(filter, selection,
-                                                                         
*selected_rows);
+            *selected_rows =
+                    can_filter_all ? 0
+                                   : _apply_filter_to_selection(filter, 
selection, *selected_rows);
         }
         if (*selected_rows == 0) {
             break;
@@ -367,7 +367,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t 
batch_rows, Block* file_
         file_block->erase(result_column_id);
         *selected_rows =
                 !has_kept_row ? 0
-                               : _apply_filter_to_selection(keep_filter, 
selection, *selected_rows);
+                              : _apply_filter_to_selection(keep_filter, 
selection, *selected_rows);
     }
     return Status::OK();
 }
diff --git a/be/src/format/reader/expr/equality_delete_predicate.cpp 
b/be/src/format/reader/expr/equality_delete_predicate.cpp
new file mode 100644
index 00000000000..2b714abade7
--- /dev/null
+++ b/be/src/format/reader/expr/equality_delete_predicate.cpp
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/expr/equality_delete_predicate.h"
+
+#include <gen_cpp/Exprs_types.h>
+
+#include <utility>
+
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+
+namespace doris {
+namespace {
+
+bool column_value_equal(const ColumnPtr& lhs, size_t lhs_row, const ColumnPtr& 
rhs,
+                        size_t rhs_row) {
+    if (lhs->is_nullable() && rhs->is_nullable()) {
+        return lhs->compare_at(lhs_row, rhs_row, *rhs, -1) == 0;
+    }
+    if (lhs->is_nullable()) {
+        const auto& nullable_lhs = assert_cast<const ColumnNullable&>(*lhs);
+        return !nullable_lhs.is_null_at(lhs_row) &&
+               nullable_lhs.get_nested_column().compare_at(lhs_row, rhs_row, 
*rhs, -1) == 0;
+    }
+    if (rhs->is_nullable()) {
+        const auto& nullable_rhs = assert_cast<const ColumnNullable&>(*rhs);
+        return !nullable_rhs.is_null_at(rhs_row) &&
+               lhs->compare_at(lhs_row, rhs_row, 
nullable_rhs.get_nested_column(), -1) == 0;
+    }
+    return lhs->compare_at(lhs_row, rhs_row, *rhs, -1) == 0;
+}
+
+} // namespace
+
+EqualityDeletePredicate::EqualityDeletePredicate(Block delete_block, 
std::vector<int> field_ids)
+        : VExpr(), _delete_block(std::move(delete_block)), 
_field_ids(std::move(field_ids)) {
+    _node_type = TExprNodeType::PREDICATE;
+    _opcode = TExprOpcode::DELETE;
+    _data_type = std::make_shared<DataTypeBool>();
+    _expr_name = "EqualityDeletePredicate";
+    DCHECK_EQ(_delete_block.columns(), _field_ids.size());
+    _delete_hashes = _build_hashes(_delete_block);
+    for (size_t row = 0; row < _delete_hashes.size(); ++row) {
+        _delete_hash_map.emplace(_delete_hashes[row], row);
+    }
+}
+
+Status EqualityDeletePredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
+                                        VExprContext* context) {
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+    _expr_name = "EqualityDeletePredicate";
+    _prepare_finished = true;
+    return Status::OK();
+}
+
+Status EqualityDeletePredicate::open(RuntimeState* state, VExprContext* 
context,
+                                     FunctionContext::FunctionStateScope 
scope) {
+    DCHECK(_prepare_finished);
+    for (auto& child : _children) {
+        RETURN_IF_ERROR(child->open(state, context, scope));
+    }
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
+    _open_finished = true;
+    return Status::OK();
+}
+
+void EqualityDeletePredicate::close(VExprContext* context,
+                                    FunctionContext::FunctionStateScope scope) 
{
+    VExpr::close(context, scope);
+}
+
+Status EqualityDeletePredicate::execute(VExprContext* context, Block* block,
+                                        int* result_column_id) const {
+    if (_children.size() != _field_ids.size()) {
+        return Status::InternalError(
+                "EqualityDeletePredicate should have {} child exprs, but got 
{}", _field_ids.size(),
+                _children.size());
+    }
+
+    Block data_key_block;
+    for (const auto& child : _children) {
+        int slot = -1;
+        RETURN_IF_ERROR(child->execute(context, block, &slot));
+        const auto& key_column = block->get_by_position(slot);
+        data_key_block.insert({key_column.column, key_column.type, 
key_column.name});
+    }
+
+    const auto rows = data_key_block.rows();
+    auto res_col = ColumnBool::create(rows, 0);
+    if (_delete_hash_map.empty() || rows == 0) {
+        block->insert({std::move(res_col), std::make_shared<DataTypeBool>(), 
expr_name()});
+        *result_column_id = static_cast<int>(block->columns() - 1);
+        return Status::OK();
+    }
+
+    auto data_hashes = _build_hashes(data_key_block);
+    auto& result_data = res_col->get_data();
+    for (size_t row = 0; row < rows; ++row) {
+        const auto range = _delete_hash_map.equal_range(data_hashes[row]);
+        for (auto it = range.first; it != range.second; ++it) {
+            if (_equal(data_key_block, row, it->second)) {
+                result_data[row] = true;
+                break;
+            }
+        }
+    }
+
+    block->insert({std::move(res_col), std::make_shared<DataTypeBool>(), 
expr_name()});
+    *result_column_id = static_cast<int>(block->columns() - 1);
+    return Status::OK();
+}
+
+std::vector<uint64_t> EqualityDeletePredicate::_build_hashes(const Block& 
block) {
+    std::vector<uint64_t> hashes(block.rows(), 0);
+    for (const auto& column : block.get_columns()) {
+        column->update_hashes_with_value(hashes.data(), nullptr);
+    }
+    return hashes;
+}
+
+bool EqualityDeletePredicate::_equal(const Block& data_block, size_t data_row,
+                                     size_t delete_row) const {
+    for (size_t column_idx = 0; column_idx < _delete_block.columns(); 
++column_idx) {
+        const auto& data_column = 
data_block.get_by_position(column_idx).column;
+        const auto& delete_column = 
_delete_block.get_by_position(column_idx).column;
+        if (!column_value_equal(data_column, data_row, delete_column, 
delete_row)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+std::string EqualityDeletePredicate::debug_string() const {
+    return _expr_name;
+}
+
+} // namespace doris
diff --git a/be/src/format/reader/expr/equality_delete_predicate.h 
b/be/src/format/reader/expr/equality_delete_predicate.h
new file mode 100644
index 00000000000..2e33cffb398
--- /dev/null
+++ b/be/src/format/reader/expr/equality_delete_predicate.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "core/block/block.h"
+#include "exprs/function_context.h"
+#include "exprs/vexpr.h"
+
+namespace doris {
+class RowDescriptor;
+class RuntimeState;
+class VExprContext;
+} // namespace doris
+
+namespace doris {
+
+class EqualityDeletePredicate final : public VExpr {
+    ENABLE_FACTORY_CREATOR(EqualityDeletePredicate);
+
+public:
+    EqualityDeletePredicate(Block delete_block, std::vector<int> field_ids);
+    ~EqualityDeletePredicate() override = default;
+
+    Status execute(VExprContext* context, Block* block, int* result_column_id) 
const override;
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        return Status::InternalError("Not implement 
EqualityDeletePredicate::execute_column_impl");
+    }
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+    void close(VExprContext* context, FunctionContext::FunctionStateScope 
scope) override;
+    std::string debug_string() const override;
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    static std::vector<uint64_t> _build_hashes(const Block& block);
+    bool _equal(const Block& data_block, size_t data_row, size_t delete_row) 
const;
+
+    std::string _expr_name;
+    Block _delete_block;
+    std::vector<int> _field_ids;
+    std::vector<uint64_t> _delete_hashes;
+    std::multimap<uint64_t, size_t> _delete_hash_map;
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/table_reader.cpp 
b/be/src/format/reader/table_reader.cpp
index 0735cc51f38..8289d637d78 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -21,8 +21,8 @@
 #include <gen_cpp/Types_types.h>
 
 #include <cstring>
-#include <stdexcept>
 #include <set>
+#include <stdexcept>
 #include <vector>
 
 #include "common/cast_set.h"
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index f94e98bd837..de7626dfb24 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -139,7 +139,9 @@ public:
     // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
     virtual Status init(TableReadOptions options);
 
-    // 读取当前 split/partition 之前初始化。
+    // Prepare for reading a new split/task.
+    // 1. Pass a new split/task to reader, which will be used in subsequent 
open_reader() to initialize the underlying file reader.
+    // 2. Parse delete predicates from split/task information, which will be 
used for later dynamic filtering and delete handling.
     virtual Status prepare_split(const SplitReadOptions& options);
 
     // table-level 动态过滤入口。
diff --git a/be/src/format/table/iceberg_reader_v2.cpp 
b/be/src/format/table/iceberg_reader_v2.cpp
index ad72313cc89..ed6649fce2c 100644
--- a/be/src/format/table/iceberg_reader_v2.cpp
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -22,6 +22,7 @@
 #include <memory>
 #include <utility>
 
+#include "common/cast_set.h"
 #include "core/assert_cast.h"
 #include "core/block/block.h"
 #include "core/column/column_const.h"
@@ -33,30 +34,32 @@
 #include "core/field.h"
 #include "format/new_parquet/column_reader.h"
 #include "format/new_parquet/parquet_reader.h"
+#include "format/reader/expr/cast.h"
+#include "format/reader/expr/equality_delete_predicate.h"
+#include "format/reader/expr/slot_ref.h"
 #include "format/reader/table_reader.h"
 #include "format/table/deletion_vector_reader.h"
 #include "io/file_factory.h"
 
 namespace doris::iceberg {
 
-IcebergTableReader::PositionDeleteBlockCollector::PositionDeleteBlockCollector(
-        std::string data_file_path, std::map<std::string, reader::DeleteRows>* 
rows)
+IcebergTableReader::PositionDeleteRowsCollector::PositionDeleteRowsCollector(
+        std::string data_file_path, reader::DeleteRows* rows)
         : _data_file_path(std::move(data_file_path)), _rows(rows) {}
 
-Status IcebergTableReader::PositionDeleteBlockCollector::collect(const Block& 
block,
-                                                                 size_t 
read_rows) {
+Status IcebergTableReader::PositionDeleteRowsCollector::collect(const Block& 
block,
+                                                                size_t 
read_rows) {
     if (read_rows == 0) {
         return Status::OK();
     }
     const auto& file_path_column = assert_cast<const ColumnString&>(
             *block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column);
-    const auto& pos_column =
-            assert_cast<const 
ColumnInt64&>(*block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION)
-                                                     .column);
+    const auto& pos_column = assert_cast<const ColumnInt64&>(
+            *block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION).column);
     for (size_t row = 0; row < read_rows; ++row) {
         const auto file_path = file_path_column.get_data_at(row).to_string();
         if (file_path == _data_file_path) {
-            (*_rows)[file_path].push_back(pos_column.get_element(row));
+            _rows->push_back(pos_column.get_element(row));
         }
     }
     return Status::OK();
@@ -67,7 +70,7 @@ Status IcebergTableReader::prepare_split(const 
reader::SplitReadOptions& options
     _iceberg_params = nullptr;
     _delete_predicates_initialized = false;
     _position_delete_rows_storage.clear();
-    _equality_delete_files.clear();
+    _equality_delete_filters.clear();
     if (options.current_range.__isset.table_format_params &&
         options.current_range.table_format_params.__isset.iceberg_params) {
         const auto& iceberg_params = 
options.current_range.table_format_params.iceberg_params;
@@ -81,13 +84,7 @@ Status IcebergTableReader::prepare_split(const 
reader::SplitReadOptions& options
         }
     }
     RETURN_IF_ERROR(TableReader::prepare_split(options));
-    return 
_collect_position_delete_rows(options.current_range.table_format_params);
-}
-
-Status IcebergTableReader::finalize_chunk(Block* block, const size_t rows) {
-    RETURN_IF_ERROR(reader::TableReader::finalize_chunk(block, rows));
-    RETURN_IF_ERROR(apply_equality_deletes(block));
-    return Status::OK();
+    return _init_delete_predicates(options.current_range.table_format_params);
 }
 
 Status IcebergTableReader::materialize_virtual_columns(Block* table_block) {
@@ -114,6 +111,7 @@ Status 
IcebergTableReader::customize_file_scan_request(reader::FileScanRequest*
     if (_row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id()) {
         RETURN_IF_ERROR(_append_row_position_output_column(file_request));
     }
+    RETURN_IF_ERROR(_append_equality_delete_predicates(file_request));
     return Status::OK();
 }
 
@@ -161,7 +159,7 @@ Status 
IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDes
     return Status::OK();
 }
 
-Status IcebergTableReader::_collect_position_delete_rows(const 
TTableFormatFileDesc& t_desc) {
+Status IcebergTableReader::_init_delete_predicates(const TTableFormatFileDesc& 
t_desc) {
     if (!t_desc.__isset.iceberg_params || _delete_predicates_initialized) {
         _delete_predicates_initialized = true;
         return Status::OK();
@@ -175,6 +173,7 @@ Status 
IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileD
     }
 
     std::vector<TIcebergDeleteFileDesc> position_delete_files;
+    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
     for (const auto& delete_file : iceberg_params.delete_files) {
         if (!delete_file.__isset.content) {
             continue;
@@ -182,29 +181,31 @@ Status 
IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileD
         if (delete_file.content == POSITION_DELETE) {
             position_delete_files.push_back(delete_file);
         } else if (delete_file.content == EQUALITY_DELETE) {
-            _equality_delete_files.push_back(delete_file);
+            equality_delete_files.push_back(delete_file);
         }
     }
 
+    // `_delete_rows != nullptr` means DeleteVector is parsed
     if (_delete_rows != nullptr) {
         _position_delete_rows_storage = *_delete_rows;
         _delete_rows = &_position_delete_rows_storage;
     }
+    // Combine position delete rows from both deletion vector and position 
delete files, and
+    // initialize equality delete predicates. Position delete files contain 
row positions of
+    // deleted rows, which can be directly added to `_delete_rows`. Equality 
delete files contain
+    // values of deleted rows, which require reading the files and building 
predicates for later
+    // filtering.
     if (!position_delete_files.empty()) {
-        RETURN_IF_ERROR(_read_position_delete_files(position_delete_files));
+        RETURN_IF_ERROR(_init_position_delete_rows(position_delete_files));
+    }
+    if (!equality_delete_files.empty()) {
+        
RETURN_IF_ERROR(_init_equality_delete_predicates(equality_delete_files));
     }
 
     _delete_predicates_initialized = true;
     return Status::OK();
 }
 
-Status IcebergTableReader::apply_equality_deletes(Block* block) {
-    if (!_equality_delete_files.empty()) {
-        return Status::NotSupported("Iceberg equality delete is not supported 
by TableReader");
-    }
-    return Status::OK();
-}
-
 std::string IcebergTableReader::_iceberg_delete_vector_cache_key(
         const TIcebergDeleteFileDesc& delete_file) {
     const std::string key_prefix = "iceberg_dv:";
@@ -259,14 +260,6 @@ const reader::SchemaField* 
IcebergTableReader::_find_delete_field(
     return nullptr;
 }
 
-Block IcebergTableReader::_build_position_delete_block(const 
reader::SchemaField& file_path_field,
-                                                       const 
reader::SchemaField& pos_field) {
-    Block block;
-    block.insert({file_path_field.type->create_column(), file_path_field.type, 
ICEBERG_FILE_PATH});
-    block.insert({pos_field.type->create_column(), pos_field.type, 
ICEBERG_ROW_POS});
-    return block;
-}
-
 Status 
IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest* 
request) {
     const auto row_position_column_id =
             doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
@@ -275,6 +268,46 @@ Status 
IcebergTableReader::_append_row_position_output_column(reader::FileScanRe
     return Status::OK();
 }
 
+Status 
IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRequest* 
request) {
+    DORIS_CHECK(request != nullptr);
+    for (const auto& filter : _equality_delete_filters) {
+        auto delete_predicate =
+                std::make_shared<EqualityDeletePredicate>(filter.delete_block, 
filter.field_ids);
+        reader::FileExpressionFilter expression_filter;
+        expression_filter.delete_conjunct = 
VExprContext::create_shared(delete_predicate);
+        DCHECK_EQ(filter.field_ids.size(), filter.key_types.size());
+        for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) {
+            const int field_id = filter.field_ids[idx];
+            auto field_it =
+                    std::find_if(_data_reader.file_schema.begin(), 
_data_reader.file_schema.end(),
+                                 [field_id](const reader::SchemaField& field) {
+                                     return !field.field_id_path.empty() &&
+                                            field.field_id_path.back() == 
field_id;
+                                 });
+            if (field_it == _data_reader.file_schema.end()) {
+                return Status::InternalError(
+                        "Can not find equality delete column field id {} in 
data file schema",
+                        field_id);
+            }
+            _append_file_scan_column(request, field_it->id, 
&request->predicate_columns);
+            const auto block_position = 
request->column_positions.at(field_it->id);
+            auto slot = 
TableSlotRef::create_shared(cast_set<int>(block_position),
+                                                    
cast_set<int>(block_position), -1,
+                                                    field_it->type, 
field_it->name);
+            if (field_it->type->equals(*filter.key_types[idx])) {
+                delete_predicate->add_child(std::move(slot));
+            } else {
+                auto cast_expr = Cast::create_shared(filter.key_types[idx]);
+                cast_expr->add_child(std::move(slot));
+                delete_predicate->add_child(std::move(cast_expr));
+            }
+            expression_filter.file_column_ids.push_back(field_it->id);
+        }
+        request->expression_filters.push_back(std::move(expression_filter));
+    }
+    return Status::OK();
+}
+
 std::string IcebergTableReader::_data_file_path() const {
     if (_iceberg_params != nullptr && 
_iceberg_params->__isset.original_file_path) {
         return _iceberg_params->original_file_path;
@@ -286,7 +319,7 @@ std::string IcebergTableReader::_data_file_path() const {
 
 Status IcebergTableReader::_read_parquet_position_delete_file(
         const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& 
scan_params,
-        IcebergDeleteFileIOContext* delete_io_ctx, 
PositionDeleteBlockCollector* collector) {
+        IcebergDeleteFileIOContext* delete_io_ctx, 
PositionDeleteRowsCollector* collector) {
     if (!delete_file.__isset.file_format) {
         return Status::InternalError("Iceberg position delete file is missing 
file format");
     }
@@ -326,8 +359,16 @@ Status 
IcebergTableReader::_read_parquet_position_delete_file(
     RETURN_IF_ERROR(reader.open(request));
 
     bool eof = false;
+    auto build_position_delete_block = [](const reader::SchemaField& 
file_path_field,
+                                          const reader::SchemaField& 
pos_field) -> Block {
+        Block block;
+        block.insert(
+                {file_path_field.type->create_column(), file_path_field.type, 
ICEBERG_FILE_PATH});
+        block.insert({pos_field.type->create_column(), pos_field.type, 
ICEBERG_ROW_POS});
+        return block;
+    };
     while (!eof) {
-        Block block = _build_position_delete_block(*file_path_field, 
*pos_field);
+        Block block = build_position_delete_block(*file_path_field, 
*pos_field);
         size_t read_rows = 0;
         RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
         RETURN_IF_ERROR(collector->collect(block, read_rows));
@@ -335,35 +376,127 @@ Status 
IcebergTableReader::_read_parquet_position_delete_file(
     return reader.close();
 }
 
-Status IcebergTableReader::_read_position_delete_files(
+Status IcebergTableReader::_init_position_delete_rows(
         const std::vector<TIcebergDeleteFileDesc>& delete_files) {
     TFileScanRangeParams delete_scan_params =
             _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params;
-    std::map<std::string, reader::DeleteRows> rows_by_file;
+    reader::DeleteRows position_delete_rows;
     const auto data_file_path = _data_file_path();
     IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
-    PositionDeleteBlockCollector collector(data_file_path, &rows_by_file);
+    PositionDeleteRowsCollector collector(data_file_path, 
&position_delete_rows);
     for (const auto& delete_file : delete_files) {
         RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, 
delete_scan_params,
                                                            &delete_io_ctx, 
&collector));
     }
-    auto rows_it = rows_by_file.find(data_file_path);
-    if (rows_it == rows_by_file.end()) {
+    if (position_delete_rows.empty()) {
         return Status::OK();
     }
     // Position delete files and deletion vectors both become row-position 
deletes for the
     // common TableReader DeletePredicate path. Keep the merged rows in a 
member vector because
     // DeletePredicate stores a reference to the vector used by _delete_rows.
     _position_delete_rows_storage.insert(_position_delete_rows_storage.end(),
-                                         rows_it->second.begin(), 
rows_it->second.end());
+                                         position_delete_rows.begin(), 
position_delete_rows.end());
     std::sort(_position_delete_rows_storage.begin(), 
_position_delete_rows_storage.end());
-    
_position_delete_rows_storage.erase(std::unique(_position_delete_rows_storage.begin(),
-                                                    
_position_delete_rows_storage.end()),
-                                        _position_delete_rows_storage.end());
+    _position_delete_rows_storage.erase(
+            std::unique(_position_delete_rows_storage.begin(), 
_position_delete_rows_storage.end()),
+            _position_delete_rows_storage.end());
     _delete_rows = &_position_delete_rows_storage;
     return Status::OK();
 }
 
+Status IcebergTableReader::_init_equality_delete_predicates(
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+    TFileScanRangeParams delete_scan_params =
+            _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params;
+    IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
+    for (const auto& delete_file : delete_files) {
+        RETURN_IF_ERROR(_read_parquet_equality_delete_file(delete_file, 
delete_scan_params,
+                                                           &delete_io_ctx));
+    }
+    return Status::OK();
+}
+
+Status IcebergTableReader::_read_parquet_equality_delete_file(
+        const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& 
scan_params,
+        IcebergDeleteFileIOContext* delete_io_ctx) {
+    if (!delete_file.__isset.file_format) {
+        return Status::InternalError("Iceberg equality delete file is missing 
file format");
+    }
+    if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) {
+        return Status::NotSupported("Unsupported Iceberg equality delete file 
format {}",
+                                    delete_file.file_format);
+    }
+    if (!delete_file.__isset.field_ids || delete_file.field_ids.empty()) {
+        return Status::InternalError("Iceberg equality delete file is missing 
field ids");
+    }
+
+    auto delete_range = build_iceberg_delete_file_range(delete_file.path);
+    if (_current_task != nullptr && _current_task->data_file != nullptr &&
+        !_current_task->data_file->fs_name.empty()) {
+        delete_range.__set_fs_name(_current_task->data_file->fs_name);
+    }
+    auto system_properties = _delete_file_system_properties(scan_params);
+    auto file_description = _delete_file_description(delete_range);
+    std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, 
[](io::IOContext*) {});
+    parquet::ParquetReader reader(system_properties, file_description, io_ctx, 
_scanner_profile);
+    RETURN_IF_ERROR(reader.init(_runtime_state));
+
+    std::vector<reader::SchemaField> schema;
+    RETURN_IF_ERROR(reader.get_schema(&schema));
+    std::vector<reader::SchemaField> delete_fields;
+    std::vector<int> delete_field_ids;
+    std::vector<DataTypePtr> delete_key_types;
+    for (const auto field_id : delete_file.field_ids) {
+        auto field_it = std::find_if(
+                schema.begin(), schema.end(), [field_id](const 
reader::SchemaField& field) {
+                    return !field.field_id_path.empty() && 
field.field_id_path.back() == field_id;
+                });
+        if (field_it == schema.end()) {
+            return Status::InternalError("Can not find field id {} in equality 
delete file {}",
+                                         field_id, delete_file.path);
+        }
+        if (!field_it->children.empty()) {
+            return Status::NotSupported(
+                    "Iceberg equality delete does not support complex column 
{}", field_it->name);
+        }
+        delete_fields.push_back(*field_it);
+        delete_field_ids.push_back(field_id);
+        delete_key_types.push_back(field_it->type);
+    }
+
+    auto request = std::make_unique<reader::FileScanRequest>();
+    for (size_t idx = 0; idx < delete_fields.size(); ++idx) {
+        request->non_predicate_columns.push_back(delete_fields[idx].id);
+        request->column_positions.emplace(delete_fields[idx].id, idx);
+    }
+    RETURN_IF_ERROR(reader.open(request));
+
+    auto build_equality_delete_block = [](const 
std::vector<reader::SchemaField> fields) -> Block {
+        Block block;
+        for (const auto& field : fields) {
+            block.insert({field.type->create_column(), field.type, 
field.name});
+        }
+        return block;
+    };
+    Block delete_block = build_equality_delete_block(delete_fields);
+    bool eof = false;
+    while (!eof) {
+        Block block = build_equality_delete_block(delete_fields);
+        size_t read_rows = 0;
+        RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
+        if (read_rows > 0) {
+            MutableBlock mutable_block(&delete_block);
+            RETURN_IF_ERROR(mutable_block.merge(block));
+        }
+    }
+    RETURN_IF_ERROR(reader.close());
+    _equality_delete_filters.push_back(
+            EqualityDeleteFilter {.field_ids = std::move(delete_field_ids),
+                                  .key_types = std::move(delete_key_types),
+                                  .delete_block = std::move(delete_block)});
+    return Status::OK();
+}
+
 Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, 
size_t column_idx) {
     if (_row_lineage_columns.first_row_id < 0) {
         return Status::OK();
@@ -372,9 +505,9 @@ Status 
IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, s
     const auto& row_position_column = assert_cast<const ColumnInt64&>(
             
*_data_reader.block_template.get_by_position(_row_position_block_position).column);
     DORIS_CHECK(row_position_column.size() == table_block->rows());
-    auto column =
-            
table_block->get_by_position(column_idx).column->convert_to_full_column_if_const()
-                    ->assume_mutable();
+    auto column = table_block->get_by_position(column_idx)
+                          .column->convert_to_full_column_if_const()
+                          ->assume_mutable();
     auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
     auto& null_map = nullable_column->get_null_map_data();
     auto& data = 
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index fbc8e28441b..497a989289a 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -17,14 +17,12 @@
 
 #pragma once
 
-#include <cstddef>
-#include <cstdint>
-#include <map>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "common/status.h"
+#include "core/block/block.h"
 #include "format/reader/file_reader.h"
 #include "format/reader/table_reader.h"
 #include "format/table/iceberg_delete_file_reader_helper.h"
@@ -51,11 +49,6 @@ public:
     Status prepare_split(const reader::SplitReadOptions& options) override;
 
 protected:
-    // 将 file-local block 转换为 table/global schema block。
-    // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
-    // 物化以及复杂列 remap。
-    Status finalize_chunk(Block* block, const size_t rows) override;
-
     Status materialize_virtual_columns(Block* table_block) override;
 
     Status customize_file_scan_request(reader::FileScanRequest* file_request) 
override;
@@ -63,11 +56,7 @@ protected:
     Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, 
DeleteFileDesc* desc,
                                        bool* has_delete_file) override;
 
-    Status _collect_position_delete_rows(const TTableFormatFileDesc& t_desc);
-
-    // 在 table block 上应用 equality delete。
-    // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
-    Status apply_equality_deletes(Block* block);
+    Status _init_delete_predicates(const TTableFormatFileDesc& t_desc);
 
 private:
     static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2;
@@ -85,16 +74,15 @@ private:
     static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0;
     static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1;
 
-    class PositionDeleteBlockCollector final {
+    class PositionDeleteRowsCollector final {
     public:
-        PositionDeleteBlockCollector(std::string data_file_path,
-                                     std::map<std::string, 
reader::DeleteRows>* rows);
+        PositionDeleteRowsCollector(std::string data_file_path, 
reader::DeleteRows* rows);
 
         Status collect(const Block& block, size_t read_rows);
 
     private:
         std::string _data_file_path;
-        std::map<std::string, reader::DeleteRows>* _rows = nullptr;
+        reader::DeleteRows* _rows = nullptr;
     };
 
     static std::string _iceberg_delete_vector_cache_key(const 
TIcebergDeleteFileDesc& delete_file);
@@ -102,27 +90,35 @@ private:
     static std::shared_ptr<io::FileSystemProperties> 
_delete_file_system_properties(
             const TFileScanRangeParams& scan_params);
 
-    static std::unique_ptr<io::FileDescription> _delete_file_description(const 
TFileRangeDesc& range);
+    static std::unique_ptr<io::FileDescription> _delete_file_description(
+            const TFileRangeDesc& range);
 
     static const reader::SchemaField* _find_delete_field(
             const std::vector<reader::SchemaField>& schema, const std::string& 
name);
 
-    static Block _build_position_delete_block(const reader::SchemaField& 
file_path_field,
-                                              const reader::SchemaField& 
pos_field);
-
     Status _append_row_position_output_column(reader::FileScanRequest* 
request);
 
+    Status _append_equality_delete_predicates(reader::FileScanRequest* 
request);
+
+    Status _init_equality_delete_predicates(
+            const std::vector<TIcebergDeleteFileDesc>& delete_files);
+
     std::string _data_file_path() const;
 
+    // Read equality/position delete files.
+    Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc& 
delete_file,
+                                              const TFileScanRangeParams& 
scan_params,
+                                              IcebergDeleteFileIOContext* 
delete_io_ctx);
     Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc& 
delete_file,
                                               const TFileScanRangeParams& 
scan_params,
                                               IcebergDeleteFileIOContext* 
delete_io_ctx,
-                                              PositionDeleteBlockCollector* 
collector);
+                                              PositionDeleteRowsCollector* 
collector);
 
-    Status _read_position_delete_files(const 
std::vector<TIcebergDeleteFileDesc>& delete_files);
+    // Read position delete files and collect deleted row positions to update 
DeletePredicate.
+    Status _init_position_delete_rows(const 
std::vector<TIcebergDeleteFileDesc>& delete_files);
 
+    // Materialize row lineage virtual columns based on the position delete 
file.
     Status _materialize_row_lineage_row_id(Block* table_block, size_t 
column_idx);
-
     Status _materialize_row_lineage_last_updated_sequence_number(Block* 
table_block,
                                                                  size_t 
column_idx);
 
@@ -131,7 +127,12 @@ private:
     const TIcebergFileDesc* _iceberg_params = nullptr;
     bool _delete_predicates_initialized = false;
     reader::DeleteRows _position_delete_rows_storage;
-    std::vector<TIcebergDeleteFileDesc> _equality_delete_files;
+    struct EqualityDeleteFilter {
+        std::vector<int> field_ids;
+        std::vector<DataTypePtr> key_types;
+        Block delete_block;
+    };
+    std::vector<EqualityDeleteFilter> _equality_delete_filters;
 
     bool _need_row_lineage_row_id() const;
 };
diff --git a/be/test/format/reader/expr/equality_delete_predicate_test.cpp 
b/be/test/format/reader/expr/equality_delete_predicate_test.cpp
new file mode 100644
index 00000000000..07ff0f78f81
--- /dev/null
+++ b/be/test/format/reader/expr/equality_delete_predicate_test.cpp
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/expr/equality_delete_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "exprs/vexpr_context.h"
+#include "format/reader/expr/cast.h"
+#include "runtime/descriptors.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
+
+namespace doris {
+
+class EqualityDeletePredicateTest : public testing::Test {
+protected:
+    static ColumnWithTypeAndName make_nullable_int_column(
+            const std::string& name, const std::vector<std::optional<int>>& 
values) {
+        auto data = ColumnInt32::create();
+        auto null_map = ColumnUInt8::create();
+        for (const auto& value : values) {
+            data->insert_value(value.value_or(0));
+            null_map->insert_value(!value.has_value());
+        }
+        auto type = make_nullable(std::make_shared<DataTypeInt32>());
+        return {ColumnNullable::create(std::move(data), std::move(null_map)), 
type, name};
+    }
+
+    static ColumnWithTypeAndName make_nullable_string_column(
+            const std::string& name, const 
std::vector<std::optional<std::string>>& values) {
+        auto data = ColumnString::create();
+        auto null_map = ColumnUInt8::create();
+        for (const auto& value : values) {
+            const std::string data_value = value.value_or("");
+            data->insert_data(data_value.data(), data_value.size());
+            null_map->insert_value(!value.has_value());
+        }
+        auto type = make_nullable(std::make_shared<DataTypeString>());
+        return {ColumnNullable::create(std::move(data), std::move(null_map)), 
type, name};
+    }
+
+    static std::vector<UInt8> result_column_data(const Block& block, int 
result_column_id) {
+        const auto& result_column =
+                assert_cast<const 
ColumnBool&>(*block.get_by_position(result_column_id).column);
+        return {result_column.get_data().begin(), 
result_column.get_data().end()};
+    }
+
+    static Status execute_equality_delete_predicate(Block delete_block, 
std::vector<int> field_ids,
+                                                    Block* data_block, int* 
result_column_id) {
+        auto predicate =
+                
std::make_shared<EqualityDeletePredicate>(std::move(delete_block), field_ids);
+        predicate->_open_finished = true;
+        for (size_t idx = 0; idx < field_ids.size(); ++idx) {
+            predicate->add_child(
+                    std::make_shared<MockSlotRef>(idx, 
data_block->get_by_position(idx).type));
+        }
+
+        VExprContext context(predicate);
+        return predicate->execute(&context, data_block, result_column_id);
+    }
+
+    static Status execute_prepared_equality_delete_predicate(const 
VExprContextSPtr& context,
+                                                             MockRuntimeState* 
state,
+                                                             Block* data_block,
+                                                             int* 
result_column_id) {
+        RETURN_IF_ERROR(context->prepare(state, RowDescriptor()));
+        RETURN_IF_ERROR(context->open(state));
+        return context->execute(data_block, result_column_id);
+    }
+};
+
+TEST_F(EqualityDeletePredicateTest, MatchSingleColumn) {
+    Block delete_block;
+    delete_block.insert(make_nullable_int_column("id", {1, 4}));
+    Block data_block;
+    data_block.insert(make_nullable_int_column("id", {1, 2, 3, 4}));
+
+    int result_column_id = -1;
+    auto status = execute_equality_delete_predicate(std::move(delete_block), 
{1}, &data_block,
+                                                    &result_column_id);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_EQ(result_column_data(data_block, result_column_id), 
std::vector<UInt8>({1, 0, 0, 1}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchMultipleColumns) {
+    Block delete_block;
+    delete_block.insert(make_nullable_int_column("id", {1, 2}));
+    delete_block.insert(make_nullable_string_column("name", {"a", "b"}));
+    Block data_block;
+    data_block.insert(make_nullable_int_column("id", {1, 1, 2, 2}));
+    data_block.insert(make_nullable_string_column("name", {"a", "b", "a", 
"b"}));
+
+    int result_column_id = -1;
+    auto status = execute_equality_delete_predicate(std::move(delete_block), 
{1, 2}, &data_block,
+                                                    &result_column_id);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_EQ(result_column_data(data_block, result_column_id), 
std::vector<UInt8>({1, 0, 0, 1}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchNullValues) {
+    Block delete_block;
+    delete_block.insert(make_nullable_int_column("id", {std::nullopt}));
+    Block data_block;
+    data_block.insert(make_nullable_int_column("id", {1, std::nullopt, 3}));
+
+    int result_column_id = -1;
+    auto status = execute_equality_delete_predicate(std::move(delete_block), 
{1}, &data_block,
+                                                    &result_column_id);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_EQ(result_column_data(data_block, result_column_id), 
std::vector<UInt8>({0, 1, 0}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchAfterCastToDeleteKeyType) {
+    Block delete_block;
+    delete_block.insert(make_nullable_int_column("id", {1, 4}));
+    Block data_block;
+    data_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({1, 
2, 4}));
+
+    auto predicate = 
std::make_shared<EqualityDeletePredicate>(std::move(delete_block),
+                                                               
std::vector<int> {1});
+    auto cast_expr = 
Cast::create_shared(make_nullable(std::make_shared<DataTypeInt32>()));
+    cast_expr->add_child(std::make_shared<MockSlotRef>(0, 
data_block.get_by_position(0).type));
+    predicate->add_child(std::move(cast_expr));
+    auto context = VExprContext::create_shared(predicate);
+    MockRuntimeState state;
+
+    int result_column_id = -1;
+    auto status = execute_prepared_equality_delete_predicate(context, &state, 
&data_block,
+                                                             
&result_column_id);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_EQ(result_column_data(data_block, result_column_id), 
std::vector<UInt8>({1, 0, 1}));
+    context->close();
+}
+
+TEST_F(EqualityDeletePredicateTest, ChildCountMismatchReturnsError) {
+    Block delete_block;
+    delete_block.insert(make_nullable_int_column("id", {1}));
+    auto predicate = 
std::make_shared<EqualityDeletePredicate>(std::move(delete_block),
+                                                               
std::vector<int> {1});
+    predicate->_open_finished = true;
+    Block data_block;
+    data_block.insert(make_nullable_int_column("id", {1}));
+    VExprContext context(predicate);
+
+    int result_column_id = -1;
+    auto status = predicate->execute(&context, &data_block, &result_column_id);
+    ASSERT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("should have 1 child exprs"), 
std::string::npos);
+}
+
+} // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp 
b/be/test/format/reader/table_reader_test.cpp
index 8705775485f..8a72937002d 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -301,9 +301,8 @@ void write_position_delete_parquet_file(const std::string& 
file_path,
             arrow::field("file_path", arrow::utf8(), false),
             arrow::field("pos", arrow::int64(), false),
     });
-    auto table = arrow::Table::Make(schema,
-                                    {build_string_array(data_file_paths),
-                                     build_int64_array(positions)});
+    auto table = arrow::Table::Make(
+            schema, {build_string_array(data_file_paths), 
build_int64_array(positions)});
 
     auto file_result = arrow::io::FileOutputStream::Open(file_path);
     ASSERT_TRUE(file_result.ok()) << file_result.status();
@@ -313,9 +312,9 @@ void write_position_delete_parquet_file(const std::string& 
file_path,
     builder.version(::parquet::ParquetVersion::PARQUET_2_6);
     builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
     builder.compression(::parquet::Compression::UNCOMPRESSED);
-    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
-            *table, arrow::default_memory_pool(), out, 
static_cast<int64_t>(positions.size()),
-            builder.build()));
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out,
+                                                      
static_cast<int64_t>(positions.size()),
+                                                      builder.build()));
 }
 
 int64_t write_iceberg_deletion_vector_file(const std::string& file_path,
@@ -423,7 +422,8 @@ std::unique_ptr<ReadProfile> 
make_table_read_profile(RuntimeProfile* profile) {
 }
 
 TTableFormatFileDesc make_iceberg_table_format_desc(
-        const std::string& data_file_path, const 
std::vector<TIcebergDeleteFileDesc>& delete_files) {
+        const std::string& data_file_path,
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
     TTableFormatFileDesc table_format_params;
     TIcebergFileDesc iceberg_params;
     iceberg_params.__set_format_version(2);
@@ -433,9 +433,8 @@ TTableFormatFileDesc make_iceberg_table_format_desc(
     return table_format_params;
 }
 
-std::vector<int32_t> read_iceberg_ids(
-        doris::iceberg::IcebergTableReader* reader,
-        const std::vector<TableColumn>& projected_columns) {
+std::vector<int32_t> read_iceberg_ids(doris::iceberg::IcebergTableReader* 
reader,
+                                      const std::vector<TableColumn>& 
projected_columns) {
     std::vector<int32_t> ids;
     bool eos = false;
     while (!eos) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to