This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d544bec45 [feature](scan) Add value predicate pushdown control for 
MOR tables (#60513)
77d544bec45 is described below

commit 77d544bec45dfbdf8b6fd279b84799cd00135050
Author: Yongqiang YANG <[email protected]>
AuthorDate: Mon Mar 16 00:58:57 2026 -0700

    [feature](scan) Add value predicate pushdown control for MOR tables (#60513)
    
    ## Summary
    - Add session variable `enable_mor_value_predicate_pushdown_tables` to
    control value column predicate pushdown for MOR (Merge-On-Read) tables
    - FE: session variable with table-list matching (`db.table`, `table`, or
    `*`), thrift flag on TOlapScanNode, `isMorTable()` helper on OlapTable
    - BE: propagate flag through scan operator → tablet reader → rowset
    reader; extend `_should_push_down_value_predicates()` for all rowsets;
    skip `__DORIS_DELETE_SIGN__` to preserve delete correctness; keep VExpr
    in conjuncts as post-merge safety net
    
    ## Motivation
    MOR tables with inverted indexes on value columns cannot utilize those
    indexes for filtering because value predicates are not pushed to the
    storage layer. This feature enables per-segment value predicate pushdown
    for dedup-only/insert-only MOR workloads where the same key always
    carries identical values across rowsets, allowing inverted indexes and
    zone maps to filter data early.
    
    ## Design
    Two-layer predicate approach:
    1. **ColumnPredicate** (storage layer): pushed per-segment for early
    filtering via inverted index, zone map, bloom filter
    2. **VExpr** (compute layer): retained in `_conjuncts` as post-merge
    safety net
    
    Delete sign handling: `__DORIS_DELETE_SIGN__` predicate is excluded from
    per-segment pushdown to prevent delete markers from being filtered
    before merge, which would cause deleted rows to reappear.
    
    ## Test plan
    - [x] Test 1: Basic session variable matching (table name, db.table,
    wildcard, not-in-list)
    - [x] Test 2: Dedup-only pattern with multiple overlapping rowsets
    - [x] Test 3: Dedup + row-level delete via `INSERT INTO (...,
    __DORIS_DELETE_SIGN__) VALUES (..., 1)`
    - [x] Test 4: Dedup + delete predicate via `DELETE FROM ... WHERE`
    - [x] Test 5: Multiple tables in session variable list
    - [x] Test 6: MOW table (unaffected by setting)
    - [x] Test 7: DUP_KEYS table (unaffected by setting)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 be/src/exec/operator/olap_scan_operator.cpp        |   8 +-
 be/src/exec/operator/olap_scan_operator.h          |   1 +
 be/src/exec/operator/scan_operator.cpp             |   2 +-
 be/src/exec/scan/olap_scanner.cpp                  |  12 +-
 be/src/storage/rowset/beta_rowset_reader.cpp       |   3 +-
 be/src/storage/rowset/rowset_reader_context.h      |   3 +
 be/src/storage/tablet/tablet_reader.cpp            |  10 +
 be/src/storage/tablet/tablet_reader.h              |   3 +
 .../java/org/apache/doris/catalog/OlapTable.java   |   8 +
 .../doris/nereids/rules/analysis/BindRelation.java |   5 +-
 .../nereids/rules/rewrite/SetPreAggStatus.java     |   9 +-
 .../trees/plans/logical/LogicalOlapScan.java       |   7 +
 .../org/apache/doris/planner/OlapScanNode.java     |  13 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  69 ++++
 .../nereids/rules/analysis/ReadMorAsDupTest.java   | 280 ++++++++++++++++
 .../org/apache/doris/qe/SessionVariablesTest.java  |  55 ++++
 gensrc/thrift/PlanNodes.thrift                     |   4 +
 .../unique/test_mor_value_predicate_pushdown.out   |  76 +++++
 .../test_mor_value_predicate_pushdown.groovy       | 366 +++++++++++++++++++++
 .../unique_with_mor_p0/test_read_mor_as_dup.groovy | 119 +++++++
 20 files changed, 1047 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/operator/olap_scan_operator.cpp 
b/be/src/exec/operator/olap_scan_operator.cpp
index 60a53e9c67f..53c17626d95 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -454,7 +454,13 @@ bool OlapScanLocalState::_storage_no_merge() {
     return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
             (p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
              p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
-             p._olap_scan_node.enable_unique_key_merge_on_write));
+             p._olap_scan_node.enable_unique_key_merge_on_write)) ||
+           _read_mor_as_dup();
+}
+
+bool OlapScanLocalState::_read_mor_as_dup() {
+    auto& p = _parent->cast<OlapScanOperatorX>();
+    return p._olap_scan_node.__isset.read_mor_as_dup && 
p._olap_scan_node.read_mor_as_dup;
 }
 
 Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
diff --git a/be/src/exec/operator/olap_scan_operator.h 
b/be/src/exec/operator/olap_scan_operator.h
index 27414c6b25b..ae010479840 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -103,6 +103,7 @@ private:
 
     bool _storage_no_merge() override;
 
