This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 01490139ef6 [improve](txn insert) txn insert support delete command (#33100)
01490139ef6 is described below

commit 01490139ef6490778129fe1892f8c3cfbc159074
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Apr 4 21:04:12 2024 +0800

    [improve](txn insert) txn insert support delete command (#33100)
---
 .../main/java/org/apache/doris/load/DeleteJob.java |  19 ++--
 .../java/org/apache/doris/load/TxnDeleteJob.java   |  67 ++++++++++++++
 .../plans/commands/DeleteFromUsingCommand.java     |   1 +
 .../trees/plans/commands/UpdateCommand.java        |   1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   8 +-
 regression-test/data/insert_p0/txn_insert.out      |  19 ++++
 regression-test/suites/insert_p0/txn_insert.groovy | 100 +++++++++++++++++++++
 7 files changed, 207 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 0e515ab9e52..e62e5dd9f71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -97,8 +97,8 @@ public class DeleteJob extends AbstractTxnStateChangeCallback 
implements DeleteJ
 
     // jobId(listenerId). use in beginTransaction to callback function
     private final long id;
-    private long transactionId;
-    private final String label;
+    protected long transactionId;
+    protected String label;
     private final Set<Long> totalTablets;
     private final Set<Long> quorumTablets;
     private final Set<Long> finishedTablets;
@@ -110,7 +110,7 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
 
     private Database targetDb;
 
-    private OlapTable targetTbl;
+    protected OlapTable targetTbl;
 
     private List<Partition> partitions;
 
@@ -413,8 +413,7 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
         }
     }
 
-    @Override
-    public String commit() throws Exception {
+    protected List<TabletCommitInfo> generateTabletCommitInfos() {
         TabletInvertedIndex currentInvertedIndex = 
Env.getCurrentInvertedIndex();
         List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
         tabletDeleteInfoMap.forEach((tabletId, deleteInfo) -> 
deleteInfo.getFinishedReplicas()
@@ -425,6 +424,12 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
                     }
                     tabletCommitInfos.add(new TabletCommitInfo(tabletId, 
replica.getBackendId()));
                 }));
+        return tabletCommitInfos;
+    }
+
+    @Override
+    public String commit() throws Exception {
+        List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
         boolean visible = Env.getCurrentGlobalTransactionMgr()
                 .commitAndPublishTransaction(targetDb, 
Lists.newArrayList(targetTbl),
                         transactionId, tabletCommitInfos, getTimeoutMs());
@@ -534,7 +539,9 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
             DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), 
params.getTable().getId(),
                     params.getTable().getName(), 
getDeleteCondString(params.getDeleteConditions()),
                     noPartitionSpecified, partitionIds, partitionNames);
-            DeleteJob deleteJob = new DeleteJob(jobId, -1, label, 
partitionReplicaNum, deleteInfo);
+            DeleteJob deleteJob = ConnectContext.get() != null && 
ConnectContext.get().isTxnModel()
+                    ? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, 
deleteInfo)
+                    : new DeleteJob(jobId, -1, label, partitionReplicaNum, 
deleteInfo);
             long replicaNum = 
partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
             deleteJob.setPartitions(partitions);
             deleteJob.setDeleteConditions(params.getDeleteConditions());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
new file mode 100644
index 00000000000..79a4bdffa01
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/TxnDeleteJob.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TTabletCommitInfo;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TxnDeleteJob extends DeleteJob {
+    private static final Logger LOG = LogManager.getLogger(TxnDeleteJob.class);
+
+    public TxnDeleteJob(long id, long transactionId, String label, Map<Long, 
Short> partitionReplicaNum,
+            DeleteInfo deleteInfo) {
+        super(id, transactionId, label, partitionReplicaNum, deleteInfo);
+    }
+
+    @Override
+    public long beginTxn() throws Exception {
+        TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
+        txnEntry.beginTransaction(targetTbl.getDatabase(), targetTbl);
+        this.transactionId = txnEntry.getTransactionId();
+        this.label = txnEntry.getLabel();
+        return this.transactionId;
+    }
+
+    @Override
+    public String commit() throws Exception {
+        List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
+        TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
+        txnEntry.addCommitInfos(targetTbl,
+                tabletCommitInfos.stream().map(c -> new 
TTabletCommitInfo(c.getTabletId(), c.getBackendId()))
+                        .collect(Collectors.toList()));
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(label).append("', 
'txnId':'").append(transactionId);
+        sb.append("', 
'status':'").append(TransactionStatus.PREPARE.name()).append("'").append("}");
+        return sb.toString();
+    }
+
+    @Override
+    public void cancel(String reason) {
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
index ff70c75558d..cbd0c8ff6e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
@@ -77,6 +77,7 @@ public class DeleteFromUsingCommand extends Command 
implements ForwardWithSync,
                     + " Please check the following session variables: "
                     + String.join(", ", SessionVariable.DEBUG_VARIABLES));
         }
+        // NOTE: delete from using command is executed as insert command, so 
txn insert can support it
         new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty()).run(ctx,
                 executor);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index 76143c0e80f..4cf6b832e12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -94,6 +94,7 @@ public class UpdateCommand extends Command implements 
ForwardWithSync, Explainab
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        // NOTE: update command is executed as insert command, so txn insert 
can support it
         new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty()).run(ctx,
                 executor);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4b0dfd01469..930ebf943cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -138,6 +138,8 @@ import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
