This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ddb6eb5ad7d [feature](Nereids) add command for updating mv with
partitions (#28060)
ddb6eb5ad7d is described below
commit ddb6eb5ad7df74d3037d7514d5cf8b8f663b3e3b
Author: 谢健 <[email protected]>
AuthorDate: Wed Dec 6 17:45:09 2023 +0800
[feature](Nereids) add command for updating mv with partitions (#28060)
---
.../java/org/apache/doris/catalog/TableIf.java | 7 ++
.../plans/commands/UpdateMvByPartitionCommand.java | 138 +++++++++++++++++++++
2 files changed, 145 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index e21a4ee289f..eb472d8884f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.catalog.constraint.ForeignKeyConstraint;
import org.apache.doris.catalog.constraint.PrimaryKeyConstraint;
import org.apache.doris.catalog.constraint.UniqueConstraint;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -375,6 +376,12 @@ public interface TableIf {
return null;
}
+ default List<String> getFullQualifiers() {
+ return ImmutableList.of(getDatabase().getCatalog().getName(),
+
ClusterNamespace.getNameFromFullName(getDatabase().getFullName()),
+ getName());
+ }
+
default boolean isManagedTable() {
return getType() == TableType.OLAP || getType() ==
TableType.MATERIALIZED_VIEW;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
new file mode 100644
index 00000000000..b221e507b82
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.InPredicate;
+import org.apache.doris.nereids.trees.expressions.LessThan;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Update mv by partition
+ */
+public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
+ private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) {
+ super(logicalQuery, Optional.empty());
+ }
+
+ /**
+ * Construct command
+ *
+ * @param mv materialize view
+ * @param partitions update partitions in mv and tables
+ * @param tableWithPartKey the partitions key for different table
+ * @return command
+ */
+ public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem>
partitions,
+ Map<OlapTable, String> tableWithPartKey) {
+ NereidsParser parser = new NereidsParser();
+ Map<OlapTable, Set<Expression>> predicates =
+ constructTableWithPredicates(partitions, tableWithPartKey);
+ List<String> parts = constructPartsForMv(mv, partitions);
+ Plan plan = parser.parseSingle(mv.getQuerySql());
+ plan = plan.accept(new PredicateAdder(), predicates);
+ UnboundTableSink<? extends Plan> sink =
+ new UnboundTableSink<>(mv.getFullQualifiers(),
ImmutableList.of(), ImmutableList.of(),
+ parts, plan);
+ return new UpdateMvByPartitionCommand(sink);
+ }
+
+ private static List<String> constructPartsForMv(MTMV mv,
Set<PartitionItem> partitions) {
+ return mv.getPartitionNames().stream()
+ .filter(name -> {
+ PartitionItem mvPartItem =
mv.getPartitionInfo().getItem(mv.getPartition(name).getId());
+ return partitions.stream().anyMatch(p ->
p.getIntersect(mvPartItem) != null);
+ })
+ .collect(ImmutableList.toImmutableList());
+ }
+
+ private static Map<OlapTable, Set<Expression>>
constructTableWithPredicates(Set<PartitionItem> partitions,
+ Map<OlapTable, String> tableWithPartKey) {
+ ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new
ImmutableMap.Builder<>();
+ tableWithPartKey.forEach((table, colName) ->
+ builder.put(table, constructPredicates(partitions, colName))
+ );
+ return builder.build();
+ }
+
+ private static Set<Expression> constructPredicates(Set<PartitionItem>
partitions, String colName) {
+ UnboundSlot slot = new UnboundSlot(colName);
+ return partitions.stream()
+ .map(item -> convertPartitionItemToPredicate(item, slot))
+ .collect(ImmutableSet.toImmutableSet());
+ }
+
+ private static Expression convertPartitionItemToPredicate(PartitionItem
item, Slot col) {
+ if (item instanceof ListPartitionItem) {
+ List<Expression> inValues = ((ListPartitionItem)
item).getItems().stream()
+ .map(key -> Literal.fromLegacyLiteral(key.getKeys().get(0),
+ Type.fromPrimitiveType(key.getTypes().get(0))))
+ .collect(ImmutableList.toImmutableList());
+ return new InPredicate(col, inValues);
+ } else {
+ Range<PartitionKey> range = item.getItems();
+ List<Expression> exprs = new ArrayList<>();
+ if (range.hasLowerBound()) {
+ PartitionKey key = range.lowerEndpoint();
+ exprs.add(new GreaterThanEqual(col,
Literal.fromLegacyLiteral(key.getKeys().get(0),
+ Type.fromPrimitiveType(key.getTypes().get(0)))));
+ }
+ if (range.hasUpperBound()) {
+ PartitionKey key = range.upperEndpoint();
+ exprs.add(new LessThan(col,
Literal.fromLegacyLiteral(key.getKeys().get(0),
+ Type.fromPrimitiveType(key.getTypes().get(0)))));
+ }
+ return ExpressionUtils.and(exprs);
+ }
+ }
+
+ static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable,
Set<Expression>>> {
+ @Override
+ public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable,
Set<Expression>> predicates) {
+ return new LogicalFilter<>(predicates.get(scan.getTable()), scan);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]