This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new c8a5517ace [cherry-pick](branch-2.0) add partial update support for delete stmt in Nereids (#22329) c8a5517ace is described below commit c8a5517acee55abf3815830465ea6b33570f72b2 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Fri Jul 28 17:36:09 2023 +0800 [cherry-pick](branch-2.0) add partial update support for delete stmt in Nereids (#22329) --- .../nereids/analyzer/UnboundOlapTableSink.java | 30 +++++++--- .../glue/translator/PhysicalPlanTranslator.java | 8 +++ .../rules/analysis/BindInsertTargetTable.java | 1 + ...ogicalOlapTableSinkToPhysicalOlapTableSink.java | 1 + .../trees/plans/commands/DeleteCommand.java | 8 ++- .../trees/plans/logical/LogicalOlapTableSink.java | 28 +++++++--- .../plans/physical/PhysicalOlapTableSink.java | 36 ++++++++---- .../delete/delete_mow_partial_update.out | 26 +++++++++ .../delete/delete_mow_partial_update.groovy | 64 ++++++++++++++++++++++ 9 files changed, 174 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java index 344cfb9b69..4336844951 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java @@ -42,20 +42,30 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< private final List<String> colNames; private final List<String> hints; private final List<String> partitions; + private final boolean isPartialUpdate; public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, List<String> partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, partitions, Optional.empty(), Optional.empty(), child); + this(nameParts, colNames, hints, partitions, false, Optional.empty(), 
Optional.empty(), child); } public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, - List<String> partitions, Optional<GroupExpression> groupExpression, + List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, isPartialUpdate, Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, + List<String> partitions, boolean isPartialUpdate, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); this.nameParts = Utils.copyRequiredList(nameParts); this.colNames = Utils.copyRequiredList(colNames); this.hints = Utils.copyRequiredList(hints); this.partitions = Utils.copyRequiredList(partitions); + this.isPartialUpdate = isPartialUpdate; } public List<String> getColNames() { @@ -70,11 +80,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< return partitions; } + public boolean isPartialUpdate() { + return isPartialUpdate; + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - Optional.of(getLogicalProperties()), children.get(0)); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate, + groupExpression, Optional.of(getLogicalProperties()), children.get(0)); } @Override @@ -109,15 +123,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - 
Optional.of(getLogicalProperties()), child()); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, + isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - logicalProperties, children.get(0)); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, + isPartialUpdate, groupExpression, logicalProperties, children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2128ba7b78..8f9b336585 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -331,6 +331,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla olapTableSink.isSingleReplicaLoad() ); + if (olapTableSink.isPartialUpdate()) { + HashSet<String> partialUpdateCols = new HashSet<String>(); + for (Column col : olapTableSink.getCols()) { + partialUpdateCols.add(col.getName()); + } + sink.setPartialUpdateInputColumns(true, partialUpdateCols); + } + rootFragment.setSink(sink); rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java index fc1ae2e21d..750413b919 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java @@ -75,6 +75,7 @@ public class BindInsertTargetTable extends OneAnalysisRuleFactory { table, bindTargetColumns(table, sink.getColNames()), bindPartitionIds(table, sink.getPartitions()), + sink.isPartialUpdate(), sink.child()); // we need to insert all the columns of the target table although some columns are not mentions. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index 75927ed6f2..8927b8bd13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -39,6 +39,7 @@ public class LogicalOlapTableSinkToPhysicalOlapTableSink extends OneImplementati sink.getPartitionIds(), sink.getCols(), ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), + sink.isPartialUpdate(), Optional.empty(), sink.getLogicalProperties(), sink.child()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java index d7216d257b..ae0900ced5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java @@ -111,9 +111,15 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab logicalQuery = new LogicalProject<>(selectLists, logicalQuery); + boolean isPartialUpdate = false; + if (((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite() + && cols.size() < targetTable.getColumns().size()) 
{ + isPartialUpdate = true; + } + // make UnboundTableSink return new UnboundOlapTableSink<>(nameParts, cols, ImmutableList.of(), - partitions, logicalQuery); + partitions, isPartialUpdate, logicalQuery); } public LogicalPlan getLogicalQuery() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index e1469397e0..b1c514f254 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -45,28 +45,33 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< private OlapTable targetTable; private List<Column> cols; private List<Long> partitionIds; + private boolean isPartialUpdate; public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds, - CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, Optional.empty(), Optional.empty(), child); + boolean isPartialUpdate, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), child); } + /** + * constructor + */ public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, - List<Long> partitionIds, Optional<GroupExpression> groupExpression, + List<Long> partitionIds, boolean isPartialUpdate, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); + 
this.isPartialUpdate = isPartialUpdate; this.partitionIds = Utils.copyRequiredList(partitionIds); } @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression, - Optional.of(getLogicalProperties()), children.get(0)); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, + groupExpression, Optional.of(getLogicalProperties()), children.get(0)); } public Database getDatabase() { @@ -85,6 +90,10 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< return partitionIds; } + public boolean isPartialUpdate() { + return isPartialUpdate; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -94,7 +103,8 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< return false; } LogicalOlapTableSink<?> sink = (LogicalOlapTableSink<?>) o; - return Objects.equals(database, sink.database) + return isPartialUpdate == sink.isPartialUpdate() + && Objects.equals(database, sink.database) && Objects.equals(targetTable, sink.targetTable) && Objects.equals(partitionIds, sink.partitionIds) && Objects.equals(cols, sink.cols); @@ -102,7 +112,7 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public int hashCode() { - return Objects.hash(database, targetTable, partitionIds, cols); + return Objects.hash(database, targetTable, partitionIds, cols, isPartialUpdate); } @Override @@ -117,14 +127,14 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, 
isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 915c1eec70..c5dfb2d154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -53,32 +53,42 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnar private final List<Column> cols; private final List<Long> partitionIds; private final boolean singleReplicaLoad; + private final boolean isPartialUpdate; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, + List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate, LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate, + Optional.empty(), logicalProperties, child); + } public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + this(database, targetTable, partitionIds, cols, singleReplicaLoad, false, + Optional.empty(), 
logicalProperties, child); } /** * Constructor */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, - List<Column> cols, boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, - LogicalProperties logicalProperties, CHILD_TYPE child) { + List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate, + Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); this.partitionIds = Utils.copyRequiredList(partitionIds); this.singleReplicaLoad = singleReplicaLoad; + this.isPartialUpdate = isPartialUpdate; } /** * Constructor */ - public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, - List<Column> cols, boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, boolean isPartialUpdate, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, @@ -88,6 +98,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnar this.cols = Utils.copyRequiredList(cols); this.partitionIds = Utils.copyRequiredList(partitionIds); this.singleReplicaLoad = singleReplicaLoad; + this.isPartialUpdate = isPartialUpdate; } public Database getDatabase() { @@ -110,11 +121,15 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends 
PhysicalUnar return singleReplicaLoad; } + public boolean isPartialUpdate() { + return isPartialUpdate; + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, - singleReplicaLoad, groupExpression, getLogicalProperties(), physicalProperties, + singleReplicaLoad, isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -128,6 +143,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnar } PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o; return singleReplicaLoad == that.singleReplicaLoad + && isPartialUpdate == that.isPartialUpdate && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) @@ -136,7 +152,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnar @Override public int hashCode() { - return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad); + return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, isPartialUpdate); } @Override @@ -170,20 +186,20 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnar @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, getLogicalProperties(), child()); + isPartialUpdate, groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, 
logicalProperties.get(), children.get(0)); + isPartialUpdate, groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } /** diff --git a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out new file mode 100644 index 0000000000..0b7c6bf68e --- /dev/null +++ b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out @@ -0,0 +1,26 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 +2 2 +3 3 +4 4 +5 5 + +-- !sql -- +4 4 +5 5 + +-- !sql_skip_delete_predicate -- +4 4 +5 5 + +-- !sql -- +1 1 0 +1 1 1 +2 2 0 +2 2 1 +3 3 0 +3 3 1 +4 4 0 +5 5 0 + diff --git a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy new file mode 100644 index 0000000000..b70bfc2986 --- /dev/null +++ b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('nereids_delete_mow_partial_update') { + sql 'set enable_nereids_planner=true' + sql 'set enable_fallback_to_original_planner=false' + sql "set experimental_enable_nereids_planner=true;" + sql 'set enable_nereids_dml=true' + + def tableName1 = "nereids_delete_mow_partial_update1" + sql "DROP TABLE IF EXISTS ${tableName1};" + + sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( + `uid` BIGINT NULL, + `v1` BIGINT NULL + )UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + def tableName2 = "nereids_delete_mow_partial_update2" + sql "DROP TABLE IF EXISTS ${tableName2};" + + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `uid` BIGINT NULL + ) UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + sql "insert into ${tableName1} values(1,1),(2,2),(3,3),(4,4),(5,5);" + qt_sql "select * from ${tableName1} order by uid;" + sql "insert into ${tableName2} values(1),(2),(3);" + sql "delete from ${tableName1} A using ${tableName2} B where A.uid=B.uid;" + qt_sql "select * from ${tableName1} order by uid;" + // when using partial update insert stmt for delete stmt, it will use delete bitmap or delete sign rather than + // delete predicate to "delete" the rows + sql "set skip_delete_predicate=true;" + qt_sql_skip_delete_predicate "select * from ${tableName1} order by uid;" + + sql
"set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org