+    bool _read_mor_as_dup();
     bool _push_down_topn(const RuntimePredicate& predicate) override {
         if (!predicate.target_is_slot(_parent->node_id())) {
             return false;
diff --git a/be/src/exec/operator/scan_operator.cpp 
b/be/src/exec/operator/scan_operator.cpp
index ae41f3303b0..b2ac312ffda 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -443,7 +443,7 @@ Status 
ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
         return Status::OK();
     }
 
-    if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name()))) 
{
+    if (pdt == PushDownType::ACCEPTABLE && _is_key_column(slot->col_name())) {
         output_expr = nullptr;
         return Status::OK();
     } else {
diff --git a/be/src/exec/scan/olap_scanner.cpp 
b/be/src/exec/scan/olap_scanner.cpp
index 4fb031912cb..aa6af4b7159 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -333,7 +333,10 @@ Status OlapScanner::_init_tablet_reader_params(
     // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
     const bool single_version = _tablet_reader_params.has_single_version();
 
-    if (_state->skip_storage_engine_merge()) {
+    auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state);
+    bool read_mor_as_dup = 
olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
+                           olap_local_state->olap_scan_node().read_mor_as_dup;
+    if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
         _tablet_reader_params.direct_mode = true;
         _tablet_reader_params.aggregation = true;
     } else {
@@ -475,6 +478,13 @@ Status OlapScanner::_init_tablet_reader_params(
     if (!_state->skip_storage_engine_merge()) {
         auto* olap_scan_local_state = (OlapScanLocalState*)_local_state;
         TOlapScanNode& olap_scan_node = 
olap_scan_local_state->olap_scan_node();
+
+        // Set MOR value predicate pushdown flag
+        if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown &&
+            olap_scan_node.enable_mor_value_predicate_pushdown) {
+            _tablet_reader_params.enable_mor_value_predicate_pushdown = true;
+        }
+
         // order by table keys optimization for topn
         // will only read head/tail of data file since it's already sorted by 
keys
         if (olap_scan_node.__isset.sort_info && 
!olap_scan_node.sort_info.is_asc_order.empty()) {
diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp 
b/be/src/storage/rowset/beta_rowset_reader.cpp
index f1126391544..2869659ed12 100644
--- a/be/src/storage/rowset/beta_rowset_reader.cpp
+++ b/be/src/storage/rowset/beta_rowset_reader.cpp
@@ -360,7 +360,8 @@ bool BetaRowsetReader::_should_push_down_value_predicates() 
const {
            (((_rowset->start_version() == 0 || _rowset->start_version() == 2) 
&&
              !_rowset->_rowset_meta->is_segments_overlapping() &&
              _read_context->sequence_id_idx == -1) ||
-            _read_context->enable_unique_key_merge_on_write);
+            _read_context->enable_unique_key_merge_on_write ||
+            _read_context->enable_mor_value_predicate_pushdown);
 }
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/storage/rowset/rowset_reader_context.h 
b/be/src/storage/rowset/rowset_reader_context.h
index 5855b113a93..e44733367c8 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -100,6 +100,9 @@ struct RowsetReaderContext {
     std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
 
     uint64_t condition_cache_digest = 0;
+
+    // When true, push down value predicates for MOR tables
+    bool enable_mor_value_predicate_pushdown = false;
 };
 
 } // namespace doris
diff --git a/be/src/storage/tablet/tablet_reader.cpp 
b/be/src/storage/tablet/tablet_reader.cpp
index 35568743212..b241d621d05 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -194,6 +194,8 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     _reader_context.merged_rows = &_merged_rows;
     _reader_context.delete_bitmap = read_params.delete_bitmap;
     _reader_context.enable_unique_key_merge_on_write = 
tablet()->enable_unique_key_merge_on_write();
+    _reader_context.enable_mor_value_predicate_pushdown =
+            read_params.enable_mor_value_predicate_pushdown;
     _reader_context.record_rowids = read_params.record_rowids;
     _reader_context.rowid_conversion = read_params.rowid_conversion;
     _reader_context.is_key_column_group = read_params.is_key_column_group;
@@ -500,9 +502,17 @@ Status TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
         }
     }
 
+    int32_t delete_sign_idx = _tablet_schema->delete_sign_idx();
     for (auto predicate : predicates) {
         auto column = _tablet_schema->column(predicate->column_id());
         if (column.aggregation() != 
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
+            // When MOR value predicate pushdown is enabled, drop 
__DORIS_DELETE_SIGN__
+            // from storage-layer predicates entirely. Delete sign must only 
be evaluated
+            // post-merge via VExpr to prevent deleted rows from reappearing.
+            if (read_params.enable_mor_value_predicate_pushdown && 
delete_sign_idx >= 0 &&
+                predicate->column_id() == 
static_cast<uint32_t>(delete_sign_idx)) {
+                continue;
+            }
             _value_col_predicates.push_back(predicate);
         } else {
             _col_predicates.push_back(predicate);
diff --git a/be/src/storage/tablet/tablet_reader.h 
b/be/src/storage/tablet/tablet_reader.h
index 8616d8eeac0..e4842086cc8 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -187,6 +187,9 @@ public:
 
         bool is_segcompaction = false;
 
+        // Enable value predicate pushdown for MOR tables
+        bool enable_mor_value_predicate_pushdown = false;
+
         std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;
 
         void check_validation() const;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 5199ddbe4fa..b0200c707fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -3073,6 +3073,14 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return getKeysType() == KeysType.UNIQUE_KEYS && 
getEnableUniqueKeyMergeOnWrite();
     }
 
+    /**
+     * Check if this is a MOR (Merge-On-Read) table.
+     * MOR = UNIQUE_KEYS without merge-on-write enabled.
+     */
+    public boolean isMorTable() {
+        return getKeysType() == KeysType.UNIQUE_KEYS && 
!getEnableUniqueKeyMergeOnWrite();
+    }
+
     public boolean isUniqKeyMergeOnWriteWithClusterKeys() {
         return isUniqKeyMergeOnWrite() && 
getBaseSchema().stream().anyMatch(Column::isClusterKey);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 2d11e1c75db..81a54f8c67e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -371,7 +371,10 @@ public class BindRelation extends OneAnalysisRuleFactory {
     public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan 
scan, ConnectContext connectContext,
             OlapTable olapTable) {
         if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
-                && !connectContext.getSessionVariable().skipDeleteSign()) {
+                && !connectContext.getSessionVariable().skipDeleteSign()
+                && !(olapTable.isMorTable()
+                    && 
connectContext.getSessionVariable().isReadMorAsDupEnabled(
+                        olapTable.getQualifiedDbName(), olapTable.getName()))) 
{
             // table qualifier is catalog.db.table, we make db.table.column
             Slot deleteSlot = null;
             for (Slot slot : scan.getOutput()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
index 2f7743cb86f..7ae49fc6ec8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
@@ -53,6 +53,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -151,7 +152,13 @@ public class SetPreAggStatus extends 
DefaultPlanRewriter<Stack<SetPreAggStatus.P
             long selectIndexId = logicalOlapScan.getSelectedIndexId();
             MaterializedIndexMeta meta = 
logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId);
             if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType() 
== KeysType.UNIQUE_KEYS
-                    && 
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) {
+                    && 
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())
+                    || (meta.getKeysType() == KeysType.UNIQUE_KEYS
+                        && logicalOlapScan.getTable().isMorTable()
+                        && ConnectContext.get() != null
+                        && 
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+                            logicalOlapScan.getTable().getQualifiedDbName(),
+                            logicalOlapScan.getTable().getName()))) {
                 return logicalOlapScan.withPreAggStatus(PreAggStatus.on());
             } else {
                 if (context.empty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index 7f17e19e1f5..9a6f8262414 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -795,6 +795,13 @@ public class LogicalOlapScan extends 
LogicalCatalogRelation implements OlapScan,
                     && getTable().getKeysType() == KeysType.UNIQUE_KEYS) {
                 return;
             }
+            // When readMorAsDup is enabled, MOR tables are read as DUP, so 
uniqueness cannot be guaranteed.
+            if (getTable().getKeysType() == KeysType.UNIQUE_KEYS
+                    && getTable().isMorTable()
+                    && 
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+                        getTable().getQualifiedDbName(), 
getTable().getName())) {
+                return;
+            }
             ImmutableSet.Builder<Slot> uniqSlots = 
ImmutableSet.builderWithExpectedSize(outputSet.size());
             for (Slot slot : outputSet) {
                 if (!(slot instanceof SlotReference)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0da78206cba..d651bf79010 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1198,6 +1198,19 @@ public class OlapScanNode extends ScanNode {
         msg.olap_scan_node.setTableName(tableName);
         
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
 
+        // Set MOR value predicate pushdown flag based on session variable
+        if (olapTable.isMorTable() && ConnectContext.get() != null) {
+            String dbName = olapTable.getQualifiedDbName();
+            String tblName = olapTable.getName();
+            boolean enabled = ConnectContext.get().getSessionVariable()
+                    .isMorValuePredicatePushdownEnabled(dbName, tblName);
+            msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
+            if (ConnectContext.get().getSessionVariable()
+                    .isReadMorAsDupEnabled(dbName, tblName)) {
+                msg.olap_scan_node.setReadMorAsDup(true);
+            }
+        }
+
         msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
 
         msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d54a128dcc9..6888134673c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -734,6 +734,11 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_PUSHDOWN_STRING_MINMAX = 
"enable_pushdown_string_minmax";
 
+    public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
+            = "enable_mor_value_predicate_pushdown_tables";
+
+    public static final String READ_MOR_AS_DUP_TABLES = 
"read_mor_as_dup_tables";
+
     // When set use fix replica = true, the fixed replica maybe bad, try to 
use the health one if
     // this session variable is set to true.
     public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = 
"fallback_other_replica_when_fixed_corrupt";
@@ -2226,6 +2231,21 @@ public class SessionVariable implements Serializable, 
Writable {
         "是否启用 string 类型 min max 下推。", "Set whether to enable push down string 
type minmax."})
     public boolean enablePushDownStringMinMax = false;
 
+    // Comma-separated list of MOR tables to enable value predicate pushdown.
+    @VariableMgr.VarAttr(name = ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES, 
needForward = true, description = {
+        "指定启用MOR表value列谓词下推的表列表,格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。",
+        "Comma-separated list of MOR tables to enable value predicate 
pushdown. "
+                + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
+    public String enableMorValuePredicatePushdownTables = "";
+
+    // Comma-separated list of MOR tables to read as DUP (skip merge, skip 
delete sign filter).
+    @VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true,
+            affectQueryResultInPlan = true, description = {
+                    "指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 * 
表示所有MOR表。",
+                    "Comma-separated list of MOR tables to read as DUP (skip 
merge, skip delete sign filter). "
+                            + "Format: db1.tbl1,db2.tbl2 or * for all MOR 
tables."})
+    public String readMorAsDupTables = "";
+
     // Whether drop table when create table as select insert data appear error.
     @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
     public boolean dropTableIfCtasFailed = true;
@@ -4842,6 +4862,55 @@ public class SessionVariable implements Serializable, 
Writable {
         return enablePushDownStringMinMax;
     }
 
+    public String getEnableMorValuePredicatePushdownTables() {
+        return enableMorValuePredicatePushdownTables;
+    }
+
+    public boolean isMorValuePredicatePushdownEnabled(String dbName, String 
tableName) {
+        return isTableInList(enableMorValuePredicatePushdownTables, dbName, 
tableName);
+    }
+
+    public boolean isReadMorAsDupEnabled(String dbName, String tableName) {
+        return isTableInList(readMorAsDupTables, dbName, tableName);
+    }
+
+    /**
+     * Check if a table matches any entry in a comma-separated table list.
+     * Parses entries the same way as TableNameInfo: split by "." to extract
+     * component parts (table, db.table, or ctl.db.table).
+     * When entry specifies db, both db and table must match.
+     * When entry is just a table name, it matches any database.
+     */
+    private static boolean isTableInList(String tableList, String dbName, 
String tableName) {
+        if (tableList == null || tableList.isEmpty()) {
+            return false;
+        }
+        String trimmed = tableList.trim();
+        if ("*".equals(trimmed)) {
+            return true;
+        }
+        for (String entry : trimmed.split(",")) {
+            String trimmedEntry = entry.trim();
+            if (trimmedEntry.isEmpty()) {
+                continue;
+            }
+            String[] parts = trimmedEntry.split("\\.");
+            String entryTbl = parts[parts.length - 1];
+            String entryDb = parts.length >= 2 ? parts[parts.length - 2] : 
null;
+            if (!entryTbl.equalsIgnoreCase(tableName)) {
+                continue;
+            }
+            if (entryDb != null) {
+                if (dbName != null && entryDb.equalsIgnoreCase(dbName)) {
+                    return true;
+                }
+            } else {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /** canUseNereidsDistributePlanner */
     public static boolean canUseNereidsDistributePlanner() {
         ConnectContext connectContext = ConnectContext.get();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
new file mode 100644
index 00000000000..1c3f687c8ab
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.nereids.pattern.GeneratedPlanPatterns;
+import org.apache.doris.nereids.rules.RulePromise;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+class ReadMorAsDupTest extends TestWithFeService implements 
GeneratedPlanPatterns {
+    private static final String DB = "test_read_mor_as_dup_db";
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase(DB);
+        connectContext.setDatabase(DEFAULT_CLUSTER_PREFIX + DB);
+
+        // MOR UNIQUE table (enable_unique_key_merge_on_write = false)
+        createTable("CREATE TABLE " + DB + ".mor_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT,\n"
+                + "  v2 VARCHAR(100)\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1', 
'enable_unique_key_merge_on_write' = 'false');");
+
+        // MOW UNIQUE table (should not be affected)
+        createTable("CREATE TABLE " + DB + ".mow_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1', 
'enable_unique_key_merge_on_write' = 'true');");
+
+        // DUP table (should not be affected)
+        createTable("CREATE TABLE " + DB + ".dup_tbl (\n"
+                + "  k INT NOT NULL,\n"
+                + "  v1 INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1');");
+
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+    }
+
+    @Test
+    void testDeleteSignFilterPresentByDefault() {
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+
+        PlanChecker.from(connectContext)
+                .analyze("select * from mor_tbl")
+                .rewrite()
+                .matches(
+                        logicalProject(
+                                logicalFilter(
+                                        logicalOlapScan()
+                                ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                        )
+                );
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithWildcard() {
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithTableName() {
+        connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testDeleteSignFilterSkippedWithDbTableName() {
+        connectContext.getSessionVariable().readMorAsDupTables =
+                DEFAULT_CLUSTER_PREFIX + DB + ".mor_tbl";
+
+        try {
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testMowTableNotAffected() {
+        // MOW table should still have delete sign filter even with 
read_mor_as_dup = *
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            // MOW tables also have delete sign filter; read_mor_as_dup should 
NOT remove it
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mow_tbl")
+                    .rewrite()
+                    .matches(
+                            logicalProject(
+                                    logicalFilter(
+                                            logicalOlapScan()
+                                    ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                            )
+                    );
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testPreAggOnForMorWithReadAsDup() {
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+        try {
+            Plan plan = PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .getPlan();
+
+            // Find the LogicalOlapScan and verify preAggStatus is ON
+            LogicalOlapScan scan = findOlapScan(plan);
+            Assertions.assertNotNull(scan, "Should find LogicalOlapScan in 
plan");
+            Assertions.assertTrue(scan.getPreAggStatus().isOn(),
+                    "PreAggStatus should be ON when read_mor_as_dup is 
enabled");
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testPerTableControlOnlyAffectsSpecifiedTable() {
+        // Only enable for mor_tbl, not for other tables
+        connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+        try {
+            // mor_tbl should NOT have delete sign filter
+            PlanChecker.from(connectContext)
+                    .analyze("select * from mor_tbl")
+                    .rewrite()
+                    .nonMatch(
+                            logicalFilter(
+                                    logicalOlapScan()
+                            ).when(filter -> 
hasDeleteSignPredicate(filter.getConjuncts()))
+                    );
+
+            // dup_tbl should be unaffected (no delete sign filter for DUP)
+            PlanChecker.from(connectContext)
+                    .analyze("select * from dup_tbl")
+                    .rewrite()
+                    .matches(logicalOlapScan());
+        } finally {
+            connectContext.getSessionVariable().readMorAsDupTables = "";
+        }
+    }
+
+    @Test
+    void testSessionVariableHelperMethod() {
+        // Test the isReadMorAsDupEnabled helper directly
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = "*";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("any_db", "any_tbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = "mydb.mytbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("otherdb", 
"othertbl"));
+        // "mydb.mytbl" entry requires both db and table components to match
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anything", "mytbl"));
+
+        // Table name only (no db prefix) matches any db
+        connectContext.getSessionVariable().readMorAsDupTables = "mytbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "mytbl"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "othertbl"));
+
+        connectContext.getSessionVariable().readMorAsDupTables = 
"db1.tbl1,db2.tbl2";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl1"));
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db2", "tbl2"));
+        Assertions.assertFalse(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl2"));
+
+        // Case insensitive
+        connectContext.getSessionVariable().readMorAsDupTables = "MyDB.MyTbl";
+        Assertions.assertTrue(
+                
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+
+        // Cleanup
+        connectContext.getSessionVariable().readMorAsDupTables = "";
+    }
+
+    private boolean hasDeleteSignPredicate(Set<Expression> conjuncts) {
+        return conjuncts.stream()
+                .anyMatch(expr -> expr.toSql().contains(Column.DELETE_SIGN));
+    }
+
+    private LogicalOlapScan findOlapScan(Plan plan) {
+        if (plan instanceof LogicalOlapScan) {
+            return (LogicalOlapScan) plan;
+        }
+        for (Plan child : plan.children()) {
+            LogicalOlapScan result = findOlapScan(child);
+            if (result != null) {
+                return result;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public RulePromise defaultPromise() {
+        return RulePromise.REWRITE;
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 8721879f5af..4b3c307e818 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -81,4 +81,59 @@ public class SessionVariablesTest extends TestWithFeService {
         new NereidsParser().parseSQL(sql);
         Assertions.assertEquals(false, 
connectContext.getSessionVariable().enableNereidsDmlWithPipeline);
     }
+
+    @Test
+    public void testMorValuePredicatePushdownEnabled() {
+        SessionVariable sv = new SessionVariable();
+
+        // default empty string — disabled for all tables
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+
+        // wildcard enables all tables
+        sv.enableMorValuePredicatePushdownTables = "*";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled(null, 
"tbl1"));
+
+        // single table name without db — matches any database
+        sv.enableMorValuePredicatePushdownTables = "tbl1";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl1"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl2"));
+
+        // table name with db prefix — must match both
+        sv.enableMorValuePredicatePushdownTables = "db1.tbl1";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl1"));
+
+        // multiple tables comma-separated
+        sv.enableMorValuePredicatePushdownTables = "db1.tbl1,tbl2";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl1"));
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl2"));
+
+        // case-insensitive matching
+        sv.enableMorValuePredicatePushdownTables = "DB1.TBL1";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+
+        // whitespace handling
+        sv.enableMorValuePredicatePushdownTables = " tbl1 , db2.tbl2 ";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl2"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl2"));
+
+        // null dbName — matches table-only entries, not db-qualified entries
+        sv.enableMorValuePredicatePushdownTables = "tbl1,db2.tbl2";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled(null, 
"tbl1"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled(null, 
"tbl2"));
+
+        // consecutive commas / empty entries
+        sv.enableMorValuePredicatePushdownTables = "tbl1,,tbl2";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl2"));
+
+        // ctl.db.table format — matches on db and table components
+        sv.enableMorValuePredicatePushdownTables = "ctl1.db1.tbl1";
+        Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1", 
"tbl1"));
+        Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2", 
"tbl1"));
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index afae4dbc255..64ad52267b3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -897,6 +897,10 @@ struct TOlapScanNode {
   21: optional TSortInfo ann_sort_info
   22: optional i64 ann_sort_limit
   23: optional TScoreRangeInfo score_range_info
+  // Enable value predicate pushdown for MOR tables
+  24: optional bool enable_mor_value_predicate_pushdown
+  // Read MOR table as DUP table: skip merge, skip delete sign
+  25: optional bool read_mor_as_dup
 }
 
 struct TEqJoinCondition {
diff --git 
a/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
 
b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
new file mode 100644
index 00000000000..3ab8254a635
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
@@ -0,0 +1,76 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_disabled --
+2      200     world
+3      300     test
+
+-- !select_enabled_tablename --
+2      200     world
+3      300     test
+
+-- !select_enabled_fullname --
+2      200     world
+3      300     test
+
+-- !select_enabled_wildcard --
+2      200     world
+3      300     test
+
+-- !select_eq_predicate --
+2      200     world
+
+-- !select_not_in_list --
+2      200     world
+3      300     test
+
+-- !select_dedup_all --
+1      100     first
+2      300     third
+3      500     fifth
+
+-- !select_dedup_eq --
+2      300     third
+
+-- !select_dedup_none --
+
+-- !select_delete_range --
+3      300     test
+
+-- !select_delete_eq --
+
+-- !select_delete_all --
+1      100     hello
+3      300     test
+
+-- !select_delpred_range --
+3      300     test
+
+-- !select_delpred_eq --
+
+-- !select_delpred_all --
+1      100     hello
+3      300     test
+
+-- !select_update_disabled_old --
+
+-- !select_update_disabled_new --
+1      500     new
+
+-- !select_update_enabled_old --
+1      100     old
+
+-- !select_update_enabled_new --
+1      500     new
+
+-- !select_update_enabled_range --
+1      500     new
+3      300     keep
+
+-- !select_multiple_tables --
+2      200
+
+-- !select_mow_table --
+2      200
+
+-- !select_dup_table --
+2      200
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
 
b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
new file mode 100644
index 00000000000..b82aba0f712
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mor_value_predicate_pushdown") {
+    def tbName = "test_mor_value_pred_pushdown"
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    // Test 1: Basic MOR table with value predicate pushdown (dedup-only 
scenario)
+    // This feature is designed for insert-only/dedup-only workloads where
+    // the same key always has identical values across rowsets.
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName} (
+            k1 INT,
+            v1 INT,
+            v2 VARCHAR(100)
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    // Insert test data across separate rowsets (dedup-only: same key has same 
values)
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), 
(3, 300, 'test')"
+    // Re-insert duplicates to create overlapping rowsets for dedup
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+
+    // Test: pushdown disabled (default)
+    sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+    // Verify session variable is set correctly
+    def result = sql "SHOW VARIABLES LIKE 
'enable_mor_value_predicate_pushdown_tables'"
+    assertTrue(result.size() > 0)
+    assertTrue(result[0][1] == "")
+
+    // Query with value predicate - should return correct results
+    qt_select_disabled """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Test: enable for specific table (just table name)
+    sql "SET enable_mor_value_predicate_pushdown_tables = '${tbName}'"
+
+    result = sql "SHOW VARIABLES LIKE 
'enable_mor_value_predicate_pushdown_tables'"
+    assertTrue(result[0][1] == tbName)
+
+    qt_select_enabled_tablename """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Test: enable for specific table with db prefix
+    sql "SET enable_mor_value_predicate_pushdown_tables = 
'${dbName}.${tbName}'"
+
+    qt_select_enabled_fullname """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Test: enable for all MOR tables with '*'
+    sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+    result = sql "SHOW VARIABLES LIKE 
'enable_mor_value_predicate_pushdown_tables'"
+    assertTrue(result[0][1] == "*")
+
+    qt_select_enabled_wildcard """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Test: equality predicate on value column with pushdown
+    qt_select_eq_predicate """
+        SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+    """
+
+    // Test: table not in list - pushdown should be disabled
+    sql "SET enable_mor_value_predicate_pushdown_tables = 'other_table'"
+
+    qt_select_not_in_list """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Test 2: Multiple rowsets with dedup-only pattern
+    // Verify correctness when same keys appear across many rowsets with 
identical values
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName} (
+            k1 INT,
+            v1 INT,
+            v2 VARCHAR(100)
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    // Create multiple overlapping rowsets (dedup-only: all versions have same 
values)
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'first'), (2, 300, 'third')"
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'first')"
+    sql "INSERT INTO ${tbName} VALUES (2, 300, 'third'), (3, 500, 'fifth')"
+
+    // Test with pushdown enabled
+    sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+    // Should return all rows matching predicate after dedup
+    qt_select_dedup_all """
+        SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+    """
+
+    // Equality match on a value that exists
+    qt_select_dedup_eq """
+        SELECT * FROM ${tbName} WHERE v1 = 300 ORDER BY k1
+    """
+
+    // Predicate that matches no rows
+    qt_select_dedup_none """
+        SELECT * FROM ${tbName} WHERE v1 = 999 ORDER BY k1
+    """
+
+    // Test 3: Dedup + delete scenario
+    // Value columns are identical across rowsets (dedup-only), but some rows 
are deleted.
+    // Verifies that __DORIS_DELETE_SIGN__ is NOT pushed per-segment so 
deletions are honored.
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName} (
+            k1 INT,
+            v1 INT,
+            v2 VARCHAR(100)
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    // Rowset 1: initial data
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), 
(3, 300, 'test')"
+    // Rowset 2: dedup insert (same key, same values)
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+    // Rowset 3: delete k1=2 via insert with __DORIS_DELETE_SIGN__=1
+    // Value columns are identical (dedup-only), only delete sign differs
+    sql "INSERT INTO ${tbName}(k1, v1, v2, __DORIS_DELETE_SIGN__) VALUES (2, 
200, 'world', 1)"
+
+    sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+    // Deleted row must not appear even though v1=200 matches the predicate
+    qt_select_delete_range """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Equality on deleted row's value — should return empty
+    qt_select_delete_eq """
+        SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+    """
+
+    // Broader predicate — deleted row still excluded
+    qt_select_delete_all """
+        SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+    """
+
+    // Test 4: Dedup + delete predicate scenario
+    // DELETE FROM creates a delete predicate stored in rowset metadata.
+    // Delete predicates go through DeleteHandler, separate from value 
predicates.
+    // Verify they work correctly alongside value predicate pushdown.
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName} (
+            k1 INT,
+            v1 INT,
+            v2 VARCHAR(100)
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    // Rowset 1: initial data
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), 
(3, 300, 'test')"
+    // Rowset 2: dedup insert (same key, same values)
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+    // Delete predicate rowset: DELETE FROM creates a predicate, not row 
markers
+    sql "DELETE FROM ${tbName} WHERE k1 = 2"
+
+    sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+    // Deleted row must not appear
+    qt_select_delpred_range """
+        SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+    """
+
+    // Equality on deleted row's value — should return empty
+    qt_select_delpred_eq """
+        SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+    """
+
+    // Broader predicate — deleted row still excluded
+    qt_select_delpred_all """
+        SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+    """
+
+    // Test 5: Update scenario — proves pushdown is active at storage layer
+    // k1=1 is updated from v1=100 to v1=500 across two rowsets.
+    // Comparing results with pushdown disabled vs enabled for the SAME query
+    // proves per-segment filtering is happening:
+    //   disabled: merge picks latest (v1=500), VExpr filters → empty
+    //   enabled:  rowset 2 filtered per-segment (v1=500≠100), old version 
survives
+    //             merge sees only old version, VExpr passes → stale 
(1,100,'old')
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName} (
+            k1 INT,
+            v1 INT,
+            v2 VARCHAR(100)
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    // Rowset 1: initial data
+    sql "INSERT INTO ${tbName} VALUES (1, 100, 'old'), (2, 200, 'keep'), (3, 
300, 'keep')"
+    // Rowset 2: update k1=1 from v1=100 to v1=500
+    sql "INSERT INTO ${tbName} VALUES (1, 500, 'new')"
+
+    // --- Pushdown disabled: correct merge-then-filter behavior ---
+    sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+    // v1=100 does not match latest version (v1=500) → empty
+    qt_select_update_disabled_old """
+        SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1
+    """
+
+    // v1=500 matches latest → returns updated row
+    qt_select_update_disabled_new """
+        SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1
+    """
+
+    // --- Pushdown enabled: per-segment filtering observable ---
+    sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+    // v1=100: rowset 2 (v1=500) filtered per-segment, old version survives.
+    // Returns stale data — this proves pushdown is filtering at storage layer.
+    qt_select_update_enabled_old """
+        SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1
+    """
+
+    // v1=500: rowset 1 (v1=100) filtered per-segment, new version passes → 
correct
+    qt_select_update_enabled_new """
+        SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1
+    """
+
+    // v1 > 200: old v1=100 filtered, new v1=500 passes, k1=3 v1=300 passes → 
correct
+    qt_select_update_enabled_range """
+        SELECT * FROM ${tbName} WHERE v1 > 200 ORDER BY k1
+    """
+
+    // Test 6: Multiple tables in the list
+    def tbName2 = "test_mor_value_pred_pushdown_2"
+    sql "DROP TABLE IF EXISTS ${tbName2}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbName2} (
+            k1 INT,
+            v1 INT
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "false"
+        );
+    """
+
+    sql "INSERT INTO ${tbName2} VALUES (1, 100), (2, 200)"
+
+    sql "SET enable_mor_value_predicate_pushdown_tables = 
'${tbName},${tbName2}'"
+
+    qt_select_multiple_tables """
+        SELECT * FROM ${tbName2} WHERE v1 > 100 ORDER BY k1
+    """
+
+    // Test 7: Non-MOR table (MOW) - value predicates should always be pushed 
down
+    // The session variable should have no effect on MOW tables
+    def tbNameMow = "test_mow_value_pred"
+    sql "DROP TABLE IF EXISTS ${tbNameMow}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbNameMow} (
+            k1 INT,
+            v1 INT
+        )
+        UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true"
+        );
+    """
+
+    sql "INSERT INTO ${tbNameMow} VALUES (1, 100), (2, 200)"
+
+    // MOW tables always push down value predicates regardless of setting
+    sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+    qt_select_mow_table """
+        SELECT * FROM ${tbNameMow} WHERE v1 > 100 ORDER BY k1
+    """
+
+    // Test 8: DUP_KEYS table - value predicates should always be pushed down
+    // The session variable should have no effect on DUP_KEYS tables
+    def tbNameDup = "test_dup_value_pred"
+    sql "DROP TABLE IF EXISTS ${tbNameDup}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tbNameDup} (
+            k1 INT,
+            v1 INT
+        )
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    sql "INSERT INTO ${tbNameDup} VALUES (1, 100), (2, 200)"
+
+    // DUP_KEYS tables always push down value predicates regardless of setting
+    qt_select_dup_table """
+        SELECT * FROM ${tbNameDup} WHERE v1 > 100 ORDER BY k1
+    """
+
+    // Cleanup
+    sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+    sql "DROP TABLE IF EXISTS ${tbName}"
+    sql "DROP TABLE IF EXISTS ${tbName2}"
+    sql "DROP TABLE IF EXISTS ${tbNameMow}"
+    sql "DROP TABLE IF EXISTS ${tbNameDup}"
+}
diff --git 
a/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy 
b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
new file mode 100644
index 00000000000..aada4292ff9
--- /dev/null
+++ b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_read_mor_as_dup") {
+    def tableName = "test_read_mor_as_dup_tbl"
+    def tableName2 = "test_read_mor_as_dup_tbl2"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql "DROP TABLE IF EXISTS ${tableName2}"
+
+    // Create a MOR (Merge-On-Read) UNIQUE table
+    sql """
+        CREATE TABLE ${tableName} (
+            `k` int NOT NULL,
+            `v1` int NULL,
+            `v2` varchar(100) NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"
+        );
+    """
+
+    // Create a second MOR table for per-table control test
+    sql """
+        CREATE TABLE ${tableName2} (
+            `k` int NOT NULL,
+            `v1` int NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"
+        );
+    """
+
+    // Insert multiple versions of the same key
+    sql "INSERT INTO ${tableName} VALUES (1, 10, 'first');"
+    sql "INSERT INTO ${tableName} VALUES (1, 20, 'second');"
+    sql "INSERT INTO ${tableName} VALUES (1, 30, 'third');"
+    sql "INSERT INTO ${tableName} VALUES (2, 100, 'only_version');"
+
+    // Insert a row and then delete it
+    sql "INSERT INTO ${tableName} VALUES (3, 50, 'to_be_deleted');"
+    sql "DELETE FROM ${tableName} WHERE k = 3;"
+
+    // Insert into second table
+    sql "INSERT INTO ${tableName2} VALUES (1, 10);"
+    sql "INSERT INTO ${tableName2} VALUES (1, 20);"
+
+    // === Test 1: Normal query returns merged result ===
+    sql "SET read_mor_as_dup_tables = '';"
+    def normalResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+    // Should see only latest version per key: (1,30,'third'), 
(2,100,'only_version')
+    // Key 3 was deleted, should not appear
+    assertTrue(normalResult.size() == 2, "Normal query should return 2 rows 
(merged), got ${normalResult.size()}")
+
+    // === Test 2: Wildcard — read all MOR tables as DUP ===
+    sql "SET read_mor_as_dup_tables = '*';"
+    def dupResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+    // Should see all row versions for key 1 (3 versions) + key 2 (1 version)
+    // Key 3: delete predicates are still applied, so deleted row should still 
be filtered
+    // But the delete sign filter is skipped, so we may see it depending on 
how delete was done.
+    // For MOR tables, DELETE adds a delete predicate in rowsets, which IS 
still honored.
+    assertTrue(dupResult.size() >= 4, "DUP mode should return at least 4 rows 
(all versions), got ${dupResult.size()}")
+
+    // Verify key 1 has multiple versions
+    def key1Rows = dupResult.findAll { it[0] == 1 }
+    assertTrue(key1Rows.size() == 3, "Key 1 should have 3 versions in DUP 
mode, got ${key1Rows.size()}")
+
+    // === Test 3: Per-table control with db.table format ===
+    def dbName = sql "SELECT DATABASE();"
+    def currentDb = dbName[0][0]
+
+    sql "SET read_mor_as_dup_tables = '${currentDb}.${tableName}';"
+
+    // tableName should be in DUP mode
+    def perTableResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+    assertTrue(perTableResult.size() >= 4, "Per-table DUP mode should return 
all versions, got ${perTableResult.size()}")
+
+    // tableName2 should still be in normal (merged) mode
+    def table2Result = sql "SELECT * FROM ${tableName2} ORDER BY k;"
+    assertTrue(table2Result.size() == 1, "Table2 should still be merged (1 
row), got ${table2Result.size()}")
+
+    // === Test 4: Per-table control with just table name ===
+    sql "SET read_mor_as_dup_tables = '${tableName2}';"
+
+    // tableName should now be in normal mode
+    def revertedResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+    assertTrue(revertedResult.size() == 2, "Table1 should be merged again (2 
rows), got ${revertedResult.size()}")
+
+    // tableName2 should be in DUP mode
+    def table2DupResult = sql "SELECT * FROM ${tableName2} ORDER BY k, v1;"
+    assertTrue(table2DupResult.size() == 2, "Table2 in DUP mode should return 
2 rows, got ${table2DupResult.size()}")
+
+    // === Cleanup ===
+    sql "SET read_mor_as_dup_tables = '';"
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql "DROP TABLE IF EXISTS ${tableName2}"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to