+import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand;
 import org.apache.doris.nereids.trees.plans.commands.Forward;
 import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@@ -653,8 +655,10 @@ public class StmtExecutor {
         // when we in transaction mode, we only support insert into command 
and transaction command
         if (context.isTxnModel()) {
             if (!(logicalPlan instanceof BatchInsertIntoTableCommand || 
logicalPlan instanceof InsertIntoTableCommand
-                    || logicalPlan instanceof UpdateCommand)) {
-                String errMsg = "This is in a transaction, only insert, 
update, commit, rollback is acceptable.";
+                    || logicalPlan instanceof UpdateCommand || logicalPlan 
instanceof DeleteFromUsingCommand
+                    || logicalPlan instanceof DeleteFromCommand)) {
+                String errMsg = "This is in a transaction, only insert, 
update, delete, "
+                        + "commit, rollback is acceptable.";
                 throw new NereidsException(errMsg, new 
AnalysisException(errMsg));
             }
         }
diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out
index d317b2d8452..299b136f344 100644
--- a/regression-test/data/insert_p0/txn_insert.out
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -304,3 +304,22 @@
 -- !select26 --
 1      a       100
 
+-- !select27 --
+1      2000-01-01      1       1       1.0
+3      2000-01-03      3       3       3.0
+
+-- !select28 --
+2      2000-01-20      20      20      20.0
+3      2000-01-30      30      30      30.0
+4      2000-01-04      4       4       4.0
+6      2000-01-10      10      10      10.0
+
+-- !select29 --
+3      2000-01-03      3       3       3.0
+
+-- !select30 --
+1      2000-01-01      1       1       1.0
+2      2000-01-02      2       2       2.0
+3      2000-01-03      3       3       3.0
+6      2000-01-10      10      10      10.0
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy
index a8f173e62bc..7d4f1e9f329 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -239,8 +239,108 @@ suite("txn_insert") {
             sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
             sql """ update ${ut_table}_1 set score = 101 where id = 1; """
             sql """ commit; """
+            sql "sync"
             order_qt_select25 """select * from ${ut_table}_1 """
             order_qt_select26 """select * from ${ut_table}_2 """
         }
+
+        // 8. delete from using and delete from stmt
+        if (use_nereids_planner) {
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt2", 
"txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
+                sql """ drop table if exists ${ta} """
+            }
+
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt4", 
"txn_insert_dt5"]) {
+                sql """
+                    create table ${ta} (
+                        id int,
+                        dt date,
+                        c1 bigint,
+                        c2 string,
+                        c3 double
+                    ) unique key (id, dt)
+                    partition by range(dt) (
+                        from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
+                    )
+                    distributed by hash(id)
+                    properties(
+                        'replication_num'='1',
+                        "enable_unique_key_merge_on_write" = "true"
+                    );
+                """
+                sql """
+                    INSERT INTO ${ta} VALUES
+                        (1, '2000-01-01', 1, '1', 1.0),
+                        (2, '2000-01-02', 2, '2', 2.0),
+                        (3, '2000-01-03', 3, '3', 3.0);
+                """
+            }
+
+            sql """
+                create table txn_insert_dt2 (
+                    id int,
+                    dt date,
+                    c1 bigint,
+                    c2 string,
+                    c3 double
+                ) unique key (id)
+                distributed by hash(id)
+                properties(
+                    'replication_num'='1'
+                );
+            """
+            sql """
+                create table txn_insert_dt3 (
+                    id int
+                ) distributed by hash(id)
+                properties(
+                    'replication_num'='1'
+                );
+            """
+            sql """
+                INSERT INTO txn_insert_dt2 VALUES
+                    (1, '2000-01-10', 10, '10', 10.0),
+                    (2, '2000-01-20', 20, '20', 20.0),
+                    (3, '2000-01-30', 30, '30', 30.0),
+                    (4, '2000-01-04', 4, '4', 4.0),
+                    (5, '2000-01-05', 5, '5', 5.0);
+            """
+            sql """
+                INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
+            """
+            sql """ begin """
+            test {
+                sql '''
+                    delete from txn_insert_dt1 temporary partition (p_20000102)
+                    using txn_insert_dt2 join txn_insert_dt3 on 
txn_insert_dt2.id = txn_insert_dt3.id
+                    where txn_insert_dt1.id = txn_insert_dt2.id;
+                '''
+                exception 'Partition: p_20000102 is not exists'
+            }
+            sql """
+                delete from txn_insert_dt1 partition (p_20000102)
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id 
= txn_insert_dt3.id
+                where txn_insert_dt1.id = txn_insert_dt2.id;
+            """
+            sql """
+                delete from txn_insert_dt4
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id 
= txn_insert_dt3.id
+                where txn_insert_dt4.id = txn_insert_dt2.id;
+            """
+            sql """
+                delete from txn_insert_dt2 where id = 1 or id = 5;
+            """
+            sql """
+                delete from txn_insert_dt5 partition(p_20000102) where id = 1 
or id = 5;
+            """
+            sql """ commit """
+            sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
+            sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
+            sql "sync"
+            order_qt_select27 """select * from txn_insert_dt1 """
+            order_qt_select28 """select * from txn_insert_dt2 """
+            order_qt_select29 """select * from txn_insert_dt4 """
+            order_qt_select30 """select * from txn_insert_dt5 """
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to