This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f8acd81a68f [fix](transaction) commit txn check txn status (#40064) 
(#41229)
f8acd81a68f is described below

commit f8acd81a68f4513362df2cfa813b10fa3380391c
Author: meiyi <[email protected]>
AuthorDate: Wed Sep 25 10:57:51 2024 +0800

    [fix](transaction) commit txn check txn status (#40064) (#41229)
    
    pick https://github.com/apache/doris/pull/40064
---
 .../apache/doris/service/FrontendServiceImpl.java  | 16 ++++-----------
 .../doris/transaction/DatabaseTransactionMgr.java  | 18 +++++++----------
 .../doris/transaction/GlobalTransactionMgr.java    |  5 ++++-
 .../transaction/DatabaseTransactionMgrTest.java    |  2 +-
 .../transaction/GlobalTransactionMgrTest.java      | 23 ++++++++++++++--------
 5 files changed, 31 insertions(+), 33 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ff25e8c1934..884065baf30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1730,13 +1730,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
         }
         List<Long> tableIdList = transactionState.getTableIdList();
-        List<Table> tableList = new ArrayList<>();
-        List<String> tables = new ArrayList<>();
         // if table was dropped, transaction must be aborted
-        tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
-        for (Table table : tableList) {
-            tables.add(table.getName());
-        }
+        List<Table>  tableList = 
db.getTablesOnIdOrderOrThrowException(tableIdList);
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
@@ -1744,6 +1739,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
+            List<String> tables = 
tableList.stream().map(Table::getName).collect(Collectors.toList());
             checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(), tables,
                     request.getUserIp(), PrivPredicate.LOAD);
         }
@@ -1912,12 +1908,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
         }
         List<Long> tableIdList = transactionState.getTableIdList();
-        List<Table> tableList = new ArrayList<>();
-        List<String> tables = new ArrayList<>();
-        tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
-        for (Table table : tableList) {
-            tables.add(table.getName());
-        }
+        List<Table> tableList = 
db.getTablesOnIdOrderOrThrowException(tableIdList);
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
@@ -1925,6 +1916,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
+            List<String> tables = 
tableList.stream().map(Table::getName).collect(Collectors.toList());
             checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(), tables,
                     request.getUserIp(), PrivPredicate.LOAD);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 3bc676a1b42..2852c1de893 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -84,7 +84,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -119,14 +118,6 @@ public class DatabaseTransactionMgr {
     // transactionId -> running TransactionState
     private final Map<Long, TransactionState> idToRunningTransactionState = 
Maps.newHashMap();
 
-    /**
-     * the multi table ids that are in transaction, used to check whether a 
table is in transaction
-     * multi table transaction state
-     * txnId -> tableId list
-     */
-    private final ConcurrentHashMap<Long, List<Long>> 
multiTableRunningTransactionTableIdMaps =
-            new ConcurrentHashMap<>();
-
     // transactionId -> final status TransactionState
     private final Map<Long, TransactionState> idToFinalStatusTransactionState 
= Maps.newHashMap();
 
@@ -436,8 +427,13 @@ public class DatabaseTransactionMgr {
         checkCommitStatus(tableList, transactionState, tabletCommitInfos, 
txnCommitAttachment, errorReplicaIds,
                           tableToPartition, totalInvolvedBackends);
 
-        unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, 
tableToPartition,
-                totalInvolvedBackends, db);
+        writeLock();
+        try {
+            unprotectedPreCommitTransaction2PC(transactionState, 
errorReplicaIds, tableToPartition,
+                    totalInvolvedBackends, db);
+        } finally {
+            writeUnlock();
+        }
         LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 35c2195eb33..614843dcbbd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -207,7 +207,7 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
-    public void preCommitTransaction2PC(long dbId, List<Table> tableList, long 
transactionId,
+    private void preCommitTransaction2PC(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         if (Config.disable_load_job) {
@@ -219,6 +219,7 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
     }
 
+    @Deprecated
     public void commitTransaction(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
@@ -675,6 +676,7 @@ public class GlobalTransactionMgr implements Writable {
                 TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
                 long coordStartTime = 
transactionState.getCoordinator().startTime;
                 if (coordStartTime < beStartTime) {
+                    // does not hold table write lock
                     dbTransactionMgr.abortTransaction(txnInfo.second, 
"coordinate BE restart", null);
                 }
             } catch (UserException e) {
@@ -692,6 +694,7 @@ public class GlobalTransactionMgr implements Writable {
                 = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, limit);
         for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
             try {
+                // does not hold table write lock
                 DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
                 dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate 
BE is down", null);
             } catch (UserException e) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ea63a5e18b1..9a1a934e0d5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -110,7 +110,7 @@ public class DatabaseTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1, transTablets, null);
         TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
         setTransactionFinishPublish(transactionState1,
                 Lists.newArrayList(CatalogTestUtil.testBackendId1,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index cc00237a4c4..09421d0849c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -170,7 +170,7 @@ public class GlobalTransactionMgrTest {
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
         masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                transTablets);
+                transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -210,7 +210,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
 
         // follower catalog replay the transaction
         transactionState = fakeEditLog.getTransaction(transactionId);
@@ -231,7 +232,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                    transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
             transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -260,7 +262,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -465,7 +468,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         slaveTransMgr.replayUpsertTransactionState(transactionState);
@@ -519,7 +523,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
 
         // follower catalog replay the transaction
         transactionState = fakeEditLog.getTransaction(transactionId);
@@ -582,7 +587,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                    transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
             transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -598,7 +604,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to