This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new 4073912ea67 [feature](be) Support expression filters on file reader (#63748)
4073912ea67 is described below

commit 4073912ea6705a84e4bb6a3add2a37294af400d8
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 18:23:58 2026 +0800

    [feature](be) Support expression filters on file reader (#63748)
---
 be/src/format/new_parquet/parquet_reader.cpp       |  45 ++-
 be/src/format/new_parquet/parquet_reader.h         |   3 +-
 be/src/format/new_parquet/parquet_statistics.cpp   |  26 +-
 be/src/format/new_parquet/parquet_statistics.h     |   7 +-
 be/src/format/reader/column_mapper.cpp             |  98 +++++--
 be/src/format/reader/column_mapper.h               |  16 +-
 be/src/format/reader/file_reader.h                 |  23 +-
 be/src/format/reader/table_reader.cpp              |  26 +-
 be/src/format/reader/table_reader.h                |  16 +-
 be/test/format/new_parquet/parquet_reader_test.cpp | 127 ++++++++-
 be/test/format/reader/table_reader_test.cpp        | 312 ++++++++++++++++++++-
 11 files changed, 589 insertions(+), 110 deletions(-)

diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 190aa87f251..6d0ef3eb742 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -291,10 +291,6 @@ Status 
ParquetReader::_get_projected_schema_field(reader::ColumnId file_column_i
     return Status::OK();
 }
 
-bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter& 
local_filter) {
-    return local_filter.conjunct != nullptr;
-}
-
 Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* 
file_block,
                                            SelectionVector* selection, 
uint16_t* selected_rows) {
     selection->resize(static_cast<size_t>(batch_rows));
@@ -314,28 +310,29 @@ Status ParquetReader::_read_filter_columns(int64_t 
batch_rows, Block* file_block
                                       column_reader->name(), column_rows, 
batch_rows);
         }
         file_block->replace_by_position(block_position, std::move(column));
+    }
+    return _execute_filter_conjuncts(batch_rows, file_block, selection, 
selected_rows);
+}
 
-        for (const auto& local_filter : _request->local_filters) {
-            if (local_filter.file_column_id != file_field_id ||
-                !_has_expression_filter(local_filter)) {
-                continue;
-            }
-            if (*selected_rows == 0) {
-                break;
-            }
-            IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
-            bool can_filter_all = false;
-            RETURN_IF_ERROR(local_filter.conjunct->execute_filter(file_block, 
filter.data(),
-                                                                  
static_cast<size_t>(batch_rows),
-                                                                  false, 
&can_filter_all));
-            *selected_rows =
-                    can_filter_all ? 0
-                                   : _apply_filter_to_selection(filter, 
selection, *selected_rows);
-            break;
+Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* 
file_block,
+                                                SelectionVector* selection,
+                                                uint16_t* selected_rows) {
+    // Expression filters may reference several predicate columns. Execute 
them only after all
+    // predicate columns in the file-local block have been materialized.
+    for (const auto& expression_filter : _request->expression_filters) {
+        if (expression_filter.conjunct == nullptr) {
+            continue;
         }
         if (*selected_rows == 0) {
             break;
         }
+        IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+        bool can_filter_all = false;
+        RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
+                file_block, filter.data(), static_cast<size_t>(batch_rows), 
false,
+                &can_filter_all));
+        *selected_rows =
+                can_filter_all ? 0 : _apply_filter_to_selection(filter, 
selection, *selected_rows);
     }
     return Status::OK();
 }
@@ -557,10 +554,10 @@ Status 
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
         DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
         DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
     }
