This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a5de97  [Feature](Partition pruning) Implement V2 version of partition prune. (#7434)
3a5de97 is described below

commit 3a5de976a36854901b288b6f483cffa17f3a8040
Author: Shuo Wang <wangshuo...@gmail.com>
AuthorDate: Tue Dec 28 22:32:34 2021 +0800

    [Feature](Partition pruning) Implement V2 version of partition prune. (#7434)
    
    Implement a V2 version of the partition pruning algorithm. We use the session
    variable partition_prune_algorithm_version as the control flag, with a default
    value of 2.
    
    1. Support disjunctive predicates when pruning partitions, for both list
       and range partitions.
    2. Optimize partition pruning for multiple-column list partitions.
    
    Closed #7433
---
 .../org/apache/doris/analysis/BinaryPredicate.java |   2 +-
 .../org/apache/doris/analysis/PredicateUtils.java  |  55 +++++
 .../java/org/apache/doris/analysis/SetVar.java     |   9 +
 .../org/apache/doris/catalog/PartitionKey.java     |  12 +-
 .../java/org/apache/doris/planner/ColumnBound.java |  66 +++++
 .../java/org/apache/doris/planner/ColumnRange.java | 106 ++++++++
 .../doris/planner/ListPartitionPrunerV2.java       | 195 +++++++++++++++
 .../org/apache/doris/planner/OlapScanNode.java     |  21 +-
 .../org/apache/doris/planner/PartitionPruner.java  |   2 +-
 .../doris/planner/PartitionPrunerV2Base.java       | 200 +++++++++++++++
 .../doris/planner/RangePartitionPrunerV2.java      | 270 +++++++++++++++++++++
 .../java/org/apache/doris/planner/ScanNode.java    | 198 ++++++++++++++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  12 +-
 .../doris/analysis/ListPartitionPrunerTest.java    | 160 +++++-------
 .../doris/analysis/PartitionPruneTestBase.java     |  71 ++++++
 .../doris/analysis/RangePartitionPruneTest.java    | 213 ++++++++++++++++
 16 files changed, 1481 insertions(+), 111 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
index 94e55b0..517694d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
@@ -427,7 +427,7 @@ public class BinaryPredicate extends Predicate implements Writable {
         }
 
         if (slotRef != null && slotRef.getSlotId() == id) {
-            slotIsleft = false; 
+            slotIsleft = false;
             return getChild(0);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PredicateUtils.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PredicateUtils.java
new file mode 100644
index 0000000..e8bb325
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PredicateUtils.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class PredicateUtils {
+    /**
+     * Split predicates in disjunctive form recursively, i.e., split the input expression
+     * if the root node of the expression tree is an `or` predicate.
+     *
+     * Some examples:
+     * a or b -> a, b
+     * a or b or c -> a, b, c
+     * (a and b) or (c and d) -> (a and b), (c and d)
+     * (a or b) and c -> (a or b) and c
+     * a -> a
+     */
+    public static List<Expr> splitDisjunctivePredicates(Expr expr) {
+        ArrayList<Expr> result = Lists.newArrayList();
+        if (expr == null) {
+            return result;
+        }
+
+        splitDisjunctivePredicates(expr, result);
+        return result;
+    }
+
+    private static void splitDisjunctivePredicates(Expr expr, List<Expr> result) {
+        if (expr instanceof CompoundPredicate && ((CompoundPredicate) expr).getOp() == CompoundPredicate.Operator.OR) {
+            splitDisjunctivePredicates(expr.getChild(0), result);
+            splitDisjunctivePredicates(expr.getChild(1), result);
+        } else {
+            result.add(expr);
+        }
+    }
+}
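
A minimal usage sketch of the new helper (not part of this patch); `a`, `b` and `c` stand for already-analyzed predicate Exprs:

    // Flatten a OR (b OR c); nested ORs are split recursively, while an AND at the
    // root, e.g. (a OR b) AND c, is returned as a single element.
    Expr disjunction = new CompoundPredicate(CompoundPredicate.Operator.OR,
            a, new CompoundPredicate(CompoundPredicate.Operator.OR, b, c));
    List<Expr> parts = PredicateUtils.splitDisjunctivePredicates(disjunction);  // -> [a, b, c]
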
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index 784489e..2ff1fe7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -149,6 +149,15 @@ public class SetVar {
         if (getVariable().equalsIgnoreCase("is_report_success")) {
             variable = SessionVariable.ENABLE_PROFILE;
         }
+
+        if (getVariable().equalsIgnoreCase(SessionVariable.PARTITION_PRUNE_ALGORITHM_VERSION)) {
+            String value = getValue().getStringValue();
+            if (!"1".equals(value) && !"2".equals(value)) {
+                throw new AnalysisException("Value of " +
+                    SessionVariable.PARTITION_PRUNE_ALGORITHM_VERSION + " should be " +
+                    "either 1 or 2, but meet " + value);
+            }
+            }
+        }
     }
 
     public String toSql() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 93b2ba1..e451c6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -238,6 +238,14 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
         builder.append("]; ");
 
         builder.append("keys: [");
+        builder.append(toString(keys));
+        builder.append("]; ");
+
+        return builder.toString();
+    }
+
+    public static String toString(List<LiteralExpr> keys) {
+        StringBuilder builder = new StringBuilder();
         int i = 0;
         for (LiteralExpr expr : keys) {
             Object value = null;
@@ -253,12 +261,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
             if (keys.size() - 1 == i) {
                 builder.append(value);
             } else {
-                builder.append(value + ", ");
+                builder.append(value).append(", ");
             }
             ++i;
         }
-        builder.append("]; ");
-
         return builder.toString();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnBound.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnBound.java
new file mode 100644
index 0000000..805d4dd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnBound.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.PartitionKey;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
+public class ColumnBound implements Comparable<ColumnBound> {
+    private final LiteralExpr value;
+
+    private ColumnBound(LiteralExpr value) {
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(ColumnBound o) {
+        return PartitionKey.compareLiteralExpr(this.value, o.value);
+    }
+
+    public static ColumnBound of(LiteralExpr expr) {
+        return new ColumnBound(expr);
+    }
+
+    public LiteralExpr getValue() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("value", PartitionKey.toString(Lists.newArrayList(value)))
+            .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ColumnBound that = (ColumnBound) o;
+        return Objects.equal(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(value);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnRange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnRange.java
new file mode 100644
index 0000000..a74aa67
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ColumnRange.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import java.util.List;
+import java.util.Optional;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+
+/**
+ * There are two kinds of predicates for a column: the `is null` predicate, and other
+ * predicates that require the column value to be non-null, e.g., col=1, col>2, col in (1,2,3), etc.
+ *
+ * This can represent both conjunctive and disjunctive predicates for a column.
+ *
+ * The meaning of the predicates is: `conjunctiveIsNull` AND (`rangeSet` OR `disjunctiveIsNull`)
+ *
+ * Notes about internal state:
+ * 1. If `conjunctiveIsNull` and `disjunctiveIsNull` are both false and `rangeSet` is null,
+ * it means that there is no filter for the column. See {@link ColumnRange#hasFilter()}.
+ * 2. If `rangeSet` is empty, it means that the non-null predicates are folded to a false
+ * literal, e.g., col=1 and col=2.
+ */
+public class ColumnRange {
+    private boolean hasConjunctiveIsNull;
+    private boolean hasDisjunctiveIsNull;
+    private RangeSet<ColumnBound> rangeSet;
+
+    private ColumnRange() {
+    }
+
+    public void intersect(List<Range<ColumnBound>> disjunctiveRanges) {
+        if (disjunctiveRanges != null && !disjunctiveRanges.isEmpty()) {
+            if (rangeSet == null) {
+                rangeSet = TreeRangeSet.create();
+                disjunctiveRanges.forEach(rangeSet::add);
+            } else {
+                RangeSet<ColumnBound> merged = TreeRangeSet.create();
+                disjunctiveRanges.forEach(range -> merged.addAll(rangeSet.subRangeSet(range)));
+                rangeSet = merged;
+            }
+        }
+    }
+
+    public Optional<RangeSet<ColumnBound>> getRangeSet() {
+        if (rangeSet == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(rangeSet);
+        }
+    }
+
+    public static ColumnRange create() {
+        return new ColumnRange();
+    }
+
+    public boolean hasConjunctiveIsNull() {
+        return hasConjunctiveIsNull;
+    }
+
+    public ColumnRange setHasConjunctiveIsNull(boolean hasConjunctiveIsNull) {
+        this.hasConjunctiveIsNull = hasConjunctiveIsNull;
+        return this;
+    }
+
+    public boolean hasDisjunctiveIsNull() {
+        return hasDisjunctiveIsNull;
+    }
+
+    public ColumnRange setHasDisjunctiveIsNull(boolean hasDisjunctiveIsNull) {
+        this.hasDisjunctiveIsNull = hasDisjunctiveIsNull;
+        return this;
+    }
+
+    public boolean hasFilter() {
+        return hasConjunctiveIsNull || hasDisjunctiveIsNull || rangeSet != null;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("hasConjunctiveIsNull", hasConjunctiveIsNull)
+            .add("hasDisjunctiveIsNull", hasDisjunctiveIsNull)
+            .add("rangeSet", rangeSet)
+            .toString();
+    }
+}
+
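
A short sketch of the intended ColumnRange semantics (not part of this patch; IntLiteral is used here just to build example bounds):

    // Conjunct 1: col > 10  -> rangeSet becomes {(10, +inf)}.
    ColumnRange range = ColumnRange.create();
    range.intersect(Lists.newArrayList(Range.greaterThan(ColumnBound.of(new IntLiteral(10)))));
    // Conjunct 2: col in (5, 15) -> intersect with two point ranges; only [15, 15] survives.
    ColumnBound v5 = ColumnBound.of(new IntLiteral(5));
    ColumnBound v15 = ColumnBound.of(new IntLiteral(15));
    range.intersect(Lists.newArrayList(Range.closed(v5, v5), Range.closed(v15, v15)));
    // range.hasFilter() is now true and range.getRangeSet() holds only [15, 15].
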
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
new file mode 100644
index 0000000..154f9cf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeMap;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
+    private final Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
+
+    public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
+                                 List<Column> partitionColumns,
+                                 Map<String, ColumnRange> columnNameToRange) {
+        super(idToPartitionItem, partitionColumns, columnNameToRange);
+        this.uidToPartitionRange = Maps.newHashMap();
+        if (partitionColumns.size() > 1) {
+            // `uidToPartitionRange` is only used for multiple-column partitions.
+            idToPartitionItem.forEach((id, item) -> {
+                List<PartitionKey> keys = item.getItems();
+                List<Range<PartitionKey>> ranges = keys.stream()
+                    .map(key -> Range.closed(key, key))
+                    .collect(Collectors.toList());
+                for (int i = 0; i < ranges.size(); i++) {
+                    uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i));
+                }
+            });
+        }
+    }
+
+    @Override
+    RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() {
+        RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
+        idToPartitionItem.forEach((id, item) -> {
+            List<PartitionKey> keys = item.getItems();
+            List<Range<PartitionKey>> ranges = keys.stream()
+                .map(key -> Range.closed(key, key))
+                .collect(Collectors.toList());
+            for (int i = 0; i < ranges.size(); i++) {
+                candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
+                    new ListPartitionUniqueId(id, i));
+            }
+        });
+        return candidate;
+    }
+
+    /**
+     * List partitions don't contain null values, so an `is null` predicate matches no partition.
+     */
+    @Override
+    FinalFilters getFinalFilters(ColumnRange columnRange,
+                                 Column column) throws AnalysisException {
+        if (!columnRange.hasFilter()) {
+            return FinalFilters.noFilters();
+        }
+
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
+        if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
+            return FinalFilters.constantFalseFilters();
+        } else {
+            RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+            if (rangeSet.isEmpty()) {
+                return FinalFilters.constantFalseFilters();
+            } else {
+                return FinalFilters.create(rangeSet.asRanges());
+            }
+        }
+    }
+
+    @Override
+    Collection<Long> pruneMultipleColumnPartition(
+        Map<Column, FinalFilters> columnToFilters) throws AnalysisException {
+        Map<Range<PartitionKey>, UniqueId> rangeToId = Maps.newHashMap();
+        uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid));
+        return doPruneMultiple(columnToFilters, rangeToId, 0);
+    }
+
+    private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters,
+                                             Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
+                                             int columnIdx) {
+        // No more partition column.
+        if (columnIdx == partitionColumns.size()) {
+            return partitionRangeToUid.values().stream()
+                .map(UniqueId::getPartitionId)
+                .collect(Collectors.toSet());
+        }
+
+        FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(columnIdx));
+        switch (finalFilters.type) {
+            case CONSTANT_FALSE_FILTERS:
+                return Collections.emptyList();
+            case HAVE_FILTERS:
+                // Group the partition ranges by the range of the column at `columnIdx`,
+                // so that they can be compared with the filters.
+                Map<Range<ColumnBound>, List<UniqueId>> grouped =
+                    partitionRangeToUid
+                        .entrySet()
+                        .stream()
+                        .collect(Collectors.groupingBy(entry -> mapPartitionKeyRange(entry.getKey(), columnIdx),
+                            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
+
+                // Convert the grouped map to a RangeMap.
+                TreeRangeMap<ColumnBound, List<UniqueId>> candidateRangeMap = TreeRangeMap.create();
+                grouped.forEach((k, v) -> candidateRangeMap.put(k, v));
+
+                return finalFilters.filters.stream()
+                    .map(filter -> {
+                        RangeMap<ColumnBound, List<UniqueId>> filtered =
+                            candidateRangeMap.subRangeMap(filter);
+                        // Find PartitionKey ranges according to the filtered UniqueIds.
+                        Map<Range<PartitionKey>, UniqueId> filteredPartitionRange =
+                            filtered.asMapOfRanges().values()
+                                .stream()
+                                .flatMap(List::stream)
+                                .collect(Collectors.toMap(
+                                    uidToPartitionRange::get, Function.identity()));
+                        return doPruneMultiple(columnToFilters, filteredPartitionRange,
+                            columnIdx + 1);
+                    })
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+            case NO_FILTERS:
+            default:
+                return doPruneMultiple(columnToFilters, partitionRangeToUid, columnIdx + 1);
+        }
+    }
+
+    private static class ListPartitionUniqueId implements UniqueId {
+        private final long partitionId;
+        private final int partitionKeyIndex;
+
+        public ListPartitionUniqueId(long partitionId, int partitionKeyIndex) {
+            this.partitionId = partitionId;
+            this.partitionKeyIndex = partitionKeyIndex;
+        }
+
+        @Override
+        public long getPartitionId() {
+            return partitionId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("partitionId", partitionId)
+                .add("partitionKeyIndex", partitionKeyIndex)
+                .toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            ListPartitionUniqueId that = (ListPartitionUniqueId) o;
+            return partitionId == that.partitionId && partitionKeyIndex == that.partitionKeyIndex;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId, partitionKeyIndex);
+        }
+    }
+}
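
The core lookup in doPruneMultiple is a Guava RangeMap keyed by one column's bounds; a self-contained sketch with plain Integers and partition names instead of the Doris types (illustrative only):

    // Two single-value list partitions grouped by their first column value.
    TreeRangeMap<Integer, List<String>> byFirstColumn = TreeRangeMap.create();
    byFirstColumn.put(Range.closed(1, 1), Lists.newArrayList("p1"));
    byFirstColumn.put(Range.closed(2, 2), Lists.newArrayList("p2"));
    // A filter such as k1 in (2, 3) probes the map once per filter range.
    RangeMap<Integer, List<String>> hit = byFirstColumn.subRangeMap(Range.closed(2, 3));
    // hit.asMapOfRanges().values() -> [[p2]]; the recursion then repeats on the next column.
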
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 7faf02b..b429c72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -424,12 +424,23 @@ public class OlapScanNode extends ScanNode {
         } else {
             keyItemMap = partitionInfo.getIdToItem(false);
         }
+
         if (partitionInfo.getType() == PartitionType.RANGE) {
-            partitionPruner = new RangePartitionPruner(keyItemMap,
-                    partitionInfo.getPartitionColumns(), columnFilters);
+            if (analyzer.getContext().getSessionVariable().getPartitionPruneAlgorithmVersion() == 2) {
+                partitionPruner = new RangePartitionPrunerV2(keyItemMap,
+                        partitionInfo.getPartitionColumns(), columnNameToRange);
+            } else {
+                partitionPruner = new RangePartitionPruner(keyItemMap,
+                        partitionInfo.getPartitionColumns(), columnFilters);
+            }
         } else if (partitionInfo.getType() == PartitionType.LIST) {
-            partitionPruner = new ListPartitionPruner(keyItemMap,
+            if (analyzer.getContext().getSessionVariable().getPartitionPruneAlgorithmVersion() == 2) {
+                partitionPruner = new ListPartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(),
+                    columnNameToRange);
+            } else {
+                partitionPruner = new ListPartitionPruner(keyItemMap,
                     partitionInfo.getPartitionColumns(), columnFilters);
+            }
         }
         return partitionPruner.prune();
     }
@@ -576,8 +587,8 @@ public class OlapScanNode extends ScanNode {
             }
         } else {
             selectedPartitionIds = selectedPartitionIds.stream()
-                    .filter(id -> olapTable.getPartition(id).hasData())
-                    .collect(Collectors.toList());
+                  .filter(id -> olapTable.getPartition(id).hasData())
+                  .collect(Collectors.toList());
         }
         selectedPartitionNum = selectedPartitionIds.size();
         LOG.debug("partition prune cost: {} ms, partitions: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruner.java
index b603ca0..2bcd149 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruner.java
@@ -23,5 +23,5 @@ import java.util.Collection;
 
 public interface PartitionPruner {
     // return partition after pruning
-    public Collection<Long> prune() throws AnalysisException;
+    Collection<Long> prune() throws AnalysisException;
 };
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
new file mode 100644
index 0000000..017d556
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.common.AnalysisException;
+
+public abstract class PartitionPrunerV2Base implements PartitionPruner {
+    protected final Map<Long, PartitionItem> idToPartitionItem;
+    protected final List<Column> partitionColumns;
+    protected final Map<String, ColumnRange> columnNameToRange;
+
+    public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
+                                 List<Column> partitionColumns,
+                                 Map<String, ColumnRange> columnNameToRange) {
+        this.idToPartitionItem = idToPartitionItem;
+        this.partitionColumns = partitionColumns;
+        this.columnNameToRange = columnNameToRange;
+    }
+
+    @Override
+    public Collection<Long> prune() throws AnalysisException {
+        Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();
+        for (Column column : partitionColumns) {
+            ColumnRange columnRange = columnNameToRange.get(column.getName());
+            if (columnRange == null) {
+                columnToFilters.put(column, FinalFilters.noFilters());
+            } else {
+                columnToFilters.put(column, getFinalFilters(columnRange, column));
+            }
+        }
+
+        if (partitionColumns.size() == 1) {
+            return pruneSingleColumnPartition(columnToFilters);
+        } else if (partitionColumns.size() > 1) {
+            return pruneMultipleColumnPartition(columnToFilters);
+        } else {
+            return Lists.newArrayList();
+        }
+    }
+
+    abstract RangeMap<ColumnBound, UniqueId> getCandidateRangeMap();
+
+    /**
+     * Handle conjunctive and disjunctive `is null` predicates.
+     */
+    abstract FinalFilters getFinalFilters(ColumnRange columnRange,
+                                          Column column) throws AnalysisException;
+
+    /**
+     * It's a little complex to unify the logic of pruning multiple-column partitions for both
+     * list and range partitions.
+     *
+     * The key point is that list partition values are explicit values of the partition columns,
+     * whereas the range of a partition column in a multiple-column range partition depends on
+     * both the other partition columns' values and its own bound.
+     *
+     * Let's say we have two partition columns k1, k2:
+     * For partition [(1, 5), (1, 10)), the range for k2 is [5, 10).
+     * For partition [(1, 5), (2, 10)), the range for k2 is (-∞, +∞).
+     * For partition [(1, 10), (2, 5)), the range for k2 is (-∞, 5) union [10, +∞).
+     *
+     * We could try to compute the range bound of every column in a multiple-column partition
+     * and thereby prune multiple-column range partitions with the same logic used for
+     * multiple-column list partitions.
+     */
+    abstract Collection<Long> pruneMultipleColumnPartition(
+        Map<Column, FinalFilters> columnToFilters) throws AnalysisException;
+
+    /**
+     * The logic of pruning a single-column partition, in contrast, is unified for both list
+     * and range partitions.
+     */
+    private Collection<Long> pruneSingleColumnPartition(Map<Column, FinalFilters> columnToFilters) {
+        FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(0));
+        switch (finalFilters.type) {
+            case CONSTANT_FALSE_FILTERS:
+                return Collections.emptyList();
+            case HAVE_FILTERS:
+                RangeMap<ColumnBound, UniqueId> candidate = getCandidateRangeMap();
+                return finalFilters.filters.stream()
+                    .map(filter -> {
+                        RangeMap<ColumnBound, UniqueId> filtered = candidate.subRangeMap(filter);
+                        return filtered.asMapOfRanges().values().stream()
+                            .map(UniqueId::getPartitionId)
+                            .collect(Collectors.toSet());
+                    })
+                    .flatMap(Set::stream)
+                    .collect(Collectors.toSet());
+            case NO_FILTERS:
+            default:
+                return idToPartitionItem.keySet();
+        }
+    }
+
+    protected Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange,
+                                                      int columnIdx) {
+        return mapRange(fromRange,
+            partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx)));
+    }
+
+    protected <TO extends Comparable, FROM extends Comparable>
+    Range<TO> mapRange(Range<FROM> range, Function<FROM, TO> mapper) {
+        TO lower = range.hasLowerBound() ? mapper.apply(range.lowerEndpoint()) : null;
+        TO upper = range.hasUpperBound() ? mapper.apply(range.upperEndpoint()) : null;
+        if (range.hasUpperBound()) {
+            // has upper bound
+            if (range.hasLowerBound()) {
+                return Range.range(lower, range.lowerBoundType(), upper, range.upperBoundType());
+            } else {
+                if (range.upperBoundType() == BoundType.OPEN) {
+                    return Range.lessThan(upper);
+                } else {
+                    return Range.atMost(upper);
+                }
+            }
+        } else if (range.hasLowerBound()) {
+            // has no upper bound, but has lower bound
+            if (range.lowerBoundType() == BoundType.OPEN) {
+                return Range.greaterThan(lower);
+            } else {
+                return Range.atLeast(lower);
+            }
+        } else {
+            // has neither upper nor lower bound
+            return Range.all();
+        }
+    }
+
+    protected interface UniqueId {
+        long getPartitionId();
+    }
+
+    protected static class FinalFilters {
+        enum Type {
+            // Have no filters, should just return all the partitions.
+            NO_FILTERS,
+            // Have filters.
+            HAVE_FILTERS,
+            // Filter predicates are folded to constant false; the pruned result should be
+            // an empty collection.
+            CONSTANT_FALSE_FILTERS,
+        }
+
+        final Type type;
+        final Set<Range<ColumnBound>> filters;
+
+        private FinalFilters(Type type, Set<Range<ColumnBound>> filters) {
+            this.type = type;
+            this.filters = filters;
+        }
+
+        private static final FinalFilters NO_FILTERS = new FinalFilters(Type.NO_FILTERS, null);
+
+        private static final FinalFilters CONSTANT_FALSE_FILTERS =
+            new FinalFilters(Type.CONSTANT_FALSE_FILTERS, null);
+
+        public static FinalFilters noFilters() {
+            return NO_FILTERS;
+        }
+
+        public static FinalFilters constantFalseFilters() {
+            return CONSTANT_FALSE_FILTERS;
+        }
+
+        public static FinalFilters create(Set<Range<ColumnBound>> filters) {
+            return new FinalFilters(Type.HAVE_FILTERS, filters);
+        }
+    }
+}
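
For the single-column case, the whole flow reduces to per-filter sub-range lookups; a condensed sketch with Integer bounds and Long partition ids in place of ColumnBound/UniqueId (illustrative only):

    // Candidate map as produced by getCandidateRangeMap(): p1 covers [0, 10), p2 covers [10, 20).
    RangeMap<Integer, Long> candidate = TreeRangeMap.create();
    candidate.put(Range.closedOpen(0, 10), 1001L);
    candidate.put(Range.closedOpen(10, 20), 1002L);
    // FinalFilters for e.g. `col >= 15`: a single HAVE_FILTERS range.
    Set<Range<Integer>> filters = Sets.newHashSet(Range.atLeast(15));
    Set<Long> selected = filters.stream()
            .flatMap(f -> candidate.subRangeMap(f).asMapOfRanges().values().stream())
            .collect(Collectors.toSet());   // -> {1002L}
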
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
new file mode 100644
index 0000000..e122877
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RangePartitionPrunerV2 extends PartitionPrunerV2Base {
+    public RangePartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
+                                  List<Column> partitionColumns,
+                                  Map<String, ColumnRange> columnNameToRange) {
+        super(idToPartitionItem, partitionColumns, columnNameToRange);
+    }
+
+    @Override
+    RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() {
+        RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
+        idToPartitionItem.forEach((id, item) -> {
+            Range<PartitionKey> range = item.getItems();
+            candidate.put(mapPartitionKeyRange(range, 0), new RangePartitionUniqueId(id));
+        });
+        return candidate;
+    }
+
+    /**
+     * This is just like the logic in the v1 version, but disjunctive predicates are supported here.
+     */
+    @Override
+    Collection<Long> pruneMultipleColumnPartition(
+        Map<Column, FinalFilters> columnToFilters) throws AnalysisException {
+        PartitionKey minKey = new PartitionKey();
+        PartitionKey maxKey = new PartitionKey();
+        RangeMap<PartitionKey, Long> rangeMap = TreeRangeMap.create();
+        idToPartitionItem.forEach((id, item) -> rangeMap.put(item.getItems(), id));
+        return doPruneMulti(columnToFilters, rangeMap, 0, minKey, maxKey);
+    }
+
+    @Override
+    FinalFilters getFinalFilters(ColumnRange columnRange,
+                                 Column column) throws AnalysisException {
+        if (!columnRange.hasFilter()) {
+            return FinalFilters.noFilters();
+        }
+
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
+        if (columnRange.hasConjunctiveIsNull()) {
+            if (!rangeSetOpt.isPresent()) {
+                // Only has conjunctive `is null` predicate.
+                return FinalFilters.create(Sets.newHashSet(getMinInfinityRange(column)));
+            } else {
+                // Has both a conjunctive `is null` predicate and other predicates.
+                return FinalFilters.constantFalseFilters();
+            }
+        } else {
+            if (columnRange.hasDisjunctiveIsNull()) {
+                if (rangeSetOpt.isPresent() && !rangeSetOpt.get().isEmpty()) {
+                    RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+                    rangeSet.add(getMinInfinityRange(column));
+                    return FinalFilters.create(rangeSet.asRanges());
+                } else {
+                    return FinalFilters.create(Sets.newHashSet(getMinInfinityRange(column)));
+                }
+            } else {
+                if (rangeSetOpt.isPresent()) {
+                    RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+                    if (rangeSet.isEmpty()) {
+                        return FinalFilters.constantFalseFilters();
+                    } else {
+                        return FinalFilters.create(rangeSet.asRanges());
+                    }
+                } else {
+                    return FinalFilters.noFilters();
+                }
+            }
+        }
+    }
+
+    private Range<ColumnBound> getMinInfinityRange(Column column) throws AnalysisException {
+        ColumnBound value = ColumnBound.of(
+            LiteralExpr.createInfinity(Type.fromPrimitiveType(column.getDataType()), false));
+        return Range.closed(value, value);
+    }
+
+    private Collection<Long> doPruneMulti(Map<Column, FinalFilters> columnToFilters,
+                                          RangeMap<PartitionKey, Long> rangeMap,
+                                          int columnIdx,
+                                          PartitionKey minKey,
+                                          PartitionKey maxKey) throws AnalysisException {
+
+        // All partition columns have been handled; look up the accumulated [minKey, maxKey] range.
+        if (columnIdx == partitionColumns.size()) {
+            try {
+                return Lists.newArrayList(rangeMap.subRangeMap(Range.closed(minKey, maxKey))
+                    .asMapOfRanges().values());
+            } catch (IllegalArgumentException e) {
+                return Lists.newArrayList();
+            }
+        }
+
+        Column column = partitionColumns.get(columnIdx);
+        FinalFilters finalFilters = columnToFilters.get(column);
+        switch (finalFilters.type) {
+            case HAVE_FILTERS:
+                Set<Range<ColumnBound>> filters = finalFilters.filters;
+                Set<Long> result = Sets.newHashSet();
+                for (Range<ColumnBound> filter : filters) {
+                    if (filter.hasLowerBound() && filter.lowerBoundType() == BoundType.CLOSED &&
+                        filter.hasUpperBound() && filter.upperBoundType() == BoundType.CLOSED &&
+                        filter.lowerEndpoint() == filter.upperEndpoint()) {
+                        // Equality predicate, e.g., col=1; the filter range is [1, 1].
+                        minKey.pushColumn(filter.lowerEndpoint().getValue(), column.getDataType());
+                        maxKey.pushColumn(filter.upperEndpoint().getValue(), column.getDataType());
+                        result.addAll(
+                            doPruneMulti(columnToFilters, rangeMap, columnIdx + 1, minKey, maxKey));
+                        minKey.popColumn();
+                        maxKey.popColumn();
+                    } else {
+                        // A range that is not an equality predicate.
+                        int lastColumnId = partitionColumns.size() - 1;
+                        int pushMinCount = 0;
+                        int pushMaxCount = 0;
+                        // lower bound
+                        if (filter.hasLowerBound()) {
+                            minKey.pushColumn(filter.lowerEndpoint().getValue(), column.getDataType());
+                            pushMinCount++;
+                            if (filter.lowerBoundType() == BoundType.CLOSED && columnIdx != lastColumnId) {
+                                pushInfinity(minKey, columnIdx + 1, false);
+                                pushMinCount++;
+                            }
+                        } else {
+                            pushInfinity(minKey, columnIdx, false);
+                            pushMinCount++;
+                        }
+
+                        // upper bound
+                        if (filter.hasUpperBound()) {
+                            maxKey.pushColumn(filter.upperEndpoint().getValue(), column.getDataType());
+                            pushMaxCount++;
+                            if (filter.upperBoundType() == BoundType.CLOSED && columnIdx != lastColumnId) {
+                                pushInfinity(maxKey, columnIdx + 1, true);
+                                pushMaxCount++;
+                            }
+                        } else {
+                            pushInfinity(maxKey, columnIdx, true);
+                            pushMaxCount++;
+                        }
+
+                        try {
+                            BoundType lowerType = filter.hasLowerBound() &&
+                                filter.lowerBoundType() == BoundType.CLOSED ?
+                                BoundType.CLOSED : BoundType.OPEN;
+                            BoundType upperType = filter.hasUpperBound() &&
+                                filter.upperBoundType() == BoundType.CLOSED ?
+                                BoundType.CLOSED : BoundType.OPEN;
+                            result.addAll(rangeMap.subRangeMap(
+                                Range.range(minKey, lowerType, maxKey, upperType))
+                                .asMapOfRanges().values());
+                        } catch (IllegalArgumentException e) {
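+                            // Range.range throws if minKey > maxKey, i.e. the constructed range
+                            // is empty and matches no partition, so it is safe to ignore.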
+                        }
+
+                        for (; pushMinCount > 0; pushMinCount--) {
+                            minKey.popColumn();
+                        }
+                        for (; pushMaxCount > 0; pushMaxCount--) {
+                            maxKey.popColumn();
+                        }
+                    }
+                }
+                return result;
+            case CONSTANT_FALSE_FILTERS:
+                return Collections.emptyList();
+            case NO_FILTERS:
+            default:
+                return noFiltersResult(minKey, maxKey, columnIdx, rangeMap);
+        }
+    }
+
+    private void pushInfinity(PartitionKey key, int columnIdx,
+                              boolean isMax) throws AnalysisException {
+        Column column = partitionColumns.get(columnIdx);
+        key.pushColumn(LiteralExpr.createInfinity(Type.fromPrimitiveType(column.getDataType()), isMax),
+            column.getDataType());
+    }
+
+    private Collection<Long> noFiltersResult(PartitionKey minKey, PartitionKey maxKey,
+                                             int columnIdx,
+                                             RangeMap<PartitionKey, Long> rangeMap) throws AnalysisException {
+        pushInfinity(minKey, columnIdx, false);
+        pushInfinity(maxKey, columnIdx, true);
+        Collection<Long> result;
+        try {
+            result = Lists.newArrayList(
+                rangeMap.subRangeMap(Range.closed(minKey, maxKey)).asMapOfRanges().values());
+        } catch (IllegalArgumentException e) {
+            result = Lists.newArrayList();
+        }
+        minKey.popColumn();
+        maxKey.popColumn();
+        return result;
+    }
+
+    private static class RangePartitionUniqueId implements UniqueId {
+        private final long partitionId;
+
+        public RangePartitionUniqueId(long partitionId) {
+            this.partitionId = partitionId;
+        }
+
+        @Override
+        public long getPartitionId() {
+            return partitionId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("partitionId", partitionId)
+                .toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RangePartitionUniqueId that = (RangePartitionUniqueId) o;
+            return partitionId == that.partitionId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a8bce22..bf7d907 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -17,13 +17,18 @@
 
 package org.apache.doris.planner;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IsNullPredicate;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.PredicateUtils;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -38,9 +43,11 @@ import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.glassfish.jersey.internal.guava.Sets;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Representation of the common elements of all scan nodes.
@@ -48,8 +55,12 @@ import java.util.Map;
 abstract public class ScanNode extends PlanNode {
     private final static Logger LOG = LogManager.getLogger(ScanNode.class);
     protected final TupleDescriptor desc;
+    // Use this if partition_prune_algorithm_version is 1.
     protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
+    // Use this if partition_prune_algorithm_version is 2.
+    protected Map<String, ColumnRange> columnNameToRange = Maps.newHashMap();
     protected String sortColumn = null;
+    protected Analyzer analyzer;
 
     public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
         super(id, desc.getId().asList(), planNodeName);
@@ -59,6 +70,7 @@ abstract public class ScanNode extends PlanNode {
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
+        this.analyzer = analyzer;
         // materialize conjuncts in where
         analyzer.materializeSlots(conjuncts);
     }
@@ -75,7 +87,9 @@ abstract public class ScanNode extends PlanNode {
         return result;
     }
 
-    public TupleDescriptor getTupleDesc() { return desc; }
+    public TupleDescriptor getTupleDesc() {
+        return desc;
+    }
 
     public void setSortColumn(String column) {
         sortColumn = column;
@@ -111,10 +125,147 @@ abstract public class ScanNode extends PlanNode {
             if (null == slotDesc) {
                 continue;
             }
+            // Set `columnFilters` all the time because `DistributionPruner` also uses it.
+            // Maybe we could use `columnNameToRange` for `DistributionPruner` and
+            // only create `columnFilters` when `partition_prune_algorithm_version` is 1.
             PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts);
             if (null != keyFilter) {
                 columnFilters.put(column.getName(), keyFilter);
             }
+
+            if (analyzer.getContext().getSessionVariable().getPartitionPruneAlgorithmVersion() == 2) {
+                ColumnRange columnRange = createColumnRange(slotDesc, conjuncts);
+                if (columnRange != null) {
+                    columnNameToRange.put(column.getName(), columnRange);
+                }
+            }
+
+        }
+    }
+
+    private ColumnRange createColumnRange(SlotDescriptor desc,
+                                          List<Expr> conjuncts) {
+        ColumnRange result = ColumnRange.create();
+        for (Expr expr : conjuncts) {
+            if (!expr.isBound(desc.getId())) {
+                continue;
+            }
+
+            if (expr instanceof CompoundPredicate &&
+                ((CompoundPredicate) expr).getOp() == CompoundPredicate.Operator.OR) {
+                // Try to get column filter from disjunctive predicates.
+                List<Expr> disjunctivePredicates = PredicateUtils.splitDisjunctivePredicates(expr);
+                if (disjunctivePredicates.isEmpty()) {
+                    continue;
+                }
+
+                List<Range<ColumnBound>> disjunctiveRanges = Lists.newArrayList();
+                Set<Boolean> hasIsNull = Sets.newHashSet();
+                boolean allMatch = disjunctivePredicates.stream().allMatch(e -> {
+                    ColumnRanges ranges = expressionToRanges(e, desc);
+                    switch (ranges.type) {
+                        case IS_NULL:
+                            hasIsNull.add(true);
+                            return true;
+                        case CONVERT_SUCCESS:
+                            disjunctiveRanges.addAll(ranges.ranges);
+                            return true;
+                        case CONVERT_FAILURE:
+                        default:
+                            return false;
+
+                    }
+                });
+                if (allMatch && !(disjunctiveRanges.isEmpty() && hasIsNull.isEmpty())) {
+                    result.intersect(disjunctiveRanges);
+                    result.setHasDisjunctiveIsNull(!hasIsNull.isEmpty());
+                }
+            } else {
+                // Try to get column filter from conjunctive predicates.
+                ColumnRanges ranges = expressionToRanges(expr, desc);
+                switch (ranges.type) {
+                    case IS_NULL:
+                        result.setHasConjunctiveIsNull(true);
+                        break;
+                    case CONVERT_SUCCESS:
+                        result.intersect(ranges.ranges);
+                    case CONVERT_FAILURE:
+                    default:
+                        break;
+                }
+            }
+        }
+        return result;
+    }
+
+    private ColumnRanges expressionToRanges(Expr expr,
+                                            SlotDescriptor desc) {
+        if (expr instanceof IsNullPredicate) {
+            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+            if (isNullPredicate.isSlotRefChildren() && !isNullPredicate.isNotNull()) {
+                return ColumnRanges.createIsNull();
+            }
+        }
+
+        List<Range<ColumnBound>> result = Lists.newArrayList();
+        if (expr instanceof BinaryPredicate) {
+            BinaryPredicate binPred = (BinaryPredicate) expr;
+            Expr slotBinding = binPred.getSlotBinding(desc.getId());
+
+            if (slotBinding == null || !slotBinding.isConstant() ||
+                !(slotBinding instanceof LiteralExpr)) {
+                return ColumnRanges.createFailure();
+            }
+
+            LiteralExpr value = (LiteralExpr) slotBinding;
+            switch (binPred.getOp()) {
+                case EQ:
+                    ColumnBound bound = ColumnBound.of(value);
+                    result.add(Range.closed(bound, bound));
+                    break;
+                case LE:
+                    result.add(Range.atMost(ColumnBound.of(value)));
+                    break;
+                case LT:
+                    result.add(Range.lessThan(ColumnBound.of(value)));
+                    break;
+                case GE:
+                    result.add(Range.atLeast(ColumnBound.of(value)));
+                    break;
+                case GT:
+                    result.add(Range.greaterThan(ColumnBound.of(value)));
+                    break;
+                case NE:
+                    ColumnBound b = ColumnBound.of(value);
+                    result.add(Range.greaterThan(b));
+                    result.add(Range.lessThan(b));
+                    break;
+                default:
+                    break;
+            }
+        } else if (expr instanceof InPredicate) {
+            InPredicate inPredicate = (InPredicate) expr;
+            if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
+                return ColumnRanges.createFailure();
+            }
+
+            if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) {
+                // If child(0) of the in predicate is not a SlotRef,
+                // then its other children should not be used as conditions for partition pruning.
+                return ColumnRanges.createFailure();
+            }
+
+            for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
+                ColumnBound bound =
+                    ColumnBound.of((LiteralExpr) inPredicate.getChild(i));
+                result.add(Range.closed(bound, bound));
+            }
+        }
+
+        if (result.isEmpty()) {
+            return ColumnRanges.createFailure();
+        } else {
+            return ColumnRanges.create(result);
         }
     }
 
@@ -124,14 +275,15 @@ abstract public class ScanNode extends PlanNode {
             if (!expr.isBound(desc.getId())) {
                 continue;
             }
+
             if (expr instanceof BinaryPredicate) {
                 BinaryPredicate binPredicate = (BinaryPredicate) expr;
-                Expr slotBinding = binPredicate.getSlotBinding(desc.getId());
-                if (slotBinding == null || !slotBinding.isConstant()) {
+                if (binPredicate.getOp() == BinaryPredicate.Operator.NE) {
                     continue;
                 }
-                if (binPredicate.getOp() == BinaryPredicate.Operator.NE
-                        || !(slotBinding instanceof LiteralExpr)) {
+
+                Expr slotBinding = binPredicate.getSlotBinding(desc.getId());
+                if (slotBinding == null || !slotBinding.isConstant() || !(slotBinding instanceof LiteralExpr)) {
                     continue;
                 }
 
@@ -193,11 +345,47 @@ abstract public class ScanNode extends PlanNode {
                 partitionColumnFilter.setUpperBound(nullLiteral, true);
                 break;
             }
+
         }
         LOG.debug("partitionColumnFilter: {}", partitionColumnFilter);
         return partitionColumnFilter;
     }
 
+    private static class ColumnRanges {
+        enum Type {
+            // The expression is an `is null` predicate.
+            IS_NULL,
+            // The expression was successfully converted to ranges.
+            CONVERT_SUCCESS,
+            // The expression could not be converted to ranges.
+            CONVERT_FAILURE
+        }
+
+        final Type type;
+        final List<Range<ColumnBound>> ranges;
+
+        private ColumnRanges(Type type, List<Range<ColumnBound>> ranges) {
+            this.type = type;
+            this.ranges = ranges;
+        }
+
+        private static final ColumnRanges IS_NULL = new ColumnRanges(Type.IS_NULL, null);
+
+        private static final ColumnRanges CONVERT_FAILURE = new ColumnRanges(Type.CONVERT_FAILURE, null);
+
+        public static ColumnRanges createIsNull() {
+            return IS_NULL;
+        }
+
+        public static ColumnRanges createFailure() {
+            return CONVERT_FAILURE;
+        }
+
+        public static ColumnRanges create(List<Range<ColumnBound>> ranges) {
+            return new ColumnRanges(Type.CONVERT_SUCCESS, ranges);
+        }
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("tid", desc.getId().asInt()).add("tblName",
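
An aside for reviewers, as an illustration rather than code from this patch:
the helper above maps each predicate onto Guava Range objects -- a single
point range for EQ, one ray per inequality, two open rays for NE, and one
point range per IN element -- so a predicate over one column reduces to a
list of ranges. The self-contained sketch below shows that semantics with
Integer bounds standing in for ColumnBound; the RangeSet-based union and
intersection check is an assumed way to consume such ranges, not necessarily
how the V2 pruner implements it.

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;

    public class RangeConversionSketch {
        public static void main(String[] args) {
            // "k1 != 23" -> two ranges: (-inf, 23) and (23, +inf).
            RangeSet<Integer> notEqual = TreeRangeSet.create();
            notEqual.add(Range.lessThan(23));
            notEqual.add(Range.greaterThan(23));

            // "k1 in (7, 9, 16)" -> three point ranges.
            RangeSet<Integer> inList = TreeRangeSet.create();
            inList.add(Range.closed(7, 7));
            inList.add(Range.closed(9, 9));
            inList.add(Range.closed(16, 16));

            // A partition whose key range overlaps the predicate ranges is kept.
            Range<Integer> partition = Range.closedOpen(7, 10);
            System.out.println(notEqual.intersects(partition));                // true  -> keep
            System.out.println(inList.intersects(partition));                  // true  -> keep
            System.out.println(inList.intersects(Range.closedOpen(20, 22)));   // false -> prune
        }
    }
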
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7ccaffa..63d2d82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -154,7 +154,9 @@ public class SessionVariable implements Serializable, Writable {
     public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
 
     public static final String EXTRACT_WIDE_RANGE_EXPR = "extract_wide_range_expr";
-    
+
+    public static final String PARTITION_PRUNE_ALGORITHM_VERSION = "partition_prune_algorithm_version";
+
     public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000; // If user set a very small value, use this value instead.
 
     public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine";
@@ -365,6 +367,10 @@ public class SessionVariable implements Serializable, Writable {
 
     @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
     public boolean extractWideRangeExpr = true;
+
+    @VariableMgr.VarAttr(name = PARTITION_PRUNE_ALGORITHM_VERSION, needForward = true)
+    public int partitionPruneAlgorithmVersion = 2;
+
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE)
     private String runtimeFilterMode = "GLOBAL";
     @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_SIZE)
@@ -815,6 +821,10 @@ public class SessionVariable implements Serializable, Writable {
         return extractWideRangeExpr;
     }
 
+    public int getPartitionPruneAlgorithmVersion() {
+        return partitionPruneAlgorithmVersion;
+    }
+
     public int getCpuResourceLimit() {
         return cpuResourceLimit;
     }
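
A usage note, offered as an assumption about intended use rather than
something spelled out in the patch: since partition_prune_algorithm_version
is an ordinary session variable registered through VariableMgr.VarAttr, it
can be switched per connection (presumably via a normal SET statement, and
programmatically as the test base further down does). A minimal Java sketch
of the programmatic toggle, reusing only names that appear in this commit:

    import org.apache.doris.qe.ConnectContext;
    import org.apache.doris.qe.SessionVariable;

    final class PrunerVersionToggleSketch {
        // Sketch only: fall back to the V1 pruner for the current connection,
        // mirroring what PartitionPruneTestBase.doTest() does below.
        static void fallBackToV1(ConnectContext ctx) {
            SessionVariable sessionVariable = ctx.getSessionVariable();
            sessionVariable.partitionPruneAlgorithmVersion = 1;
            assert sessionVariable.getPartitionPruneAlgorithmVersion() == 1;
        }
    }
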
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ListPartitionPrunerTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ListPartitionPrunerTest.java
index b93cf41..4cd409a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ListPartitionPrunerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ListPartitionPrunerTest.java
@@ -17,38 +17,32 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.common.Config;
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.utframe.DorisAssert;
 import org.apache.doris.utframe.UtFrameUtils;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.UUID;
 
-public class ListPartitionPrunerTest {
-    private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
-    private static DorisAssert dorisAssert;
-
-    @Rule
-    public ExpectedException expectedEx = ExpectedException.none();
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        UtFrameUtils.cleanDorisFeDir(runningDir);
-    }
+public class ListPartitionPrunerTest extends PartitionPruneTestBase {
 
     @BeforeClass
-    public static void setUp() throws Exception {
-        Config.enable_batch_delete_by_default = true;
+    public static void beforeClass() throws Exception {
         FeConstants.runningUnitTest = true;
+        runningDir = "fe/mocked/ListPartitionPrunerTest/" + 
UUID.randomUUID().toString() + "/";
         UtFrameUtils.createDorisCluster(runningDir);
 
-        String createSinglePartColWithSinglePartKey = "create table test.t1\n"
+        connectContext = UtFrameUtils.createDefaultCtx();
+
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Catalog.getCurrentCatalog().createDb(createDbStmt);
+
+        String createSinglePartColWithSinglePartKey =
+            "create table test.t1\n"
                 + "(k1 int not null, k2 varchar(128), k3 int, v1 int, v2 
int)\n"
                 + "partition by list(k1)\n"
                 + "(\n"
@@ -57,7 +51,8 @@ public class ListPartitionPrunerTest {
                 + ")\n"
                 + "distributed by hash(k2) buckets 1\n"
                 + "properties('replication_num' = '1');";
-        String createSinglePartColWithMultiPartKey = "create table test.t2\n"
+        String createSinglePartColWithMultiPartKey =
+            "create table test.t2\n"
                 + "(k1 int not null, k2 varchar(128), k3 int, v1 int, v2 
int)\n"
                 + "partition by list(k1)\n"
                 + "(\n"
@@ -67,7 +62,8 @@ public class ListPartitionPrunerTest {
                 + ")\n"
                 + "distributed by hash(k2) buckets 1\n"
                 + "properties('replication_num' = '1');";
-        String createMultiPartColWithSinglePartKey = "create table test.t3\n"
+        String createMultiPartColWithSinglePartKey =
+            "create table test.t3\n"
                 + "(k1 int not null, k2 varchar(128) not null, k3 int, v1 int, 
v2 int)\n"
                 + "partition by list(k1, k2)\n"
                 + "(\n"
@@ -76,7 +72,8 @@ public class ListPartitionPrunerTest {
                 + ")\n"
                 + "distributed by hash(k2) buckets 1\n"
                 + "properties('replication_num' = '1');";
-        String createMultiPartColWithMultiPartKey = "create table test.t4\n"
+        String createMultiPartColWithMultiPartKey =
+            "create table test.t4\n"
                 + "(k1 int not null, k2 varchar(128) not null, k3 int, v1 int, 
v2 int)\n"
                 + "partition by list(k1, k2)\n"
                 + "(\n"
@@ -86,86 +83,59 @@ public class ListPartitionPrunerTest {
                 + ")\n"
                 + "distributed by hash(k2) buckets 1\n"
                 + "properties('replication_num' = '1');";
-        dorisAssert = new DorisAssert();
-        dorisAssert.withDatabase("test").useDatabase("test");
-        dorisAssert.withTable(createSinglePartColWithSinglePartKey)
-                .withTable(createSinglePartColWithMultiPartKey)
-                .withTable(createMultiPartColWithSinglePartKey)
-                .withTable(createMultiPartColWithMultiPartKey);
-    }
 
-    @Test
-    public void testSelectWithPartition() throws Exception {
-        String sql = "select * from t1 partition p1;";
-        dorisAssert.query(sql).explainContains("partitions=1/2");
-
-        sql = "select * from t2 partition (p2, p3);";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
+        createTable(createSinglePartColWithSinglePartKey);
+        createTable(createSinglePartColWithMultiPartKey);
+        createTable(createMultiPartColWithSinglePartKey);
+        createTable(createMultiPartColWithMultiPartKey);
+    }
 
-        sql = "select * from t3 partition (p1, p2);";
-        dorisAssert.query(sql).explainContains("partitions=2/2");
+    @AfterClass
+    public static void tearDown() throws Exception {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
 
-        sql = "select * from t4 partition p2;";
-        dorisAssert.query(sql).explainContains("partitions=1/3");
+    private void initTestCases() {
+        // Select by partition name
+        addCase("select * from test.t1 partition p1;", "partitions=1/2", 
"partitions=1/2");
+        addCase("select * from test.t2 partition (p2, p3);", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t3 partition (p1, p2);", "partitions=2/2", 
"partitions=2/2");
+        addCase("select * from test.t4 partition p2;", "partitions=1/3", 
"partitions=1/3");
+
+        // Single partition column
+        addCase("select * from test.t2 where k1 < 7", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t2 where k1 = 1;", "partitions=1/3", 
"partitions=1/3");
+        addCase("select * from test.t2 where k1 in (1, 2);", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t2 where k1 >= 6;", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t2 where k1 < 8 and k1 > 6;", 
"partitions=1/3", "partitions=1/3");
+        addCase("select * from test.t2 where k2 = \"beijing\";", 
"partitions=3/3", "partitions=3/3");
+        addCase("select * from test.t1 where k1 != 1", "partitions=2/2", 
"partitions=1/2");
+        addCase("select * from test.t4 where k2 != \"beijing\"", 
"partitions=3/3", "partitions=2/3");
+
+        // Multiple partition columns
+        addCase("select * from test.t4 where k1 = 2;", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t4 where k2 = \"tianjin\";", 
"partitions=1/3", "partitions=1/3");
+        addCase("select * from test.t4 where k1 = 1 and k2 = \"shanghai\";", 
"partitions=2/3", "partitions=1/3");
+        addCase("select * from test.t4 where k1 in (1, 3) and k2 in 
(\"tianjin\", \"shanghai\");", "partitions=2/3", "partitions=1/3");
+        addCase("select * from test.t4 where k1 in (1, 3);", "partitions=2/3", 
"partitions=2/3");
+        addCase("select * from test.t4 where k2 in (\"tianjin\", 
\"shanghai\");", "partitions=2/3", "partitions=2/3");
+        addCase("select * from test.t4 where k1 < 3;", "partitions=3/3", 
"partitions=3/3");
+        addCase("select * from test.t4 where k1 > 2;", "partitions=1/3", 
"partitions=1/3");
+        addCase("select * from test.t4 where k2 <\"shanghai\";", 
"partitions=2/3", "partitions=2/3");
+        addCase("select * from test.t4 where k2 >=\"shanghai\";", 
"partitions=2/3", "partitions=2/3");
+        addCase("select * from test.t4 where k1 > 1 and k2 < \"shanghai\";", 
"partitions=2/3", "partitions=1/3");
+        addCase("select * from test.t4 where k1 >= 2 and k2 = \"shanghai\";", 
"partitions=2/3", "partitions=1/3");
+
+        // Disjunctive predicates
+        addCase("select * from test.t2 where k1=1 or k1=4", "partitions=3/3", 
"partitions=2/3");
+        addCase("select * from test.t4 where k1=1 or k1=3", "partitions=3/3", 
"partitions=2/3");
+        addCase("select * from test.t4 where k2=\"tianjin\" or 
k2=\"shanghai\"", "partitions=3/3", "partitions=2/3");
+        addCase("select * from test.t4 where k1 > 1 or k2 < \"shanghai\"", 
"partitions=3/3", "partitions=3/3");
     }
 
     @Test
     public void testPartitionPrune() throws Exception {
-        // single partition column
-        String sql = "select * from t2 where k1 < 7";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t2 where k1 = 1;";
-        dorisAssert.query(sql).explainContains("partitions=1/3");
-
-        sql = "select * from t2 where k1 in (1, 2);";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t2 where k1 >= 6;";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t2 where k1 < 8 and k1 > 6;";
-        dorisAssert.query(sql).explainContains("partitions=1/3");
-
-        sql = "select * from t2 where k2 = \"beijing\";";
-        dorisAssert.query(sql).explainContains("partitions=3/3");
-
-        // multi partition columns
-        sql = "select * from t4 where k1 = 2;";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k2 = \"tianjin\";";
-        dorisAssert.query(sql).explainContains("partitions=1/3");
-
-        sql = "select * from t4 where k1 = 1 and k2 = \"shanghai\";";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k1 in (1, 3) and k2 in (\"tianjin\", 
\"shanghai\");";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k1 in (1, 3);";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k2 in (\"tianjin\", \"shanghai\");";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k1 < 3;";
-        dorisAssert.query(sql).explainContains("partitions=3/3");
-
-        sql = "select * from t4 where k1 > 2;";
-        dorisAssert.query(sql).explainContains("partitions=1/3");
-
-        sql = "select * from t4 where k2 <\"shanghai\";";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k2 >=\"shanghai\";";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k1 > 1 and k2 < \"shanghai\";";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
-
-        sql = "select * from t4 where k1 >= 2 and k2 = \"shanghai\";";
-        dorisAssert.query(sql).explainContains("partitions=2/3");
+        initTestCases();
+        doTest();
     }
-
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/PartitionPruneTestBase.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/PartitionPruneTestBase.java
new file mode 100644
index 0000000..2f3dc67
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/PartitionPruneTestBase.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PartitionPruneTestBase {
+    protected static String runningDir;
+    protected static ConnectContext connectContext;
+
+    protected List<TestCase> cases = new ArrayList<>();
+
+    protected void doTest() throws Exception {
+        for (TestCase testCase : cases) {
+            connectContext.getSessionVariable().partitionPruneAlgorithmVersion = 1;
+            assertExplainContains(1, testCase.sql, testCase.v1Result);
+            connectContext.getSessionVariable().partitionPruneAlgorithmVersion = 2;
+            assertExplainContains(2, testCase.sql, testCase.v2Result);
+        }
+    }
+
+    protected static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().createTable(createTableStmt);
+    }
+
+    private void assertExplainContains(int version, String sql, String subString) throws Exception {
+        Assert.assertTrue(String.format("version=%d, sql=%s, expectResult=%s",
+            version, sql, subString),
+            UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql)
+                .contains(subString));
+    }
+
+    protected void addCase(String sql, String v1Result, String v2Result) {
+        cases.add(new TestCase(sql, v1Result, v2Result));
+    }
+
+    protected static class TestCase {
+        final String sql;
+        final String v1Result;
+        final String v2Result;
+
+        public TestCase(String sql, String v1Result, String v2Result) {
+            this.sql = sql;
+            this.v1Result = v1Result;
+            this.v2Result = v2Result;
+        }
+    }
+}
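
To make the intended reuse of this base class concrete: a hypothetical
additional test (not part of this commit) would follow the same shape as the
two tests in this patch -- build the mocked cluster and tables in
@BeforeClass, register (sql, v1Result, v2Result) triples with addCase(), and
let doTest() run every statement under both algorithm versions. The table
name, DDL, and expected "partitions=x/y" strings below are invented for
illustration only.

    package org.apache.doris.analysis;

    import org.apache.doris.catalog.Catalog;
    import org.apache.doris.common.FeConstants;
    import org.apache.doris.utframe.UtFrameUtils;

    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    import java.util.UUID;

    public class DemoPartitionPruneTest extends PartitionPruneTestBase {

        @BeforeClass
        public static void beforeClass() throws Exception {
            FeConstants.runningUnitTest = true;
            runningDir = "fe/mocked/DemoPartitionPruneTest/" + UUID.randomUUID().toString() + "/";
            UtFrameUtils.createDorisCluster(runningDir);
            connectContext = UtFrameUtils.createDefaultCtx();

            CreateDbStmt createDbStmt = (CreateDbStmt)
                    UtFrameUtils.parseAndAnalyzeStmt("create database demo_db;", connectContext);
            Catalog.getCurrentCatalog().createDb(createDbStmt);

            // Hypothetical two-partition list table, just to have something to prune.
            createTable("create table demo_db.t_demo (k1 int not null, v1 int)\n"
                    + "partition by list(k1)\n"
                    + "(partition p1 values in (\"1\"), partition p2 values in (\"2\"))\n"
                    + "distributed by hash(k1) buckets 1\n"
                    + "properties('replication_num' = '1');");
        }

        @AfterClass
        public static void tearDown() throws Exception {
            UtFrameUtils.cleanDorisFeDir(runningDir);
        }

        @Test
        public void testPrune() throws Exception {
            // Expected counts are assumptions: an equality predicate on the single
            // list-partition column should keep one of the two partitions.
            addCase("select * from demo_db.t_demo where k1 = 1", "partitions=1/2", "partitions=1/2");
            doTest();
        }
    }
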
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RangePartitionPruneTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RangePartitionPruneTest.java
new file mode 100644
index 0000000..cfaf59a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/RangePartitionPruneTest.java
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+
+public class RangePartitionPruneTest extends PartitionPruneTestBase {
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
+        runningDir = "fe/mocked/RangePartitionPruneTest/" + 
UUID.randomUUID().toString() + "/";
+        UtFrameUtils.createDorisCluster(runningDir);
+
+        connectContext = UtFrameUtils.createDefaultCtx();
+
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Catalog.getCurrentCatalog().createDb(createDbStmt);
+
+        String singleColumnPartitionTable =
+            "CREATE TABLE `test`.`t1` (\n" +
+                "  `dt` int(11) NULL COMMENT \"\",\n" +
+                "  `k1` int(11) NULL COMMENT \"\",\n" +
+                "  `k2` int(11) NULL COMMENT \"\",\n" +
+                "  `k3` int(11) NULL COMMENT \"\",\n" +
+                "  `k4` int(11) NULL COMMENT \"\"\n" +
+                ") " +
+                "DUPLICATE KEY(`dt`, `k1`, `k2`, `k3`, `k4`)\n" +
+                "PARTITION BY RANGE(`dt`)\n" +
+                "(PARTITION p20211121 VALUES LESS THAN (\"20211121\"),\n" +
+                "PARTITION p20211122 VALUES [(\"20211121\"), 
(\"20211122\")),\n" +
+                "PARTITION p20211123 VALUES [(\"20211122\"), 
(\"20211123\")),\n" +
+                "PARTITION p20211124 VALUES [(\"20211123\"), 
(\"20211124\")),\n" +
+                "PARTITION p20211125 VALUES [(\"20211124\"), 
(\"20211125\")),\n" +
+                "PARTITION p20211126 VALUES [(\"20211125\"), 
(\"20211126\")),\n" +
+                "PARTITION p20211127 VALUES [(\"20211126\"), 
(\"20211127\")),\n" +
+                "PARTITION p20211128 VALUES [(\"20211127\"), 
(\"20211128\")))\n" +
+                "DISTRIBUTED BY HASH(`k1`) BUCKETS 60\n" +
+                "PROPERTIES('replication_num' = '1');";
+
+        String notNullSingleColumnPartitionTable =
+            "CREATE TABLE `test`.`not_null` (\n" +
+                "  `dt` int(11) NULL COMMENT \"\",\n" +
+                "  `k1` int(11) NULL COMMENT \"\",\n" +
+                "  `k2` int(11) NULL COMMENT \"\",\n" +
+                "  `k3` int(11) NULL COMMENT \"\",\n" +
+                "  `k4` int(11) NULL COMMENT \"\"\n" +
+                ") " +
+                "DUPLICATE KEY(`dt`, `k1`, `k2`, `k3`, `k4`)\n" +
+                "PARTITION BY RANGE(`dt`)\n" +
+                "(PARTITION p20211122 VALUES [(\"20211121\"), 
(\"20211122\")),\n" +
+                "PARTITION p20211123 VALUES [(\"20211122\"), 
(\"20211123\")),\n" +
+                "PARTITION p20211124 VALUES [(\"20211123\"), 
(\"20211124\")),\n" +
+                "PARTITION p20211125 VALUES [(\"20211124\"), 
(\"20211125\")),\n" +
+                "PARTITION p20211126 VALUES [(\"20211125\"), 
(\"20211126\")),\n" +
+                "PARTITION p20211127 VALUES [(\"20211126\"), 
(\"20211127\")),\n" +
+                "PARTITION p20211128 VALUES [(\"20211127\"), 
(\"20211128\")))\n" +
+                "DISTRIBUTED BY HASH(`k1`) BUCKETS 60\n" +
+                "PROPERTIES('replication_num' = '1');";
+
+        String multipleColumnsPartitionTable =
+            "CREATE TABLE `test`.`t2` (\n" +
+                "  `k1` int(11) NULL COMMENT \"\",\n" +
+                "  `k2` int(11) NULL COMMENT \"\",\n" +
+                "  `k3` int(11) NULL COMMENT \"\",\n" +
+                "  `k4` int(11) NULL COMMENT \"\",\n" +
+                "  `k5` int(11) NULL COMMENT \"\"\n" +
+                ") \n" +
+                "PARTITION BY RANGE(`k1`, `k2`)\n" +
+                "(PARTITION p1 VALUES LESS THAN (\"3\", \"1\"),\n" +
+                "PARTITION p2 VALUES [(\"3\", \"1\"), (\"7\", \"10\")),\n" +
+                "PARTITION p3 VALUES [(\"7\", \"10\"), (\"8\", \"5\")),\n" +
+                "PARTITION p4 VALUES [(\"10\", \"10\"), (\"12\", \"5\")),\n" +
+                "PARTITION p5 VALUES [(\"15\", \"6\"), (\"20\", \"11\")),\n" +
+                "PARTITION p6 VALUES [(\"20\", \"11\"), (\"22\", \"3\")),\n" +
+                "PARTITION p7 VALUES [(\"23\", \"3\"), (\"23\", \"4\")),\n" +
+                "PARTITION p8 VALUES [(\"23\", \"4\"), (\"23\", \"20\")),\n" +
+                "PARTITION p9 VALUES [(\"24\", \"1\"), (\"25\", \"9\")))\n" +
+                "DISTRIBUTED BY HASH(`k1`) BUCKETS 10\n" +
+                "PROPERTIES ('replication_num' = '1');";
+
+        String notNullMultipleColumnsPartitionTable =
+            "CREATE TABLE `test`.`multi_not_null` (\n" +
+                "  `k1` int(11) NULL COMMENT \"\",\n" +
+                "  `k2` int(11) NULL COMMENT \"\",\n" +
+                "  `k3` int(11) NULL COMMENT \"\",\n" +
+                "  `k4` int(11) NULL COMMENT \"\",\n" +
+                "  `k5` int(11) NULL COMMENT \"\"\n" +
+                ") \n" +
+                "PARTITION BY RANGE(`k1`, `k2`)\n" +
+                "(PARTITION p1 VALUES [(\"3\", \"1\"), (\"3\", \"3\")),\n" +
+                "PARTITION p2 VALUES [(\"4\", \"2\"), (\"4\", \"6\")))\n" +
+                "DISTRIBUTED BY HASH(`k1`) BUCKETS 10\n" +
+                "PROPERTIES ('replication_num' = '1');";
+
+        createTable(singleColumnPartitionTable);
+        createTable(notNullSingleColumnPartitionTable);
+        createTable(multipleColumnsPartitionTable);
+        createTable(notNullMultipleColumnsPartitionTable);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    private void initTestCases() {
+        // 1. Single partition column
+        // no filters
+        addCase("select * from test.t1", "partitions=8/8", "partitions=8/8");
+        // equal to
+        addCase("select * from test.t1 where dt=20211122", "partitions=1/8", 
"partitions=1/8");
+        // less than
+        addCase("select * from test.t1 where dt<20211122", "partitions=2/8", 
"partitions=2/8");
+        // less than or equal
+        addCase("select * from test.t1 where dt<=20211122", "partitions=3/8", 
"partitions=3/8");
+        // greater than
+        addCase("select * from test.t1 where dt>20211122", "partitions=6/8", 
"partitions=6/8");
+        // greater than or equal
+        addCase("select * from test.t1 where dt>=20211122", "partitions=6/8", 
"partitions=6/8");
+        // in
+        addCase("select * from test.t1 where dt in (20211124, 20211126, 
20211122)", "partitions=3/8", "partitions=3/8");
+        // is null
+        addCase("select * from test.t1 where dt is null", "partitions=1/8", 
"partitions=1/8");
+        addCase("select * from test.not_null where dt is null", 
"partitions=0/7", "partitions=0/7");
+        // not equal to
+        addCase("select * from test.t1 where dt!=20211122", "partitions=8/8", 
"partitions=8/8");
+
+        // 2. Multiple partition columns
+        // no filters
+        addCase("select * from test.t2", "partitions=9/9", "partitions=9/9");
+        // equal to
+        addCase("select * from test.t2 where k1=7", "partitions=2/9", 
"partitions=2/9");
+        addCase("select * from test.t2 where k2=7", "partitions=9/9", 
"partitions=9/9");
+        // less than
+        addCase("select * from test.t2 where k1<7", "partitions=2/9", 
"partitions=2/9");
+        addCase("select * from test.t2 where k2<7", "partitions=9/9", 
"partitions=9/9");
+        // less than or equal
+        addCase("select * from test.t2 where k1<=7", "partitions=3/9", 
"partitions=3/9");
+        addCase("select * from test.t2 where k2>7", "partitions=9/9", 
"partitions=9/9");
+        // greater than or equal
+        addCase("select * from test.t2 where k1>=7", "partitions=8/9", 
"partitions=8/9");
+        addCase("select * from test.t2 where k2>=7", "partitions=9/9", 
"partitions=9/9");
+        // in
+        addCase("select * from test.t2 where k1 in (7,9,16)", 
"partitions=3/9", "partitions=3/9");
+        addCase("select * from test.t2 where k2 in (7,9,16)", 
"partitions=9/9", "partitions=9/9");
+        // is null
+        addCase("select * from test.t2 where k1 is null", "partitions=1/9", 
"partitions=1/9");
+        addCase("select * from test.t2 where k2 is null", "partitions=9/9", 
"partitions=9/9");
+        addCase("select * from test.multi_not_null where k1 is null", 
"partitions=0/2", "partitions=0/2");
+        addCase("select * from test.multi_not_null where k2 is null", 
"partitions=2/2", "partitions=2/2");
+        // not equal to
+        addCase("select * from test.t2 where k1!=23", "partitions=9/9", 
"partitions=9/9");
+        addCase("select * from test.t2 where k2!=23", "partitions=9/9", 
"partitions=9/9");
+
+        // 3. Conjunctive predicates
+        // equal to and other predicates
+        addCase("select * from test.t2 where k1=23 and k2=5", 
"partitions=1/9", "partitions=1/9");
+        addCase("select * from test.t2 where k1=23 and k2>5", 
"partitions=1/9", "partitions=1/9");
+        // in and other equal predicates
+        addCase("select * from test.t2 where k1 in (3, 10, 13) and k2>10", 
"partitions=2/9", "partitions=2/9");
+        // is null and other predicates
+        addCase("select * from test.t2 where k1 > 10 and k1 is null", 
"partitions=1/9", "partitions=0/9");
+        addCase("select * from test.t2 where k1 is null and k1 > 10", 
"partitions=1/9", "partitions=0/9");
+        addCase("select * from test.multi_not_null where k1 > 10 and k1 is 
null", "partitions=0/2", "partitions=0/2");
+        // others predicates combination
+        addCase("select * from test.t2 where k1 > 10 and k2 < 4", 
"partitions=6/9", "partitions=6/9");
+        addCase("select * from test.t2 where k1 >10 and k1 < 10 and (k1=11 or 
k1=12)", "partitions=0/9", "partitions=0/9");
+        addCase("select * from test.t2 where k1 > 20 and k1 < 7 and k1 = 10", 
"partitions=0/9", "partitions=0/9");
+
+        // 4. Disjunctive predicates
+        addCase("select * from test.t2 where k1=10 or k1=23", 
"partitions=9/9", "partitions=3/9");
+        addCase("select * from test.t2 where (k1=10 or k1=23) and (k2=4 or 
k2=5)", "partitions=9/9", "partitions=1/9");
+        addCase("select * from test.t2 where (k1=10 or k1=23) and (k2=4 or 
k2=11)", "partitions=9/9", "partitions=2/9");
+        addCase("select * from test.t2 where (k1=10 or k1=23) and (k2=3 or 
k2=4 or k2=11)", "partitions=9/9", "partitions=3/9");
+        addCase("select * from test.t1 where dt=20211123 or dt=20211124", 
"partitions=8/8", "partitions=2/8");
+        addCase("select * from test.t1 where ((dt=20211123 and k1=1) or 
(dt=20211125 and k1=3))", "partitions=8/8", "partitions=2/8");
+        // TODO: predicates are "PREDICATES: ((`dt` = 20211123 AND `k1` = 1) 
OR (`dt` = 20211125 AND `k1` = 3)), `k2` > ",
+        // maybe something goes wrong with ExtractCommonFactorsRule.
+        addCase("select * from test.t1 where ((dt=20211123 and k1=1) or 
(dt=20211125 and k1=3)) and k2>0", "partitions=8/8", "partitions=8/8");
+        addCase("select * from test.t2 where k1 > 10 or k2 < 1", 
"partitions=9/9", "partitions=9/9");
+    }
+
+    @Test
+    public void testPartitionPrune() throws Exception {
+        initTestCases();
+        doTest();
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
