This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 01490139ef6 [improve](txn insert) txn insert support delete command (#33100) 01490139ef6 is described below commit 01490139ef6490778129fe1892f8c3cfbc159074 Author: meiyi <myime...@gmail.com> AuthorDate: Thu Apr 4 21:04:12 2024 +0800 [improve](txn insert) txn insert support delete command (#33100) --- .../main/java/org/apache/doris/load/DeleteJob.java | 19 ++-- .../java/org/apache/doris/load/TxnDeleteJob.java | 67 ++++++++++++++ .../plans/commands/DeleteFromUsingCommand.java | 1 + .../trees/plans/commands/UpdateCommand.java | 1 + .../java/org/apache/doris/qe/StmtExecutor.java | 8 +- regression-test/data/insert_p0/txn_insert.out | 19 ++++ regression-test/suites/insert_p0/txn_insert.groovy | 100 +++++++++++++++++++++ 7 files changed, 207 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 0e515ab9e52..e62e5dd9f71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -97,8 +97,8 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ // jobId(listenerId). use in beginTransaction to callback function private final long id; - private long transactionId; - private final String label; + protected long transactionId; + protected String label; private final Set<Long> totalTablets; private final Set<Long> quorumTablets; private final Set<Long> finishedTablets; @@ -110,7 +110,7 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ private Database targetDb; - private OlapTable targetTbl; + protected OlapTable targetTbl; private List<Partition> partitions; @@ -413,8 +413,7 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ } } - @Override - public String commit() throws Exception { + protected List<TabletCommitInfo> generateTabletCommitInfos() { TabletInvertedIndex currentInvertedIndex = Env.getCurrentInvertedIndex(); List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList(); tabletDeleteInfoMap.forEach((tabletId, deleteInfo) -> deleteInfo.getFinishedReplicas() @@ -425,6 +424,12 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ } tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId())); })); + return tabletCommitInfos; + } + + @Override + public String commit() throws Exception { + List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos(); boolean visible = Env.getCurrentGlobalTransactionMgr() .commitAndPublishTransaction(targetDb, Lists.newArrayList(targetTbl), transactionId, tabletCommitInfos, getTimeoutMs()); @@ -534,7 +539,9 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), params.getTable().getId(), params.getTable().getName(), getDeleteCondString(params.getDeleteConditions()), noPartitionSpecified, partitionIds, partitionNames); - DeleteJob deleteJob = new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo); + DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel() + ? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo) + : new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo); long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum(); deleteJob.setPartitions(partitions); deleteJob.setDeleteConditions(params.getDeleteConditions()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java new file mode 100644 index 00000000000..79a4bdffa01 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TTabletCommitInfo; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionEntry; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TxnDeleteJob extends DeleteJob { + private static final Logger LOG = LogManager.getLogger(TxnDeleteJob.class); + + public TxnDeleteJob(long id, long transactionId, String label, Map<Long, Short> partitionReplicaNum, + DeleteInfo deleteInfo) { + super(id, transactionId, label, partitionReplicaNum, deleteInfo); + } + + @Override + public long beginTxn() throws Exception { + TransactionEntry txnEntry = ConnectContext.get().getTxnEntry(); + txnEntry.beginTransaction(targetTbl.getDatabase(), targetTbl); + this.transactionId = txnEntry.getTransactionId(); + this.label = txnEntry.getLabel(); + return this.transactionId; + } + + @Override + public String commit() throws Exception { + List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos(); + TransactionEntry txnEntry = ConnectContext.get().getTxnEntry(); + txnEntry.addCommitInfos(targetTbl, + tabletCommitInfos.stream().map(c -> new TTabletCommitInfo(c.getTabletId(), c.getBackendId())) + .collect(Collectors.toList())); + + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(label).append("', 'txnId':'").append(transactionId); + sb.append("', 'status':'").append(TransactionStatus.PREPARE.name()).append("'").append("}"); + return sb.toString(); + } + + @Override + public void cancel(String reason) { + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index ff70c75558d..cbd0c8ff6e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -77,6 +77,7 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync, + " Please check the following session variables: " + String.join(", ", SessionVariable.DEBUG_VARIABLES)); } + // NOTE: delete from using command is executed as insert command, so txn insert can support it new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, executor); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 76143c0e80f..4cf6b832e12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -94,6 +94,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + // NOTE: update command is executed as insert command, so txn insert can support it new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, executor); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4b0dfd01469..930ebf943cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -138,6 +138,8 @@ import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand; import org.apache.doris.nereids.trees.plans.commands.Forward; import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback; import org.apache.doris.nereids.trees.plans.commands.UpdateCommand; @@ -653,8 +655,10 @@ public class StmtExecutor { // when we in transaction mode, we only support insert into command and transaction command if (context.isTxnModel()) { if (!(logicalPlan instanceof BatchInsertIntoTableCommand || logicalPlan instanceof InsertIntoTableCommand - || logicalPlan instanceof UpdateCommand)) { - String errMsg = "This is in a transaction, only insert, update, commit, rollback is acceptable."; + || logicalPlan instanceof UpdateCommand || logicalPlan instanceof DeleteFromUsingCommand + || logicalPlan instanceof DeleteFromCommand)) { + String errMsg = "This is in a transaction, only insert, update, delete, " + + "commit, rollback is acceptable."; throw new NereidsException(errMsg, new AnalysisException(errMsg)); } } diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out index d317b2d8452..299b136f344 100644 --- a/regression-test/data/insert_p0/txn_insert.out +++ b/regression-test/data/insert_p0/txn_insert.out @@ -304,3 +304,22 @@ -- !select26 -- 1 a 100 +-- !select27 -- +1 2000-01-01 1 1 1.0 +3 2000-01-03 3 3 3.0 + +-- !select28 -- +2 2000-01-20 20 20 20.0 +3 2000-01-30 30 30 30.0 +4 2000-01-04 4 4 4.0 +6 2000-01-10 10 10 10.0 + +-- !select29 -- +3 2000-01-03 3 3 3.0 + +-- !select30 -- +1 2000-01-01 1 1 1.0 +2 2000-01-02 2 2 2.0 +3 2000-01-03 3 3 3.0 +6 2000-01-10 10 10 10.0 + diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy index a8f173e62bc..7d4f1e9f329 100644 --- a/regression-test/suites/insert_p0/txn_insert.groovy +++ b/regression-test/suites/insert_p0/txn_insert.groovy @@ -239,8 +239,108 @@ suite("txn_insert") { sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """ sql """ update ${ut_table}_1 set score = 101 where id = 1; """ sql """ commit; """ + sql "sync" order_qt_select25 """select * from ${ut_table}_1 """ order_qt_select26 """select * from ${ut_table}_2 """ } + + // 8. delete from using and delete from stmt + if (use_nereids_planner) { + for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) { + sql """ drop table if exists ${ta} """ + } + + for (def ta in ["txn_insert_dt1", "txn_insert_dt4", "txn_insert_dt5"]) { + sql """ + create table ${ta} ( + id int, + dt date, + c1 bigint, + c2 string, + c3 double + ) unique key (id, dt) + partition by range(dt) ( + from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY + ) + distributed by hash(id) + properties( + 'replication_num'='1', + "enable_unique_key_merge_on_write" = "true" + ); + """ + sql """ + INSERT INTO ${ta} VALUES + (1, '2000-01-01', 1, '1', 1.0), + (2, '2000-01-02', 2, '2', 2.0), + (3, '2000-01-03', 3, '3', 3.0); + """ + } + + sql """ + create table txn_insert_dt2 ( + id int, + dt date, + c1 bigint, + c2 string, + c3 double + ) unique key (id) + distributed by hash(id) + properties( + 'replication_num'='1' + ); + """ + sql """ + create table txn_insert_dt3 ( + id int + ) distributed by hash(id) + properties( + 'replication_num'='1' + ); + """ + sql """ + INSERT INTO txn_insert_dt2 VALUES + (1, '2000-01-10', 10, '10', 10.0), + (2, '2000-01-20', 20, '20', 20.0), + (3, '2000-01-30', 30, '30', 30.0), + (4, '2000-01-04', 4, '4', 4.0), + (5, '2000-01-05', 5, '5', 5.0); + """ + sql """ + INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5); + """ + sql """ begin """ + test { + sql ''' + delete from txn_insert_dt1 temporary partition (p_20000102) + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt1.id = txn_insert_dt2.id; + ''' + exception 'Partition: p_20000102 is not exists' + } + sql """ + delete from txn_insert_dt1 partition (p_20000102) + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt1.id = txn_insert_dt2.id; + """ + sql """ + delete from txn_insert_dt4 + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt4.id = txn_insert_dt2.id; + """ + sql """ + delete from txn_insert_dt2 where id = 1 or id = 5; + """ + sql """ + delete from txn_insert_dt5 partition(p_20000102) where id = 1 or id = 5; + """ + sql """ commit """ + sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """ + sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """ + sql "sync" + order_qt_select27 """select * from txn_insert_dt1 """ + order_qt_select28 """select * from txn_insert_dt2 """ + order_qt_select29 """select * from txn_insert_dt4 """ + order_qt_select30 """select * from txn_insert_dt5 """ + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org