This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77d544bec45 [feature](scan) Add value predicate pushdown control for
MOR tables (#60513)
77d544bec45 is described below
commit 77d544bec45dfbdf8b6fd279b84799cd00135050
Author: Yongqiang YANG <[email protected]>
AuthorDate: Mon Mar 16 00:58:57 2026 -0700
[feature](scan) Add value predicate pushdown control for MOR tables (#60513)
## Summary
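- Add session variable `enable_mor_value_predicate_pushdown_tables` to
control value column predicate pushdown for MOR (Merge-On-Read) tables
- Add session variable `read_mor_as_dup_tables` to read the listed MOR
tables as DUP tables (skip merge and skip the delete sign filter)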
- FE: session variable with table-list matching (`table`, `db.table`,
`ctl.db.table` with the catalog part ignored, or `*` as the whole value
for all MOR tables), thrift flag on TOlapScanNode, `isMorTable()` helper
on OlapTable
- BE: propagate flag through scan operator → tablet reader → rowset
reader; extend `_should_push_down_value_predicates()` for all rowsets;
skip `__DORIS_DELETE_SIGN__` to preserve delete correctness; keep VExpr
in conjuncts as post-merge safety net
## Motivation
MOR tables with inverted indexes on value columns cannot utilize those
indexes for filtering because value predicates are not pushed to the
storage layer. This feature enables per-segment value predicate pushdown
for dedup-only/insert-only MOR workloads where the same key always
carries identical values across rowsets, allowing inverted indexes and
zone maps to filter data early.
## Design
Two-layer predicate approach:
1. **ColumnPredicate** (storage layer): pushed per-segment for early
filtering via inverted index, zone map, bloom filter
2. **VExpr** (compute layer): retained in `_conjuncts` as post-merge
safety net
Delete sign handling: the `__DORIS_DELETE_SIGN__` predicate is excluded
from per-segment pushdown so that delete markers are not filtered out
before the merge; filtering them early would cause deleted rows to
reappear.
## Test plan
- [x] Test 1: Basic session variable matching (table name, db.table,
wildcard, not-in-list)
- [x] Test 2: Dedup-only pattern with multiple overlapping rowsets
- [x] Test 3: Dedup + row-level delete via `INSERT INTO (...,
__DORIS_DELETE_SIGN__) VALUES (..., 1)`
- [x] Test 4: Dedup + delete predicate via `DELETE FROM ... WHERE`
- [x] Test 5: Multiple tables in session variable list
- [x] Test 6: MOW table (unaffected by setting)
- [x] Test 7: DUP_KEYS table (unaffected by setting)
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
be/src/exec/operator/olap_scan_operator.cpp | 8 +-
be/src/exec/operator/olap_scan_operator.h | 1 +
be/src/exec/operator/scan_operator.cpp | 2 +-
be/src/exec/scan/olap_scanner.cpp | 12 +-
be/src/storage/rowset/beta_rowset_reader.cpp | 3 +-
be/src/storage/rowset/rowset_reader_context.h | 3 +
be/src/storage/tablet/tablet_reader.cpp | 10 +
be/src/storage/tablet/tablet_reader.h | 3 +
.../java/org/apache/doris/catalog/OlapTable.java | 8 +
.../doris/nereids/rules/analysis/BindRelation.java | 5 +-
.../nereids/rules/rewrite/SetPreAggStatus.java | 9 +-
.../trees/plans/logical/LogicalOlapScan.java | 7 +
.../org/apache/doris/planner/OlapScanNode.java | 13 +
.../java/org/apache/doris/qe/SessionVariable.java | 69 ++++
.../nereids/rules/analysis/ReadMorAsDupTest.java | 280 ++++++++++++++++
.../org/apache/doris/qe/SessionVariablesTest.java | 55 ++++
gensrc/thrift/PlanNodes.thrift | 4 +
.../unique/test_mor_value_predicate_pushdown.out | 76 +++++
.../test_mor_value_predicate_pushdown.groovy | 366 +++++++++++++++++++++
.../unique_with_mor_p0/test_read_mor_as_dup.groovy | 119 +++++++
20 files changed, 1047 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/operator/olap_scan_operator.cpp
b/be/src/exec/operator/olap_scan_operator.cpp
index 60a53e9c67f..53c17626d95 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -454,7 +454,13 @@ bool OlapScanLocalState::_storage_no_merge() {
return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
(p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
- p._olap_scan_node.enable_unique_key_merge_on_write));
+ p._olap_scan_node.enable_unique_key_merge_on_write)) ||
+ _read_mor_as_dup();
+}
+
+bool OlapScanLocalState::_read_mor_as_dup() {
+ auto& p = _parent->cast<OlapScanOperatorX>();
+ return p._olap_scan_node.__isset.read_mor_as_dup &&
p._olap_scan_node.read_mor_as_dup;
}
Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
diff --git a/be/src/exec/operator/olap_scan_operator.h
b/be/src/exec/operator/olap_scan_operator.h
index 27414c6b25b..ae010479840 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -103,6 +103,7 @@ private:
bool _storage_no_merge() override;
+ bool _read_mor_as_dup();
bool _push_down_topn(const RuntimePredicate& predicate) override {
if (!predicate.target_is_slot(_parent->node_id())) {
return false;
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index ae41f3303b0..b2ac312ffda 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -443,7 +443,7 @@ Status
ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
return Status::OK();
}
- if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name())))
{
+ if (pdt == PushDownType::ACCEPTABLE && _is_key_column(slot->col_name())) {
output_expr = nullptr;
return Status::OK();
} else {
diff --git a/be/src/exec/scan/olap_scanner.cpp
b/be/src/exec/scan/olap_scanner.cpp
index 4fb031912cb..aa6af4b7159 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -333,7 +333,10 @@ Status OlapScanner::_init_tablet_reader_params(
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
const bool single_version = _tablet_reader_params.has_single_version();
- if (_state->skip_storage_engine_merge()) {
+ auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state);
+ bool read_mor_as_dup =
olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
+ olap_local_state->olap_scan_node().read_mor_as_dup;
+ if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
_tablet_reader_params.direct_mode = true;
_tablet_reader_params.aggregation = true;
} else {
@@ -475,6 +478,13 @@ Status OlapScanner::_init_tablet_reader_params(
if (!_state->skip_storage_engine_merge()) {
auto* olap_scan_local_state = (OlapScanLocalState*)_local_state;
TOlapScanNode& olap_scan_node =
olap_scan_local_state->olap_scan_node();
+
+ // Set MOR value predicate pushdown flag
+ if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown &&
+ olap_scan_node.enable_mor_value_predicate_pushdown) {
+ _tablet_reader_params.enable_mor_value_predicate_pushdown = true;
+ }
+
// order by table keys optimization for topn
// will only read head/tail of data file since it's already sorted by
keys
if (olap_scan_node.__isset.sort_info &&
!olap_scan_node.sort_info.is_asc_order.empty()) {
diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp
b/be/src/storage/rowset/beta_rowset_reader.cpp
index f1126391544..2869659ed12 100644
--- a/be/src/storage/rowset/beta_rowset_reader.cpp
+++ b/be/src/storage/rowset/beta_rowset_reader.cpp
@@ -360,7 +360,8 @@ bool BetaRowsetReader::_should_push_down_value_predicates()
const {
(((_rowset->start_version() == 0 || _rowset->start_version() == 2)
&&
!_rowset->_rowset_meta->is_segments_overlapping() &&
_read_context->sequence_id_idx == -1) ||
- _read_context->enable_unique_key_merge_on_write);
+ _read_context->enable_unique_key_merge_on_write ||
+ _read_context->enable_mor_value_predicate_pushdown);
}
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/storage/rowset/rowset_reader_context.h
b/be/src/storage/rowset/rowset_reader_context.h
index 5855b113a93..e44733367c8 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -100,6 +100,9 @@ struct RowsetReaderContext {
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
uint64_t condition_cache_digest = 0;
+
+ // When true, push down value predicates for MOR tables
+ bool enable_mor_value_predicate_pushdown = false;
};
} // namespace doris
diff --git a/be/src/storage/tablet/tablet_reader.cpp
b/be/src/storage/tablet/tablet_reader.cpp
index 35568743212..b241d621d05 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -194,6 +194,8 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.merged_rows = &_merged_rows;
_reader_context.delete_bitmap = read_params.delete_bitmap;
_reader_context.enable_unique_key_merge_on_write =
tablet()->enable_unique_key_merge_on_write();
+ _reader_context.enable_mor_value_predicate_pushdown =
+ read_params.enable_mor_value_predicate_pushdown;
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.rowid_conversion = read_params.rowid_conversion;
_reader_context.is_key_column_group = read_params.is_key_column_group;
@@ -500,9 +502,17 @@ Status TabletReader::_init_conditions_param(const
ReaderParams& read_params) {
}
}
+ int32_t delete_sign_idx = _tablet_schema->delete_sign_idx();
for (auto predicate : predicates) {
auto column = _tablet_schema->column(predicate->column_id());
if (column.aggregation() !=
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
+ // When MOR value predicate pushdown is enabled, drop
__DORIS_DELETE_SIGN__
+ // from storage-layer predicates entirely. Delete sign must only
be evaluated
+ // post-merge via VExpr to prevent deleted rows from reappearing.
+ if (read_params.enable_mor_value_predicate_pushdown &&
delete_sign_idx >= 0 &&
+ predicate->column_id() ==
static_cast<uint32_t>(delete_sign_idx)) {
+ continue;
+ }
_value_col_predicates.push_back(predicate);
} else {
_col_predicates.push_back(predicate);
diff --git a/be/src/storage/tablet/tablet_reader.h
b/be/src/storage/tablet/tablet_reader.h
index 8616d8eeac0..e4842086cc8 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -187,6 +187,9 @@ public:
bool is_segcompaction = false;
+ // Enable value predicate pushdown for MOR tables
+ bool enable_mor_value_predicate_pushdown = false;
+
std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;
void check_validation() const;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 5199ddbe4fa..b0200c707fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -3073,6 +3073,14 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return getKeysType() == KeysType.UNIQUE_KEYS &&
getEnableUniqueKeyMergeOnWrite();
}
+ /**
+ * Check if this is a MOR (Merge-On-Read) table.
+ * MOR = UNIQUE_KEYS without merge-on-write enabled.
+ */
+ public boolean isMorTable() {
+ return getKeysType() == KeysType.UNIQUE_KEYS &&
!getEnableUniqueKeyMergeOnWrite();
+ }
+
public boolean isUniqKeyMergeOnWriteWithClusterKeys() {
return isUniqKeyMergeOnWrite() &&
getBaseSchema().stream().anyMatch(Column::isClusterKey);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 2d11e1c75db..81a54f8c67e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -371,7 +371,10 @@ public class BindRelation extends OneAnalysisRuleFactory {
public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan
scan, ConnectContext connectContext,
OlapTable olapTable) {
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
- && !connectContext.getSessionVariable().skipDeleteSign()) {
+ && !connectContext.getSessionVariable().skipDeleteSign()
+ && !(olapTable.isMorTable()
+ &&
connectContext.getSessionVariable().isReadMorAsDupEnabled(
+ olapTable.getQualifiedDbName(), olapTable.getName())))
{
// table qualifier is catalog.db.table, we make db.table.column
Slot deleteSlot = null;
for (Slot slot : scan.getOutput()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
index 2f7743cb86f..7ae49fc6ec8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java
@@ -53,6 +53,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -151,7 +152,13 @@ public class SetPreAggStatus extends
DefaultPlanRewriter<Stack<SetPreAggStatus.P
long selectIndexId = logicalOlapScan.getSelectedIndexId();
MaterializedIndexMeta meta =
logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId);
if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType()
== KeysType.UNIQUE_KEYS
- &&
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) {
+ &&
logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())
+ || (meta.getKeysType() == KeysType.UNIQUE_KEYS
+ && logicalOlapScan.getTable().isMorTable()
+ && ConnectContext.get() != null
+ &&
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+ logicalOlapScan.getTable().getQualifiedDbName(),
+ logicalOlapScan.getTable().getName()))) {
return logicalOlapScan.withPreAggStatus(PreAggStatus.on());
} else {
if (context.empty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index 7f17e19e1f5..9a6f8262414 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -795,6 +795,13 @@ public class LogicalOlapScan extends
LogicalCatalogRelation implements OlapScan,
&& getTable().getKeysType() == KeysType.UNIQUE_KEYS) {
return;
}
+ // When readMorAsDup is enabled, MOR tables are read as DUP, so
uniqueness cannot be guaranteed.
+ if (getTable().getKeysType() == KeysType.UNIQUE_KEYS
+ && getTable().isMorTable()
+ &&
ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled(
+ getTable().getQualifiedDbName(),
getTable().getName())) {
+ return;
+ }
ImmutableSet.Builder<Slot> uniqSlots =
ImmutableSet.builderWithExpectedSize(outputSet.size());
for (Slot slot : outputSet) {
if (!(slot instanceof SlotReference)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0da78206cba..d651bf79010 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1198,6 +1198,19 @@ public class OlapScanNode extends ScanNode {
msg.olap_scan_node.setTableName(tableName);
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
+ // Set MOR value predicate pushdown flag based on session variable
+ if (olapTable.isMorTable() && ConnectContext.get() != null) {
+ String dbName = olapTable.getQualifiedDbName();
+ String tblName = olapTable.getName();
+ boolean enabled = ConnectContext.get().getSessionVariable()
+ .isMorValuePredicatePushdownEnabled(dbName, tblName);
+ msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
+ if (ConnectContext.get().getSessionVariable()
+ .isReadMorAsDupEnabled(dbName, tblName)) {
+ msg.olap_scan_node.setReadMorAsDup(true);
+ }
+ }
+
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d54a128dcc9..6888134673c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -734,6 +734,11 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_PUSHDOWN_STRING_MINMAX =
"enable_pushdown_string_minmax";
+ public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
+ = "enable_mor_value_predicate_pushdown_tables";
+
+ public static final String READ_MOR_AS_DUP_TABLES =
"read_mor_as_dup_tables";
+
// When set use fix replica = true, the fixed replica maybe bad, try to
use the health one if
// this session variable is set to true.
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT =
"fallback_other_replica_when_fixed_corrupt";
@@ -2226,6 +2231,21 @@ public class SessionVariable implements Serializable,
Writable {
"是否启用 string 类型 min max 下推。", "Set whether to enable push down string
type minmax."})
public boolean enablePushDownStringMinMax = false;
+ // Comma-separated list of MOR tables to enable value predicate pushdown.
+ @VariableMgr.VarAttr(name = ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES,
needForward = true, description = {
+ "指定启用MOR表value列谓词下推的表列表,格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。",
+ "Comma-separated list of MOR tables to enable value predicate
pushdown. "
+ + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
+ public String enableMorValuePredicatePushdownTables = "";
+
+ // Comma-separated list of MOR tables to read as DUP (skip merge, skip
delete sign filter).
+ @VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true,
+ affectQueryResultInPlan = true, description = {
+ "指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 *
表示所有MOR表。",
+ "Comma-separated list of MOR tables to read as DUP (skip
merge, skip delete sign filter). "
+ + "Format: db1.tbl1,db2.tbl2 or * for all MOR
tables."})
+ public String readMorAsDupTables = "";
+
// Whether drop table when create table as select insert data appear error.
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
public boolean dropTableIfCtasFailed = true;
@@ -4842,6 +4862,55 @@ public class SessionVariable implements Serializable,
Writable {
return enablePushDownStringMinMax;
}
+ public String getEnableMorValuePredicatePushdownTables() {
+ return enableMorValuePredicatePushdownTables;
+ }
+
+ public boolean isMorValuePredicatePushdownEnabled(String dbName, String
tableName) {
+ return isTableInList(enableMorValuePredicatePushdownTables, dbName,
tableName);
+ }
+
+ public boolean isReadMorAsDupEnabled(String dbName, String tableName) {
+ return isTableInList(readMorAsDupTables, dbName, tableName);
+ }
+
+ /**
+ * Check if a table matches any entry in a comma-separated table list.
+ * Parses entries the same way as TableNameInfo: split by "." to extract
+ * component parts (table, db.table, or ctl.db.table).
+ * When entry specifies db, both db and table must match.
+ * When entry is just a table name, it matches any database.
+ */
+ private static boolean isTableInList(String tableList, String dbName,
String tableName) {
+ if (tableList == null || tableList.isEmpty()) {
+ return false;
+ }
+ String trimmed = tableList.trim();
+ if ("*".equals(trimmed)) {
+ return true;
+ }
+ for (String entry : trimmed.split(",")) {
+ String trimmedEntry = entry.trim();
+ if (trimmedEntry.isEmpty()) {
+ continue;
+ }
+ String[] parts = trimmedEntry.split("\\.");
+ String entryTbl = parts[parts.length - 1];
+ String entryDb = parts.length >= 2 ? parts[parts.length - 2] :
null;
+ if (!entryTbl.equalsIgnoreCase(tableName)) {
+ continue;
+ }
+ if (entryDb != null) {
+ if (dbName != null && entryDb.equalsIgnoreCase(dbName)) {
+ return true;
+ }
+ } else {
+ return true;
+ }
+ }
+ return false;
+ }
+
/** canUseNereidsDistributePlanner */
public static boolean canUseNereidsDistributePlanner() {
ConnectContext connectContext = ConnectContext.get();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
new file mode 100644
index 00000000000..1c3f687c8ab
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.nereids.pattern.GeneratedPlanPatterns;
+import org.apache.doris.nereids.rules.RulePromise;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+class ReadMorAsDupTest extends TestWithFeService implements
GeneratedPlanPatterns {
+ private static final String DB = "test_read_mor_as_dup_db";
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase(DB);
+ connectContext.setDatabase(DEFAULT_CLUSTER_PREFIX + DB);
+
+ // MOR UNIQUE table (enable_unique_key_merge_on_write = false)
+ createTable("CREATE TABLE " + DB + ".mor_tbl (\n"
+ + " k INT NOT NULL,\n"
+ + " v1 INT,\n"
+ + " v2 VARCHAR(100)\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(k)\n"
+ + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1',
'enable_unique_key_merge_on_write' = 'false');");
+
+ // MOW UNIQUE table (should not be affected)
+ createTable("CREATE TABLE " + DB + ".mow_tbl (\n"
+ + " k INT NOT NULL,\n"
+ + " v1 INT\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(k)\n"
+ + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1',
'enable_unique_key_merge_on_write' = 'true');");
+
+ // DUP table (should not be affected)
+ createTable("CREATE TABLE " + DB + ".dup_tbl (\n"
+ + " k INT NOT NULL,\n"
+ + " v1 INT\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(k)\n"
+ + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1');");
+
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ }
+
+ @Test
+ void testDeleteSignFilterPresentByDefault() {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+
+ PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .matches(
+ logicalProject(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ )
+ );
+ }
+
+ @Test
+ void testDeleteSignFilterSkippedWithWildcard() {
+ connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+ try {
+ PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .nonMatch(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ );
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testDeleteSignFilterSkippedWithTableName() {
+ connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+ try {
+ PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .nonMatch(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ );
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testDeleteSignFilterSkippedWithDbTableName() {
+ connectContext.getSessionVariable().readMorAsDupTables =
+ DEFAULT_CLUSTER_PREFIX + DB + ".mor_tbl";
+
+ try {
+ PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .nonMatch(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ );
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testMowTableNotAffected() {
+ // MOW table should still have delete sign filter even with
read_mor_as_dup = *
+ connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+ try {
+ // MOW tables also have delete sign filter; read_mor_as_dup should
NOT remove it
+ PlanChecker.from(connectContext)
+ .analyze("select * from mow_tbl")
+ .rewrite()
+ .matches(
+ logicalProject(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ )
+ );
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testPreAggOnForMorWithReadAsDup() {
+ connectContext.getSessionVariable().readMorAsDupTables = "*";
+
+ try {
+ Plan plan = PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .getPlan();
+
+ // Find the LogicalOlapScan and verify preAggStatus is ON
+ LogicalOlapScan scan = findOlapScan(plan);
+ Assertions.assertNotNull(scan, "Should find LogicalOlapScan in
plan");
+ Assertions.assertTrue(scan.getPreAggStatus().isOn(),
+ "PreAggStatus should be ON when read_mor_as_dup is
enabled");
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testPerTableControlOnlyAffectsSpecifiedTable() {
+ // Only enable for mor_tbl, not for other tables
+ connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl";
+
+ try {
+ // mor_tbl should NOT have delete sign filter
+ PlanChecker.from(connectContext)
+ .analyze("select * from mor_tbl")
+ .rewrite()
+ .nonMatch(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
hasDeleteSignPredicate(filter.getConjuncts()))
+ );
+
+ // dup_tbl should be unaffected (no delete sign filter for DUP)
+ PlanChecker.from(connectContext)
+ .analyze("select * from dup_tbl")
+ .rewrite()
+ .matches(logicalOlapScan());
+ } finally {
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+ }
+
+ @Test
+ void testSessionVariableHelperMethod() {
+ // Test the isReadMorAsDupEnabled helper directly
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ Assertions.assertFalse(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+
+ connectContext.getSessionVariable().readMorAsDupTables = "*";
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl"));
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("any_db", "any_tbl"));
+
+ connectContext.getSessionVariable().readMorAsDupTables = "mydb.mytbl";
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+ Assertions.assertFalse(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("otherdb",
"othertbl"));
+ // "mydb.mytbl" entry requires both db and table components to match
+ Assertions.assertFalse(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("anything", "mytbl"));
+
+ // Table name only (no db prefix) matches any db
+ connectContext.getSessionVariable().readMorAsDupTables = "mytbl";
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "mytbl"));
+ Assertions.assertFalse(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "othertbl"));
+
+ connectContext.getSessionVariable().readMorAsDupTables =
"db1.tbl1,db2.tbl2";
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl1"));
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("db2", "tbl2"));
+ Assertions.assertFalse(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl2"));
+
+ // Case insensitive
+ connectContext.getSessionVariable().readMorAsDupTables = "MyDB.MyTbl";
+ Assertions.assertTrue(
+
connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl"));
+
+ // Cleanup
+ connectContext.getSessionVariable().readMorAsDupTables = "";
+ }
+
+ private boolean hasDeleteSignPredicate(Set<Expression> conjuncts) {
+ return conjuncts.stream()
+ .anyMatch(expr -> expr.toSql().contains(Column.DELETE_SIGN));
+ }
+
+ private LogicalOlapScan findOlapScan(Plan plan) {
+ if (plan instanceof LogicalOlapScan) {
+ return (LogicalOlapScan) plan;
+ }
+ for (Plan child : plan.children()) {
+ LogicalOlapScan result = findOlapScan(child);
+ if (result != null) {
+ return result;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public RulePromise defaultPromise() {
+ return RulePromise.REWRITE;
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 8721879f5af..4b3c307e818 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -81,4 +81,59 @@ public class SessionVariablesTest extends TestWithFeService {
new NereidsParser().parseSQL(sql);
Assertions.assertEquals(false,
connectContext.getSessionVariable().enableNereidsDmlWithPipeline);
}
+
+ @Test
+ public void testMorValuePredicatePushdownEnabled() {
+ SessionVariable sv = new SessionVariable();
+
+ // default empty string — disabled for all tables
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+
+ // wildcard enables all tables
+ sv.enableMorValuePredicatePushdownTables = "*";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled(null,
"tbl1"));
+
+ // single table name without db — matches any database
+ sv.enableMorValuePredicatePushdownTables = "tbl1";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl1"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl2"));
+
+ // table name with db prefix — must match both
+ sv.enableMorValuePredicatePushdownTables = "db1.tbl1";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl1"));
+
+ // multiple tables comma-separated
+ sv.enableMorValuePredicatePushdownTables = "db1.tbl1,tbl2";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl1"));
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl2"));
+
+ // case-insensitive matching
+ sv.enableMorValuePredicatePushdownTables = "DB1.TBL1";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+
+ // whitespace handling
+ sv.enableMorValuePredicatePushdownTables = " tbl1 , db2.tbl2 ";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl2"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl2"));
+
+ // null dbName — matches table-only entries, not db-qualified entries
+ sv.enableMorValuePredicatePushdownTables = "tbl1,db2.tbl2";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled(null,
"tbl1"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled(null,
"tbl2"));
+
+ // consecutive commas / empty entries
+ sv.enableMorValuePredicatePushdownTables = "tbl1,,tbl2";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl2"));
+
+ // ctl.db.table format — matches on db and table components
+ sv.enableMorValuePredicatePushdownTables = "ctl1.db1.tbl1";
+ Assertions.assertTrue(sv.isMorValuePredicatePushdownEnabled("db1",
"tbl1"));
+ Assertions.assertFalse(sv.isMorValuePredicatePushdownEnabled("db2",
"tbl1"));
+ }
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index afae4dbc255..64ad52267b3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -897,6 +897,10 @@ struct TOlapScanNode {
21: optional TSortInfo ann_sort_info
22: optional i64 ann_sort_limit
23: optional TScoreRangeInfo score_range_info
+ // Enable value predicate pushdown for MOR tables
+ 24: optional bool enable_mor_value_predicate_pushdown
+ // Read MOR table as DUP table: skip merge, skip delete sign
+ 25: optional bool read_mor_as_dup
}
struct TEqJoinCondition {
diff --git
a/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
new file mode 100644
index 00000000000..3ab8254a635
--- /dev/null
+++
b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out
@@ -0,0 +1,76 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_disabled --
+2 200 world
+3 300 test
+
+-- !select_enabled_tablename --
+2 200 world
+3 300 test
+
+-- !select_enabled_fullname --
+2 200 world
+3 300 test
+
+-- !select_enabled_wildcard --
+2 200 world
+3 300 test
+
+-- !select_eq_predicate --
+2 200 world
+
+-- !select_not_in_list --
+2 200 world
+3 300 test
+
+-- !select_dedup_all --
+1 100 first
+2 300 third
+3 500 fifth
+
+-- !select_dedup_eq --
+2 300 third
+
+-- !select_dedup_none --
+
+-- !select_delete_range --
+3 300 test
+
+-- !select_delete_eq --
+
+-- !select_delete_all --
+1 100 hello
+3 300 test
+
+-- !select_delpred_range --
+3 300 test
+
+-- !select_delpred_eq --
+
+-- !select_delpred_all --
+1 100 hello
+3 300 test
+
+-- !select_update_disabled_old --
+
+-- !select_update_disabled_new --
+1 500 new
+
+-- !select_update_enabled_old --
+1 100 old
+
+-- !select_update_enabled_new --
+1 500 new
+
+-- !select_update_enabled_range --
+1 500 new
+3 300 keep
+
+-- !select_multiple_tables --
+2 200
+
+-- !select_mow_table --
+2 200
+
+-- !select_dup_table --
+2 200
+
diff --git
a/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
new file mode 100644
index 00000000000..b82aba0f712
--- /dev/null
+++
b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mor_value_predicate_pushdown") {
+ def tbName = "test_mor_value_pred_pushdown"
+ def dbName = context.config.getDbNameByFile(context.file)
+
+ // Test 1: Basic MOR table with value predicate pushdown (dedup-only
scenario)
+ // This feature is designed for insert-only/dedup-only workloads where
+ // the same key always has identical values across rowsets.
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName} (
+ k1 INT,
+ v1 INT,
+ v2 VARCHAR(100)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // Insert test data across separate rowsets (dedup-only: same key has same
values)
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'),
(3, 300, 'test')"
+ // Re-insert duplicates to create overlapping rowsets for dedup
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+
+ // Test: pushdown disabled (default)
+ sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+ // Verify session variable is set correctly
+ def result = sql "SHOW VARIABLES LIKE
'enable_mor_value_predicate_pushdown_tables'"
+ assertTrue(result.size() > 0)
+ assertTrue(result[0][1] == "")
+
+ // Query with value predicate - should return correct results
+ qt_select_disabled """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Test: enable for specific table (just table name)
+ sql "SET enable_mor_value_predicate_pushdown_tables = '${tbName}'"
+
+ result = sql "SHOW VARIABLES LIKE
'enable_mor_value_predicate_pushdown_tables'"
+ assertTrue(result[0][1] == tbName)
+
+ qt_select_enabled_tablename """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Test: enable for specific table with db prefix
+ sql "SET enable_mor_value_predicate_pushdown_tables =
'${dbName}.${tbName}'"
+
+ qt_select_enabled_fullname """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Test: enable for all MOR tables with '*'
+ sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+ result = sql "SHOW VARIABLES LIKE
'enable_mor_value_predicate_pushdown_tables'"
+ assertTrue(result[0][1] == "*")
+
+ qt_select_enabled_wildcard """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Test: equality predicate on value column with pushdown
+ qt_select_eq_predicate """
+ SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+ """
+
+ // Test: table not in list - pushdown should be disabled
+ sql "SET enable_mor_value_predicate_pushdown_tables = 'other_table'"
+
+ qt_select_not_in_list """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Test 2: Multiple rowsets with dedup-only pattern
+ // Verify correctness when same keys appear across many rowsets with
identical values
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName} (
+ k1 INT,
+ v1 INT,
+ v2 VARCHAR(100)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // Create multiple overlapping rowsets (dedup-only: all versions have same
values)
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'first'), (2, 300, 'third')"
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'first')"
+ sql "INSERT INTO ${tbName} VALUES (2, 300, 'third'), (3, 500, 'fifth')"
+
+ // Test with pushdown enabled
+ sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+ // Should return all rows matching predicate after dedup
+ qt_select_dedup_all """
+ SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+ """
+
+ // Equality match on a value that exists
+ qt_select_dedup_eq """
+ SELECT * FROM ${tbName} WHERE v1 = 300 ORDER BY k1
+ """
+
+ // Predicate that matches no rows
+ qt_select_dedup_none """
+ SELECT * FROM ${tbName} WHERE v1 = 999 ORDER BY k1
+ """
+
+ // Test 3: Dedup + delete scenario
+ // Value columns are identical across rowsets (dedup-only), but some rows
are deleted.
+ // Verifies that __DORIS_DELETE_SIGN__ is NOT pushed per-segment so
deletions are honored.
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName} (
+ k1 INT,
+ v1 INT,
+ v2 VARCHAR(100)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // Rowset 1: initial data
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'),
(3, 300, 'test')"
+ // Rowset 2: dedup insert (same key, same values)
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+ // Rowset 3: delete k1=2 via insert with __DORIS_DELETE_SIGN__=1
+ // Value columns are identical (dedup-only), only delete sign differs
+ sql "INSERT INTO ${tbName}(k1, v1, v2, __DORIS_DELETE_SIGN__) VALUES (2,
200, 'world', 1)"
+
+ sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+ // Deleted row must not appear even though v1=200 matches the predicate
+ qt_select_delete_range """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Equality on deleted row's value — should return empty
+ qt_select_delete_eq """
+ SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+ """
+
+ // Broader predicate — deleted row still excluded
+ qt_select_delete_all """
+ SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+ """
+
+ // Test 4: Dedup + delete predicate scenario
+ // DELETE FROM creates a delete predicate stored in rowset metadata.
+ // Delete predicates go through DeleteHandler, separate from value
predicates.
+ // Verify they work correctly alongside value predicate pushdown.
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName} (
+ k1 INT,
+ v1 INT,
+ v2 VARCHAR(100)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // Rowset 1: initial data
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'),
(3, 300, 'test')"
+ // Rowset 2: dedup insert (same key, same values)
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')"
+ // Delete predicate rowset: DELETE FROM creates a predicate, not row
markers
+ sql "DELETE FROM ${tbName} WHERE k1 = 2"
+
+ sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+ // Deleted row must not appear
+ qt_select_delpred_range """
+ SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1
+ """
+
+ // Equality on deleted row's value — should return empty
+ qt_select_delpred_eq """
+ SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1
+ """
+
+ // Broader predicate — deleted row still excluded
+ qt_select_delpred_all """
+ SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1
+ """
+
+ // Test 5: Update scenario — shows pushdown is active at the storage layer.
+ // k1=1 is updated from v1=100 to v1=500 across two rowsets, which breaks
+ // the dedup-only assumption, so the SAME query (v1 = 100) differs:
+ // disabled: merge picks latest (v1=500), VExpr filters it out → empty
+ // enabled: rowset 2 (v1=500) is filtered per-segment, so merge sees only
+ // the old version and VExpr passes it → stale row (1,100,'old').
+ // The stale row is expected here; it is what makes pushdown observable.
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName} (
+ k1 INT,
+ v1 INT,
+ v2 VARCHAR(100)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // Rowset 1: initial data
+ sql "INSERT INTO ${tbName} VALUES (1, 100, 'old'), (2, 200, 'keep'), (3,
300, 'keep')"
+ // Rowset 2: update k1=1 from v1=100 to v1=500
+ sql "INSERT INTO ${tbName} VALUES (1, 500, 'new')"
+
+ // --- Pushdown disabled: correct merge-then-filter behavior ---
+ sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+ // v1=100 does not match latest version (v1=500) → empty
+ qt_select_update_disabled_old """
+ SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1
+ """
+
+ // v1=500 matches latest → returns updated row
+ qt_select_update_disabled_new """
+ SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1
+ """
+
+ // --- Pushdown enabled: per-segment filtering observable ---
+ sql "SET enable_mor_value_predicate_pushdown_tables = '*'"
+
+ // v1=100: rowset 2 (v1=500) filtered per-segment, old version survives.
+ // Returns stale data — this proves pushdown is filtering at storage layer.
+ qt_select_update_enabled_old """
+ SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1
+ """
+
+ // v1=500: rowset 1 (v1=100) filtered per-segment, new version passes →
correct
+ qt_select_update_enabled_new """
+ SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1
+ """
+
+ // v1 > 200: old v1=100 filtered, new v1=500 passes, k1=3 v1=300 passes →
correct
+ qt_select_update_enabled_range """
+ SELECT * FROM ${tbName} WHERE v1 > 200 ORDER BY k1
+ """
+
+ // Test 6: Multiple tables in the list
+ def tbName2 = "test_mor_value_pred_pushdown_2"
+ sql "DROP TABLE IF EXISTS ${tbName2}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName2} (
+ k1 INT,
+ v1 INT
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "false"
+ );
+ """
+
+ sql "INSERT INTO ${tbName2} VALUES (1, 100), (2, 200)"
+
+ sql "SET enable_mor_value_predicate_pushdown_tables =
'${tbName},${tbName2}'"
+
+ qt_select_multiple_tables """
+ SELECT * FROM ${tbName2} WHERE v1 > 100 ORDER BY k1
+ """
+
+ // Test 7: Non-MOR table (MOW) - value predicates should always be pushed
down
+ // The session variable should have no effect on MOW tables
+ def tbNameMow = "test_mow_value_pred"
+ sql "DROP TABLE IF EXISTS ${tbNameMow}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbNameMow} (
+ k1 INT,
+ v1 INT
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ sql "INSERT INTO ${tbNameMow} VALUES (1, 100), (2, 200)"
+
+ // MOW tables always push down value predicates regardless of setting
+ sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+
+ qt_select_mow_table """
+ SELECT * FROM ${tbNameMow} WHERE v1 > 100 ORDER BY k1
+ """
+
+ // Test 8: DUP_KEYS table - value predicates should always be pushed down
+ // The session variable should have no effect on DUP_KEYS tables
+ def tbNameDup = "test_dup_value_pred"
+ sql "DROP TABLE IF EXISTS ${tbNameDup}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbNameDup} (
+ k1 INT,
+ v1 INT
+ )
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql "INSERT INTO ${tbNameDup} VALUES (1, 100), (2, 200)"
+
+ // DUP_KEYS tables always push down value predicates regardless of setting
+ qt_select_dup_table """
+ SELECT * FROM ${tbNameDup} WHERE v1 > 100 ORDER BY k1
+ """
+
+ // Cleanup
+ sql "SET enable_mor_value_predicate_pushdown_tables = ''"
+ sql "DROP TABLE IF EXISTS ${tbName}"
+ sql "DROP TABLE IF EXISTS ${tbName2}"
+ sql "DROP TABLE IF EXISTS ${tbNameMow}"
+ sql "DROP TABLE IF EXISTS ${tbNameDup}"
+}
diff --git
a/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
new file mode 100644
index 00000000000..aada4292ff9
--- /dev/null
+++ b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_read_mor_as_dup") {
+ def tableName = "test_read_mor_as_dup_tbl"
+ def tableName2 = "test_read_mor_as_dup_tbl2"
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql "DROP TABLE IF EXISTS ${tableName2}"
+
+ // Create a MOR (Merge-On-Read) UNIQUE table
+ sql """
+ CREATE TABLE ${tableName} (
+ `k` int NOT NULL,
+ `v1` int NULL,
+ `v2` varchar(100) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"
+ );
+ """
+
+ // Create a second MOR table for per-table control test
+ sql """
+ CREATE TABLE ${tableName2} (
+ `k` int NOT NULL,
+ `v1` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "false",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"
+ );
+ """
+
+ // Insert multiple versions of the same key
+ sql "INSERT INTO ${tableName} VALUES (1, 10, 'first');"
+ sql "INSERT INTO ${tableName} VALUES (1, 20, 'second');"
+ sql "INSERT INTO ${tableName} VALUES (1, 30, 'third');"
+ sql "INSERT INTO ${tableName} VALUES (2, 100, 'only_version');"
+
+ // Insert a row and then delete it
+ sql "INSERT INTO ${tableName} VALUES (3, 50, 'to_be_deleted');"
+ sql "DELETE FROM ${tableName} WHERE k = 3;"
+
+ // Insert into second table
+ sql "INSERT INTO ${tableName2} VALUES (1, 10);"
+ sql "INSERT INTO ${tableName2} VALUES (1, 20);"
+
+ // === Test 1: Normal query returns merged result ===
+ sql "SET read_mor_as_dup_tables = '';"
+ def normalResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+ // Should see only latest version per key: (1,30,'third'),
(2,100,'only_version')
+ // Key 3 was deleted, should not appear
+ assertTrue(normalResult.size() == 2, "Normal query should return 2 rows
(merged), got ${normalResult.size()}")
+
+ // === Test 2: Wildcard — read all MOR tables as DUP ===
+ sql "SET read_mor_as_dup_tables = '*';"
+ def dupResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+ // Expect every stored version: key 1 (3 versions) + key 2 (1 version).
+ // Key 3 was removed via DELETE FROM, i.e. a delete predicate in the
+ // rowsets, which is still honored in DUP mode. Only the delete-sign
+ // filter is skipped, so the check is a lower bound (>= 4), not exact.
+ assertTrue(dupResult.size() >= 4, "DUP mode should return at least 4 rows
(all versions), got ${dupResult.size()}")
+
+ // Verify key 1 has multiple versions
+ def key1Rows = dupResult.findAll { it[0] == 1 }
+ assertTrue(key1Rows.size() == 3, "Key 1 should have 3 versions in DUP
mode, got ${key1Rows.size()}")
+
+ // === Test 3: Per-table control with db.table format ===
+ def dbName = sql "SELECT DATABASE();"
+ def currentDb = dbName[0][0]
+
+ sql "SET read_mor_as_dup_tables = '${currentDb}.${tableName}';"
+
+ // tableName should be in DUP mode
+ def perTableResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;"
+ assertTrue(perTableResult.size() >= 4, "Per-table DUP mode should return
all versions, got ${perTableResult.size()}")
+
+ // tableName2 should still be in normal (merged) mode
+ def table2Result = sql "SELECT * FROM ${tableName2} ORDER BY k;"
+ assertTrue(table2Result.size() == 1, "Table2 should still be merged (1
row), got ${table2Result.size()}")
+
+ // === Test 4: Per-table control with just table name ===
+ sql "SET read_mor_as_dup_tables = '${tableName2}';"
+
+ // tableName should now be in normal mode
+ def revertedResult = sql "SELECT * FROM ${tableName} ORDER BY k;"
+ assertTrue(revertedResult.size() == 2, "Table1 should be merged again (2
rows), got ${revertedResult.size()}")
+
+ // tableName2 should be in DUP mode
+ def table2DupResult = sql "SELECT * FROM ${tableName2} ORDER BY k, v1;"
+ assertTrue(table2DupResult.size() == 2, "Table2 in DUP mode should return
2 rows, got ${table2DupResult.size()}")
+
+ // === Cleanup ===
+ sql "SET read_mor_as_dup_tables = '';"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql "DROP TABLE IF EXISTS ${tableName2}"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]