-    for (const auto& local_filter : _request->local_filters) {
-        if (local_filter.file_column_id < 0 || local_filter.file_column_id >= 
num_fields) {
+    for (const auto& column_filter : _request->column_predicate_filters) {
+        if (column_filter.file_column_id < 0 || column_filter.file_column_id 
>= num_fields) {
             return Status::InvalidArgument("Invalid parquet filter top-level 
field id {}",
-                                           local_filter.file_column_id);
+                                           column_filter.file_column_id);
         }
     }
     for (const auto& [file_column_id, projection] : 
_request->complex_projections) {
diff --git a/be/src/format/new_parquet/parquet_reader.h 
b/be/src/format/new_parquet/parquet_reader.h
index f6d47f46134..aa5cbfb5fcd 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -127,9 +127,10 @@ private:
     Status _get_projected_schema_field(reader::ColumnId file_column_id,
                                        const reader::FieldProjection* 
projection,
                                        reader::SchemaField* field) const;
-    bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
     Status _read_filter_columns(int64_t batch_rows, Block* file_block, 
SelectionVector* selection,
                                 uint16_t* selected_rows);
+    Status _execute_filter_conjuncts(int64_t batch_rows, Block* file_block,
+                                     SelectionVector* selection, uint16_t* 
selected_rows);
     IColumn::Filter _selection_to_filter(const SelectionVector& selection, 
uint16_t selected_rows,
                                          int64_t batch_rows);
     uint16_t _apply_filter_to_selection(const IColumn::Filter& filter, 
SelectionVector* selection,
diff --git a/be/src/format/new_parquet/parquet_statistics.cpp 
b/be/src/format/new_parquet/parquet_statistics.cpp
index aebc6d4e04d..a28ccb8ae25 100644
--- a/be/src/format/new_parquet/parquet_statistics.cpp
+++ b/be/src/format/new_parquet/parquet_statistics.cpp
@@ -159,14 +159,13 @@ ParquetColumnStatistics 
ParquetStatisticsUtils::TransformColumnStatistics(
     }
 }
 
-bool ParquetStatisticsUtils::CheckStatistics(const reader::FileLocalFilter& 
local_filter,
+bool ParquetStatisticsUtils::CheckStatistics(const 
reader::FileColumnPredicateFilter& column_filter,
                                              const ParquetColumnStatistics& 
statistics) {
     if (!statistics.has_any_statistics()) {
         return false;
     }
 
-    // TODO: replace local_filter.predicates by local_filter.conjuncts
-    for (const auto& column_predicate : local_filter.predicates) {
+    for (const auto& column_predicate : column_filter.predicates) {
         if (is_null_only_predicate(*column_predicate)) {
             if (!statistics.has_null_count) {
                 continue;
@@ -184,16 +183,19 @@ bool ParquetStatisticsUtils::CheckStatistics(const 
reader::FileLocalFilter& loca
 bool ParquetStatisticsUtils::RowGroupExcludes(
         const ::parquet::RowGroupMetaData& row_group,
         const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
-        const reader::FileLocalFilter& local_filter) {
-    DCHECK(local_filter.file_column_id >= 0 &&
-           local_filter.file_column_id < row_group.num_columns());
-    DCHECK_LT(local_filter.file_column_id, schema.size());
-    auto column_chunk = row_group.ColumnChunk(local_filter.file_column_id);
+        const reader::FileColumnPredicateFilter& column_filter) {
+    if (column_filter.predicates.empty()) {
+        return false;
+    }
+    DCHECK(column_filter.file_column_id >= 0 &&
+           column_filter.file_column_id < row_group.num_columns());
+    DCHECK_LT(column_filter.file_column_id, schema.size());
+    auto column_chunk = row_group.ColumnChunk(column_filter.file_column_id);
     if (column_chunk == nullptr) {
         return false;
     }
-    return CheckStatistics(local_filter,
-                           
TransformColumnStatistics(*schema[local_filter.file_column_id],
+    return CheckStatistics(column_filter,
+                           
TransformColumnStatistics(*schema[column_filter.file_column_id],
                                                      
column_chunk->statistics()));
 }
 
@@ -215,8 +217,8 @@ Status ParquetStatisticsUtils::SelectRowGroups(
             continue;
         }
         bool drop = false;
-        for (const auto& local_filter : request.local_filters) {
-            if (RowGroupExcludes(*row_group, file_schema, local_filter)) {
+        for (const auto& column_filter : request.column_predicate_filters) {
+            if (RowGroupExcludes(*row_group, file_schema, column_filter)) {
                 drop = true;
                 break;
             }
diff --git a/be/src/format/new_parquet/parquet_statistics.h 
b/be/src/format/new_parquet/parquet_statistics.h
index 0def08d4b08..4f43ae245b5 100644
--- a/be/src/format/new_parquet/parquet_statistics.h
+++ b/be/src/format/new_parquet/parquet_statistics.h
@@ -60,13 +60,14 @@ struct ParquetStatisticsUtils {
             const ParquetColumnSchema& column_schema,
             const std::shared_ptr<::parquet::Statistics>& statistics);
 
-    // Return true if the statistics indicate that the row group can be safely 
skipped according to the local filter.
-    static bool CheckStatistics(const reader::FileLocalFilter& local_filter,
+    // Return true if the statistics indicate that the row group can be safely 
skipped according to
+    // the local single-column predicate filter.
+    static bool CheckStatistics(const reader::FileColumnPredicateFilter& 
column_filter,
                                 const ParquetColumnStatistics& statistics);
 
     static bool RowGroupExcludes(const ::parquet::RowGroupMetaData& row_group,
                                  const 
std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
-                                 const reader::FileLocalFilter& local_filter);
+                                 const reader::FileColumnPredicateFilter& 
column_filter);
 
     static Status SelectRowGroups(
             const ::parquet::FileMetaData& metadata,
diff --git a/be/src/format/reader/column_mapper.cpp 
b/be/src/format/reader/column_mapper.cpp
index 4d9afdeff32..1a33781b965 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -17,6 +17,7 @@
 
 #include "format/reader/column_mapper.h"
 
+#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <utility>
@@ -69,6 +70,8 @@ static constexpr const char* 
ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update
 
 static void add_scan_column(FileScanRequest* file_request, ColumnId 
file_column_id,
                             std::vector<ColumnId>* scan_columns) {
+    // column_positions is the global read-column index for this scan request, 
so it also
+    // deduplicates predicate_columns and non_predicate_columns across all 
filter/projection paths.
     if (file_request->column_positions.count(file_column_id) == 0) {
         file_request->column_positions.emplace(file_column_id,
                                                
file_request->column_positions.size());
@@ -210,6 +213,13 @@ static Status rebuild_projected_file_type(ColumnMapping* 
mapping) {
     return Status::OK();
 }
 
+static std::vector<int32_t> filter_slot_ids(const TableFilter& table_filter) {
+    if (!table_filter.slot_ids.empty()) {
+        return table_filter.slot_ids;
+    }
+    return {};
+}
+
 Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& 
projected_columns,
                                          const std::map<std::string, Field>& 
partition_values,
                                          const std::vector<SchemaField>& 
file_schema) {
@@ -250,7 +260,8 @@ Status TableColumnMapper::create_mapping(const 
std::vector<TableColumn>& project
     return Status::OK();
 }
 
-Status TableColumnMapper::create_scan_request(const std::map<int32_t, 
TableFilter>& table_filters,
+Status TableColumnMapper::create_scan_request(const std::vector<TableFilter>& 
table_filters,
+                                              const TableColumnPredicates& 
table_column_predicates,
                                               const std::vector<TableColumn>& 
projected_columns,
                                               FileScanRequest* file_request) {
     // FileReader evaluates expressions against a file-local block. This 
mapper owns the
@@ -259,12 +270,27 @@ Status TableColumnMapper::create_scan_request(const 
std::map<int32_t, TableFilte
     file_request->non_predicate_columns.clear();
     file_request->column_positions.clear();
     file_request->complex_projections.clear();
-    file_request->local_filters.clear();
+    file_request->expression_filters.clear();
+    file_request->column_predicate_filters.clear();
     file_request->reader_expression_map.clear();
+    // 1. Build referenced non-predicate columns
     for (const auto& table_column : projected_columns) {
         auto* mapping = _find_mapping(table_column.id);
         if (mapping != nullptr && mapping->file_column_id.has_value()) {
-            if (table_filters.count(table_column.id) == 0) {
+            // A file column can be read lazily as a non-predicate column only 
when it is not used
+            // by either expression filters or single-column predicate pruning.
+            bool used_by_filter = 
table_column_predicates.count(table_column.id) > 0;
+            if (!used_by_filter) {
+                for (const auto& table_filter : table_filters) {
+                    const auto slot_ids = filter_slot_ids(table_filter);
+                    if (std::find(slot_ids.begin(), slot_ids.end(), 
table_column.id) !=
+                        slot_ids.end()) {
+                        used_by_filter = true;
+                        break;
+                    }
+                }
+            }
+            if (!used_by_filter) {
                 add_scan_column(file_request, *mapping->file_column_id,
                                 &file_request->non_predicate_columns);
             }
@@ -280,7 +306,9 @@ Status TableColumnMapper::create_scan_request(const 
std::map<int32_t, TableFilte
             }
         }
     }
-    RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+    // 2. Build referenced predicate columns
+    RETURN_IF_ERROR(localize_filters(table_filters, table_column_predicates, 
file_request));
+    // 3. Re-build projections for all referenced file columns to point to the 
correct file-local block positions.
     for (auto& mapping : _mappings) {
         if (!mapping.file_column_id.has_value()) {
             continue;
@@ -292,18 +320,29 @@ Status TableColumnMapper::create_scan_request(const 
std::map<int32_t, TableFilte
     return Status::OK();
 }
 
-Status TableColumnMapper::localize_filters(const std::map<int32_t, 
TableFilter>& table_filters,
+Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& 
table_filters,
+                                           const TableColumnPredicates& 
table_column_predicates,
                                            FileScanRequest* file_request) 
const {
     // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
     // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
-    for (const auto& it : table_filters) {
-        const auto* mapping = _find_mapping(it.first);
-        if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+    for (const auto& table_filter : table_filters) {
+        if (!table_filter.can_be_localized()) {
+            // TODO: Rewrite table filter to reader_expression_map
+            // file_request->reader_expression_map.emplace_back(..., 
table_filter.conjunct);
             continue;
         }
-        if (!it.second.can_be_localized()) {
-            // TODO: Rewrite table filter to reader_expression_map
-            // 
file_request->reader_expression_map.emplace_back(mapping->table_column_id, 
it.second.conjunct);
+        for (const auto table_column_id : filter_slot_ids(table_filter)) {
+            const auto* mapping = _find_mapping(table_column_id);
+            if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+                continue;
+            }
+            add_scan_column(file_request, *mapping->file_column_id,
+                            &file_request->predicate_columns);
+        }
+    }
+    for (const auto& [table_column_id, _] : table_column_predicates) {
+        const auto* mapping = _find_mapping(table_column_id);
+        if (mapping == nullptr || !mapping->file_column_id.has_value()) {
             continue;
         }
         add_scan_column(file_request, *mapping->file_column_id, 
&file_request->predicate_columns);
@@ -312,20 +351,35 @@ Status TableColumnMapper::localize_filters(const 
std::map<int32_t, TableFilter>&
     // Build the complete table-slot to file-block position map after all 
predicate columns have
     // been assigned. This keeps expression localization independent from 
filter iteration order.
     const auto table_column_to_file_position = 
build_file_position_map(_mappings, *file_request);
-    for (const auto& it : table_filters) {
-        const auto* mapping = _find_mapping(it.first);
-        if (mapping == nullptr || !mapping->file_column_id.has_value() ||
-            !it.second.can_be_localized()) {
+    for (const auto& table_filter : table_filters) {
+        if (!table_filter.can_be_localized()) {
             continue;
         }
-        FileLocalFilter local_filter;
-        local_filter.file_column_id = *mapping->file_column_id;
-        if (it.second.conjunct != nullptr) {
-            local_filter.conjunct = 
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
-                    it.second.conjunct->root(), 
table_column_to_file_position));
+        if (table_filter.conjunct != nullptr) {
+            FileExpressionFilter expression_filter;
+            expression_filter.conjunct =
+                    
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
+                            table_filter.conjunct->root(), 
table_column_to_file_position));
+            
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
+            for (const auto table_column_id : table_filter.slot_ids) {
+                const auto* mapping = _find_mapping(table_column_id);
+                if (mapping == nullptr || 
!mapping->file_column_id.has_value()) {
+                    continue;
+                }
+                
expression_filter.file_column_ids.push_back(*mapping->file_column_id);
+            }
+            
file_request->expression_filters.push_back(std::move(expression_filter));
+        }
+    }
+    for (const auto& [table_column_id, predicates] : table_column_predicates) {
+        const auto* mapping = _find_mapping(table_column_id);
+        if (mapping == nullptr || !mapping->file_column_id.has_value() || 
predicates.empty()) {
+            continue;
         }
-        local_filter.predicates = it.second.predicates;
-        file_request->local_filters.push_back(std::move(local_filter));
+        FileColumnPredicateFilter column_predicate_filter;
+        column_predicate_filter.file_column_id = *mapping->file_column_id;
+        column_predicate_filter.predicates = predicates;
+        
file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
     }
     return Status::OK();
 }
diff --git a/be/src/format/reader/column_mapper.h 
b/be/src/format/reader/column_mapper.h
index 0c6ac9c8e6c..bcfe7152208 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -31,6 +31,10 @@
 #include "exprs/vexpr_fwd.h"
 #include "format/reader/expr/literal.h"
 
+namespace doris {
+class ColumnPredicate;
+} // namespace doris
+
 namespace doris::reader {
 
 struct TableColumn;
@@ -39,6 +43,9 @@ struct SchemaField;
 struct FileScanRequest;
 struct FieldProjection;
 
+using TableColumnPredicates =
+        std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+
 enum class TableColumnMappingMode {
     BY_FIELD_ID,
     BY_NAME,
@@ -100,15 +107,18 @@ public:
 
     // 把 table-level scan 请求转换成 file-local scan 请求。
     // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
-    // projected_file_columns、local_filters 和 reader_expression_map。
-    virtual Status create_scan_request(const std::map<int32_t, TableFilter>& 
table_filters,
+    // projected_file_columns、expression_filters、column_predicate_filters 和
+    // reader_expression_map。
+    virtual Status create_scan_request(const std::vector<TableFilter>& 
table_filters,
+                                       const TableColumnPredicates& 
table_column_predicates,
                                        const std::vector<TableColumn>& 
projected_columns,
                                        FileScanRequest* file_request);
 
     // 将 table-level filter 定位到文件 schema。
     // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
     // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 
处理。
-    virtual Status localize_filters(const std::map<int32_t, TableFilter>& 
table_filters,
+    virtual Status localize_filters(const std::vector<TableFilter>& 
table_filters,
+                                    const TableColumnPredicates& 
table_column_predicates,
                                     FileScanRequest* file_request) const;
     void clear() { _mappings.clear(); }
     const std::vector<ColumnMapping>& mappings() const { return _mappings; }
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index 918e2b4bd35..69720bc8f9a 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -75,21 +75,19 @@ struct FieldProjection {
     std::vector<FieldProjection> children;
 };
 
-// 已经 localize 到文件 schema 的过滤条件。
-// TableColumnMapper 负责把 table-level filter 转成这个结构;FileReader 只消费
-// file-local column id、表达式和结构化谓词。
-struct FileLocalFilter {
-    ColumnId file_column_id = -1;
-
-    // 表达式过滤。适合 cast、复杂表达式或 reader_expression_map 生成的临时列过滤。
-    // 它通常不能直接驱动 row group stats、page index、dictionary、bloom filter。
+// File-local expression filter. It may reference multiple predicate_columns, 
so FileReader should
+// evaluate it after all referenced predicate columns have been materialized 
in the file-local block.
+struct FileExpressionFilter {
     VExprContextSPtr conjunct;
     // DeletePredicate
     VExprContextSPtr delete_conjunct;
+    std::vector<ColumnId> file_column_ids;
+};
 
-    // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
-    // bloom filter 等只理解单列谓词的优化。
-    // TODO: conjunct 支持表达所有 filter 语义之后删除。
+// File-local single-column predicates for file-layer pruning, such as 
min/max, page index,
+// dictionary and bloom filter. Predicates must all belong to file_column_id.
+struct FileColumnPredicateFilter {
+    ColumnId file_column_id = -1;
     std::vector<std::shared_ptr<ColumnPredicate>> predicates;
 };
 
@@ -110,7 +108,8 @@ struct FileScanRequest {
     std::vector<ColumnId> non_predicate_columns;
     std::map<ColumnId, size_t> column_positions;
     std::map<ColumnId, FieldProjection> complex_projections;
-    std::vector<FileLocalFilter> local_filters;
+    std::vector<FileExpressionFilter> expression_filters;
+    std::vector<FileColumnPredicateFilter> column_predicate_filters;
     // fallback path if filters cannot be localized to file-local predicates. 
The expression can reference projected_file_columns and partition columns.
     std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
 };
diff --git a/be/src/format/reader/table_reader.cpp 
b/be/src/format/reader/table_reader.cpp
index f6cfa21600e..58de8378589 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -48,16 +48,10 @@ void collect_table_slot_ids(const VExprSPtr& expr, 
std::set<int>* slot_ids) {
 }
 
 void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
-                                       std::map<int32_t, TableFilter>* 
table_filters) {
+                                       std::vector<TableFilter>* 
table_filters) {
     if (conjunct == nullptr) {
         return;
     }
-    std::set<int> slot_ids;
-    collect_table_slot_ids(conjunct, &slot_ids);
-    if (slot_ids.size() == 1) {
-        (*table_filters)[*slot_ids.begin()].conjunct = 
VExprContext::create_shared(conjunct);
-        return;
-    }
     if (conjunct->node_type() == TExprNodeType::COMPOUND_PRED &&
         conjunct->op() == TExprOpcode::COMPOUND_AND) {
         for (const auto& child : conjunct->children()) {
@@ -65,6 +59,15 @@ void build_table_filters_from_conjunct(const VExprSPtr& 
conjunct,
         }
         return;
     }
+    std::set<int> slot_ids;
+    collect_table_slot_ids(conjunct, &slot_ids);
+    if (!slot_ids.empty()) {
+        TableFilter table_filter;
+        table_filter.conjunct = VExprContext::create_shared(conjunct);
+        table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
+        table_filters->push_back(std::move(table_filter));
+        return;
+    }
 }
 
 } // namespace
@@ -100,6 +103,7 @@ Status TableReader::init(TableReadOptions options) {
     mapper_options.allow_missing_columns = options.allow_missing_columns;
     _data_reader.column_mapper = TableColumnMapper(mapper_options);
     _conjuncts = std::move(options.conjuncts);
+    _table_column_predicates = std::move(options.column_predicates);
     return Status::OK();
 }
 
@@ -111,12 +115,12 @@ Status TableReader::_build_table_filters_from_conjuncts() 
{
 
 Status TableReader::_open_local_filter_exprs(const FileScanRequest& 
file_request) {
     RowDescriptor row_desc;
-    for (const auto& local_filter : file_request.local_filters) {
-        if (local_filter.conjunct == nullptr) {
+    for (const auto& expression_filter : file_request.expression_filters) {
+        if (expression_filter.conjunct == nullptr) {
             continue;
         }
-        RETURN_IF_ERROR(local_filter.conjunct->prepare(_runtime_state, 
row_desc));
-        RETURN_IF_ERROR(local_filter.conjunct->open(_runtime_state));
+        RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, 
row_desc));
+        RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
     }
     return Status::OK();
 }
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index c9589af8017..2cf5eb30468 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -62,15 +62,14 @@ struct TableColumn {
 };
 
 // table-level filter。
-// TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。
+// TableColumnMapper 负责把它转换成 FileExpressionFilter 或 reader_expression_map。
 struct TableFilter {
     // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
     VExprContextSPtr conjunct;
 
-    // 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、
-    // bloom filter 等优化。
-    // TODO: conjunct 支持表达所有 filter 语义之后删除。
-    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
+    // Table slot ids referenced by conjunct. A single expression filter may 
depend on multiple
+    // columns, while ColumnPredicate pruning still belongs to one concrete 
column.
+    std::vector<int32_t> slot_ids;
 
     bool can_be_localized() const { return true; }
 };
@@ -105,6 +104,7 @@ struct ReadProfile {
 
 struct TableReadOptions {
     const std::vector<TableColumn> projected_columns;
+    const TableColumnPredicates column_predicates;
     // All conjuncts from scan operator
     const VExprContext conjuncts;
     const FileFormat format;
@@ -229,7 +229,7 @@ protected:
 
         auto file_request = std::make_unique<FileScanRequest>();
         RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
-                _table_filters, _projected_columns, file_request.get()));
+                _table_filters, _table_column_predicates, _projected_columns, 
file_request.get()));
         RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
         _data_reader.scan_schema.clear();
         _data_reader.block_template.clear();
@@ -266,6 +266,7 @@ protected:
         _data_reader.reader.reset();
         _data_reader.column_mapper.clear();
         _table_filters.clear();
+        _table_column_predicates.clear();
         _data_reader.block_schema.clear();
         _data_reader.scan_schema.clear();
         _data_reader.block_template.clear();
@@ -331,7 +332,8 @@ protected:
     std::shared_ptr<io::FileSystemProperties> _system_properties;
     // partition key -> value
     std::map<std::string, Field> _partition_values;
-    std::map<int32_t, TableFilter> _table_filters;
+    std::vector<TableFilter> _table_filters;
+    TableColumnPredicates _table_column_predicates;
     VExprContext _conjuncts {nullptr};
     std::unique_ptr<ReadProfile> _profile;
     // Parsed from DELETION_VECTOR in Iceberg and Paimon
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 5341b4060b5..43ec9cc0ab1 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -42,6 +42,7 @@
 #include "exprs/vexpr_context.h"
 #include "format/reader/column_mapper.h"
 #include "format/reader/file_reader.h"
+#include "format/reader/table_reader.h"
 #include "gen_cpp/Types_types.h"
 #include "io/io_common.h"
 #include "runtime/runtime_state.h"
@@ -82,6 +83,41 @@ private:
     const std::string _expr_name = "Int32GreaterThanExpr";
 };
 
+class Int32SumGreaterThanExpr final : public VExpr {
+public:
+    Int32SumGreaterThanExpr(int left_column_id, int right_column_id, int32_t 
value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false),
+              _left_column_id(left_column_id),
+              _right_column_id(right_column_id),
+              _value(value) {}
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        const auto& left_input =
+                assert_cast<const 
ColumnInt32&>(*block->get_by_position(_left_column_id).column);
+        const auto& right_input =
+                assert_cast<const 
ColumnInt32&>(*block->get_by_position(_right_column_id).column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            const size_t input_row = selector == nullptr ? row : 
(*selector)[row];
+            result_data[row] =
+                    left_input.get_element(input_row) + 
right_input.get_element(input_row) > _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int _left_column_id;
+    const int _right_column_id;
+    const int32_t _value;
+    const std::string _expr_name = "Int32SumGreaterThanExpr";
+};
+
 VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t 
value) {
     auto ctx =
             
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id, 
value));
@@ -90,6 +126,15 @@ VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t value
     return ctx;
 }
 
+VExprContextSPtr create_int32_sum_greater_than_conjunct(int left_column_id, 
int right_column_id,
+                                                        int32_t value) {
+    auto ctx = VExprContext::create_shared(
+            std::make_shared<Int32SumGreaterThanExpr>(left_column_id, 
right_column_id, value));
+    ctx->_prepared = true;
+    ctx->_opened = true;
+    return ctx;
+}
+
 std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
     std::shared_ptr<arrow::Array> array;
     EXPECT_TRUE(builder->Finish(&array).ok());
@@ -133,6 +178,28 @@ void write_parquet_file(const std::string& file_path, int64_t row_group_size = R
                                                       row_group_size, 
builder.build()));
 }
 
+void write_int_pair_parquet_file(const std::string& file_path, int64_t 
row_group_size = ROW_COUNT) {
+    auto schema = arrow::schema({
+            arrow::field("id", arrow::int32(), false),
+            arrow::field("score", arrow::int32(), false),
+            arrow::field("value", arrow::utf8(), false),
+    });
+    auto table = arrow::Table::Make(
+            schema, {build_int32_array({1, 2, 3, 4, 5}), build_int32_array({1, 
2, 3, 4, 5}),
+                     build_string_array({"one", "two", "three", "four", 
"five"})});
+
+    auto file_result = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(file_result.ok()) << file_result.status();
+    std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+    ::parquet::WriterProperties::Builder builder;
+    builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+    builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+    builder.compression(::parquet::Compression::UNCOMPRESSED);
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
+            *table, arrow::default_memory_pool(), out, row_group_size, 
builder.build()));
+}
+
 Block build_file_block(const std::vector<reader::SchemaField>& schema) {
     Block block;
     for (const auto& field : schema) {
@@ -216,7 +283,7 @@ TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) {
     ASSERT_TRUE(mapper.create_mapping({table_column}, {}, 
{struct_field}).ok());
 
     auto request = std::make_unique<reader::FileScanRequest>();
-    ASSERT_TRUE(mapper.create_scan_request({}, {table_column}, 
request.get()).ok());
+    ASSERT_TRUE(mapper.create_scan_request({}, {}, {table_column}, 
request.get()).ok());
     ASSERT_EQ(request->non_predicate_columns, 
std::vector<reader::ColumnId>({0}));
     ASSERT_EQ(request->complex_projections.size(), 1);
     const auto& projection = request->complex_projections.at(0);
@@ -359,12 +426,14 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) {
     auto request = std::make_unique<reader::FileScanRequest>();
     request->predicate_columns = {0};
     request->non_predicate_columns = {1};
-    reader::FileLocalFilter filter;
-    filter.file_column_id = 0;
-    filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
+    reader::FileExpressionFilter expression_filter;
+    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+    request->expression_filters.push_back(std::move(expression_filter));
+    reader::FileColumnPredicateFilter column_filter;
+    column_filter.file_column_id = 0;
+    
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
             0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
-    request->local_filters.push_back(std::move(filter));
+    request->column_predicate_filters.push_back(std::move(column_filter));
     ASSERT_TRUE(reader->open(request).ok());
 
     size_t rows = 0;
@@ -391,6 +460,40 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) {
     EXPECT_EQ(rows, 0);
 }
 
+TEST_F(NewParquetReaderTest, ReadMultiPredicateColumnsBeforeExpressionFilter) {
+    write_int_pair_parquet_file(_file_path);
+    auto reader = create_reader();
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector<reader::SchemaField> schema;
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    Block block = build_file_block(schema);
+
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->predicate_columns = {0, 1};
+    request->non_predicate_columns = {};
+    reader::FileExpressionFilter expression_filter;
+    expression_filter.conjunct = create_int32_sum_greater_than_conjunct(0, 1, 
7);
+    request->expression_filters.push_back(std::move(expression_filter));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    size_t rows = 0;
+    bool eof = false;
+    ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+    EXPECT_FALSE(eof);
+    ASSERT_EQ(rows, 2);
+
+    const auto& ids = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    const auto& scores = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    ASSERT_EQ(ids.size(), 2);
+    ASSERT_EQ(scores.size(), 2);
+    EXPECT_EQ(ids.get_element(0), 4);
+    EXPECT_EQ(ids.get_element(1), 5);
+    EXPECT_EQ(scores.get_element(0), 4);
+    EXPECT_EQ(scores.get_element(1), 5);
+}
+
 TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) {
     write_parquet_file(_file_path, 2);
     auto parquet_file_reader = 
::parquet::ParquetFileReader::OpenFile(_file_path, false);
@@ -405,12 +508,14 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) {
     auto request = std::make_unique<reader::FileScanRequest>();
     request->predicate_columns = {0};
     request->non_predicate_columns = {1};
-    reader::FileLocalFilter filter;
-    filter.file_column_id = 0;
-    filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
+    reader::FileExpressionFilter expression_filter;
+    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+    request->expression_filters.push_back(std::move(expression_filter));
+    reader::FileColumnPredicateFilter column_filter;
+    column_filter.file_column_id = 0;
+    
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
             0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
-    request->local_filters.push_back(std::move(filter));
+    request->column_predicate_filters.push_back(std::move(column_filter));
     ASSERT_TRUE(reader->open(request).ok());
 
     std::vector<int32_t> ids;
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
index dc2e26f35ea..3d132244122 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -22,6 +22,7 @@
 #include <gtest/gtest.h>
 #include <parquet/arrow/writer.h>
 
+#include <algorithm>
 #include <filesystem>
 #include <memory>
 #include <string>
@@ -37,6 +38,7 @@
 #include "format/reader/expr/slot_ref.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
+#include "storage/predicate/predicate_creator.h"
 
 namespace doris::reader {
 namespace {
@@ -75,6 +77,86 @@ private:
     const std::string _expr_name = "TableInt32GreaterThanExpr";
 };
 
+class TableInt32SumGreaterThanExpr final : public VExpr {
+public:
+    TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int 
right_slot_id,
+                                 int right_column_id, int32_t value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+                                              
std::make_shared<DataTypeInt32>(), "id"));
+        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, 
-1,
+                                              
std::make_shared<DataTypeInt32>(), "score"));
+        set_node_type(TExprNodeType::BINARY_PRED);
+        _opcode = TExprOpcode::GT;
+    }
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        const auto* left_slot_ref = assert_cast<const 
VSlotRef*>(get_child(0).get());
+        const auto* right_slot_ref = assert_cast<const 
VSlotRef*>(get_child(1).get());
+        const auto& left_input = assert_cast<const ColumnInt32&>(
+                *block->get_by_position(left_slot_ref->column_id()).column);
+        const auto& right_input = assert_cast<const ColumnInt32&>(
+                *block->get_by_position(right_slot_ref->column_id()).column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            const size_t input_row = selector == nullptr ? row : 
(*selector)[row];
+            result_data[row] =
+                    left_input.get_element(input_row) + 
right_input.get_element(input_row) > _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int32_t _value;
+    const std::string _expr_name = "TableInt32SumGreaterThanExpr";
+};
+
+class TableInt32SumLessThanExpr final : public VExpr {
+public:
+    TableInt32SumLessThanExpr(int left_slot_id, int left_column_id, int 
right_slot_id,
+                              int right_column_id, int32_t value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+                                              
std::make_shared<DataTypeInt32>(), "id"));
+        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, 
-1,
+                                              
std::make_shared<DataTypeInt32>(), "score"));
+        set_node_type(TExprNodeType::BINARY_PRED);
+        _opcode = TExprOpcode::LT;
+    }
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        const auto* left_slot_ref = assert_cast<const 
VSlotRef*>(get_child(0).get());
+        const auto* right_slot_ref = assert_cast<const 
VSlotRef*>(get_child(1).get());
+        const auto& left_input = assert_cast<const ColumnInt32&>(
+                *block->get_by_position(left_slot_ref->column_id()).column);
+        const auto& right_input = assert_cast<const ColumnInt32&>(
+                *block->get_by_position(right_slot_ref->column_id()).column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            const size_t input_row = selector == nullptr ? row : 
(*selector)[row];
+            result_data[row] =
+                    left_input.get_element(input_row) + 
right_input.get_element(input_row) < _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int32_t _value;
+    const std::string _expr_name = "TableInt32SumLessThanExpr";
+};
+
 std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
     std::shared_ptr<arrow::Array> array;
     EXPECT_TRUE(builder->Finish(&array).ok());
@@ -117,6 +199,32 @@ void write_parquet_file(const std::string& file_path, int32_t id, const std::str
             *table, arrow::default_memory_pool(), out, 1, builder.build()));
 }
 
+void write_int_pair_parquet_file(const std::string& file_path, const 
std::vector<int32_t>& ids,
+                                 const std::vector<int32_t>& scores,
+                                 const std::vector<std::string>& values,
+                                 int64_t row_group_size = -1) {
+    auto schema = arrow::schema({
+            arrow::field("id", arrow::int32(), false),
+            arrow::field("score", arrow::int32(), false),
+            arrow::field("value", arrow::utf8(), false),
+    });
+    auto table = arrow::Table::Make(schema, {build_int32_array(ids), 
build_int32_array(scores),
+                                             build_string_array(values)});
+
+    auto file_result = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(file_result.ok()) << file_result.status();
+    std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+    ::parquet::WriterProperties::Builder builder;
+    builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+    builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+    builder.compression(::parquet::Compression::UNCOMPRESSED);
+    const auto write_row_group_size =
+            row_group_size > 0 ? row_group_size : 
static_cast<int64_t>(ids.size());
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
+            *table, arrow::default_memory_pool(), out, write_row_group_size, 
builder.build()));
+}
+
 Block build_table_block(const std::vector<TableColumn>& columns) {
     Block block;
     for (const auto& column : columns) {
@@ -164,6 +272,7 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
+                                .column_predicates = {},
                                 .conjuncts = VExprContext(
                                         
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
                                 .format = FileFormat::PARQUET,
@@ -229,8 +338,9 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
-                                .conjuncts = 
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
-                                        0, 0, 2)),
+                                .column_predicates = {},
+                                .conjuncts = VExprContext(
+                                        
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
                                 .io_ctx = nullptr,
@@ -262,8 +372,9 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
     ASSERT_TRUE(filtered_reader
                         .init({
                                 .projected_columns = projected_columns,
-                                .conjuncts = 
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
-                                        0, 0, 4)),
+                                .column_predicates = {},
+                                .conjuncts = VExprContext(
+                                        
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4)),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
                                 .io_ctx = nullptr,
@@ -285,6 +396,195 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
     std::filesystem::remove_all(test_dir);
 }
 
+TEST(TableReaderTest, OpenReaderBuildsColumnPredicateFilters) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_column_predicate_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    // ColumnPredicate is only used for row-group/statistics pruning. Keep one row per row
+    // group so the predicate can prune the first two row groups and leave only id = 3.
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", 
"two", "three"}, 1);
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(2, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    TableColumnPredicates column_predicates;
+    
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+            0, "id", std::make_shared<DataTypeInt32>(), 
Field::create_field<TYPE_INT>(2), false));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader
+                        .init({
+                                .projected_columns = projected_columns,
+                                .column_predicates = 
std::move(column_predicates),
+                                .conjuncts = VExprContext(nullptr),
+                                .format = FileFormat::PARQUET,
+                                .scan_params = nullptr,
+                                .io_ctx = nullptr,
+                                .runtime_state = &state,
+                                .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
+                        })
+                        .ok());
+
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& value_column = assert_cast<const 
ColumnString&>(*block.get_by_position(0).column);
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    ASSERT_EQ(id_column.size(), 1);
+    ASSERT_EQ(value_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 3);
+    EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, CreateScanRequestDeduplicatesSharedPredicateColumns) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const std::vector<TableColumn> projected_columns = {
+            make_table_column(0, "a", int_type),
+            make_table_column(1, "b", int_type),
+            make_table_column(2, "c", int_type),
+            make_table_column(3, "value", std::make_shared<DataTypeString>()),
+    };
+    const std::vector<SchemaField> file_schema = {
+            {.id = 0,
+             .name = "a",
+             .type = int_type,
+             .children = {},
+             .file_path = {0},
+             .field_id_path = {0},
+             .name_path = {"a"},
+             .column_type = DATA_COLUMN},
+            {.id = 1,
+             .name = "b",
+             .type = int_type,
+             .children = {},
+             .file_path = {1},
+             .field_id_path = {1},
+             .name_path = {"b"},
+             .column_type = DATA_COLUMN},
+            {.id = 2,
+             .name = "c",
+             .type = int_type,
+             .children = {},
+             .file_path = {2},
+             .field_id_path = {2},
+             .name_path = {"c"},
+             .column_type = DATA_COLUMN},
+            {.id = 3,
+             .name = "value",
+             .type = std::make_shared<DataTypeString>(),
+             .children = {},
+             .file_path = {3},
+             .field_id_path = {3},
+             .name_path = {"value"},
+             .column_type = DATA_COLUMN},
+    };
+
+    TableColumnMapper mapper;
+    ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, 
file_schema).ok());
+
+    std::vector<TableFilter> table_filters;
+    table_filters.push_back({
+            .conjunct = VExprContext::create_shared(
+                    std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 
1)),
+            .slot_ids = {0, 1},
+    });
+    table_filters.push_back({
+            .conjunct = VExprContext::create_shared(
+                    std::make_shared<TableInt32SumLessThanExpr>(0, 0, 2, 2, 
3)),
+            .slot_ids = {0, 2},
+    });
+
+    FileScanRequest file_request;
+    ASSERT_TRUE(mapper.create_scan_request(table_filters, {}, 
projected_columns, &file_request)
+                        .ok());
+
+    // Both filters reference column a. It must still be read once as a predicate column, and a
+    // predicate column must not be repeated as a non-predicate column.
+    EXPECT_EQ(file_request.predicate_columns, std::vector<ColumnId>({0, 1, 
2}));
+    EXPECT_EQ(file_request.non_predicate_columns, std::vector<ColumnId>({3}));
+    ASSERT_EQ(file_request.column_positions.size(), 4);
+    EXPECT_EQ(file_request.column_positions.at(3), 0);
+    EXPECT_EQ(file_request.column_positions.at(0), 1);
+    EXPECT_EQ(file_request.column_positions.at(1), 2);
+    EXPECT_EQ(file_request.column_positions.at(2), 3);
+    for (const auto predicate_column : file_request.predicate_columns) {
+        EXPECT_TRUE(std::find(file_request.non_predicate_columns.begin(),
+                              file_request.non_predicate_columns.end(),
+                              predicate_column) == 
file_request.non_predicate_columns.end());
+    }
+}
+
+TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_multi_conjunct_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", 
"two", "three"});
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(2, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+    projected_columns.push_back(make_table_column(1, "score", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader
+                        .init({
+                                .projected_columns = projected_columns,
+                                .column_predicates = {},
+                                .conjuncts = VExprContext(
+                                        
std::make_shared<TableInt32SumGreaterThanExpr>(
+                                                0, 0, 1, 1, 8)),
+                                .format = FileFormat::PARQUET,
+                                .scan_params = nullptr,
+                                .io_ctx = nullptr,
+                                .runtime_state = &state,
+                                .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
+                        })
+                        .ok());
+
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    // The conjunct references both id and score, so ColumnMapper must put both file columns into
+    // predicate_columns and rewrite both slot refs to ParquetReader's file-local block positions.
+    // ParquetReader then evaluates the expression after all predicate columns have been read.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& value_column = assert_cast<const 
ColumnString&>(*block.get_by_position(0).column);
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    const auto& score_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(2).column);
+    ASSERT_EQ(id_column.size(), 1);
+    ASSERT_EQ(score_column.size(), 1);
+    ASSERT_EQ(value_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 3);
+    EXPECT_EQ(score_column.get_element(0), 8);
+    EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
 TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_schema_mismatch_test";
@@ -303,6 +603,7 @@ TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
+                                .column_predicates = {},
                                 .conjuncts = VExprContext(nullptr),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
@@ -347,6 +648,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
+                                .column_predicates = {},
                                 .conjuncts = VExprContext(nullptr),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
@@ -389,6 +691,7 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionForSameNameDifferentIdP
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
+                                .column_predicates = {},
                                 .conjuncts = VExprContext(nullptr),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
@@ -438,6 +741,7 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
+                                .column_predicates = {},
                                 .conjuncts = VExprContext(nullptr),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to