This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 386c55f00aa [fix](transaction) commit txn check txn status (#40064) (#41227)
386c55f00aa is described below

commit 386c55f00aa9cd2f178eefb1d41f6e082e0715cd
Author: meiyi <[email protected]>
AuthorDate: Wed Sep 25 10:56:10 2024 +0800

    [fix](transaction) commit txn check txn status (#40064) (#41227)
    
    pick https://github.com/apache/doris/pull/40064/
---
 .../apache/doris/service/FrontendServiceImpl.java  | 16 ++++-----------
 .../doris/transaction/DatabaseTransactionMgr.java  | 18 +++++++----------
 .../doris/transaction/GlobalTransactionMgr.java    |  5 ++++-
 .../transaction/DatabaseTransactionMgrTest.java    |  3 ++-
 .../transaction/GlobalTransactionMgrTest.java      | 23 ++++++++++++++--------
 5 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index be05c023166..29e17934903 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1712,13 +1712,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
         }
         List<Long> tableIdList = transactionState.getTableIdList();
-        List<Table> tableList = new ArrayList<>();
-        List<String> tables = new ArrayList<>();
         // if table was dropped, transaction must be aborted
-        tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
-        for (Table table : tableList) {
-            tables.add(table.getName());
-        }
+        List<Table> tableList = 
db.getTablesOnIdOrderOrThrowException(tableIdList);
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
@@ -1726,6 +1721,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
+            List<String> tables = 
tableList.stream().map(Table::getName).collect(Collectors.toList());
             checkPasswordAndPrivs(request.getUser(), request.getPasswd(), 
request.getDb(), tables,
                     request.getUserIp(), PrivPredicate.LOAD);
         }
@@ -1901,12 +1897,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
         }
         List<Long> tableIdList = transactionState.getTableIdList();
-        List<Table> tableList = new ArrayList<>();
-        List<String> tables = new ArrayList<>();
-        tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
-        for (Table table : tableList) {
-            tables.add(table.getName());
-        }
+        List<Table> tableList = 
db.getTablesOnIdOrderOrThrowException(tableIdList);
 
         // Step 3: check auth
         if (request.isSetAuthCode()) {
@@ -1914,6 +1905,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
+            List<String> tables = 
tableList.stream().map(Table::getName).collect(Collectors.toList());
             checkPasswordAndPrivs(request.getUser(), request.getPasswd(), 
request.getDb(), tables,
                     request.getUserIp(), PrivPredicate.LOAD);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index e7794769754..61721e7d892 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -92,7 +92,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -126,14 +125,6 @@ public class DatabaseTransactionMgr {
     // transactionId -> running TransactionState
     private final Map<Long, TransactionState> idToRunningTransactionState = 
Maps.newHashMap();
 
-    /**
-     * the multi table ids that are in transaction, used to check whether a 
table is in transaction
-     * multi table transaction state
-     * txnId -> tableId list
-     */
-    private final ConcurrentHashMap<Long, List<Long>> 
multiTableRunningTransactionTableIdMaps =
-            new ConcurrentHashMap<>();
-
     // transactionId -> final status TransactionState
     private final Map<Long, TransactionState> idToFinalStatusTransactionState 
= Maps.newHashMap();
 
@@ -472,8 +463,13 @@ public class DatabaseTransactionMgr {
         checkCommitStatus(tableList, transactionState, tabletCommitInfos, 
txnCommitAttachment, errorReplicaIds,
                           tableToPartition, totalInvolvedBackends);
 
-        unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, 
tableToPartition,
-                totalInvolvedBackends, db);
+        writeLock();
+        try {
+            unprotectedPreCommitTransaction2PC(transactionState, 
errorReplicaIds, tableToPartition,
+                    totalInvolvedBackends, db);
+        } finally {
+            writeUnlock();
+        }
         LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 928fa785507..15d6daf30d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
-    public void preCommitTransaction2PC(long dbId, List<Table> tableList, long 
transactionId,
+    private void preCommitTransaction2PC(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         if (Config.disable_load_job) {
@@ -225,6 +225,7 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
     }
 
+    @Deprecated
     public void commitTransaction(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
@@ -692,6 +693,7 @@ public class GlobalTransactionMgr implements Writable {
                 TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
                 long coordStartTime = 
transactionState.getCoordinator().startTime;
                 if (coordStartTime < beStartTime) {
+                    // does not hold table write lock
                     dbTransactionMgr.abortTransaction(txnInfo.second, 
"coordinate BE restart", null);
                 }
             } catch (UserException e) {
@@ -709,6 +711,7 @@ public class GlobalTransactionMgr implements Writable {
                 = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, limit);
         for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
             try {
+                // does not hold table write lock
                 DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
                 dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate 
BE is down", null);
             } catch (UserException e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 437f1bcb209..5f182ef50b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1,
+                transTablets, null);
         TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
         setTransactionFinishPublish(transactionState1,
                 Lists.newArrayList(CatalogTestUtil.testBackendId1,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 5c8f72723ad..8ae95f35f2a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -173,7 +173,7 @@ public class GlobalTransactionMgrTest {
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
         masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                transTablets);
+                transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
 
         // follower catalog replay the transaction
         transactionState = fakeEditLog.getTransaction(transactionId);
@@ -234,7 +235,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                    transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
             transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -263,7 +265,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -468,7 +471,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         slaveTransMgr.replayUpsertTransactionState(transactionState);
@@ -533,7 +537,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+                transTablets, null);
 
         // follower catalog replay the transaction
         transactionState = fakeEditLog.getTransaction(transactionId);
@@ -602,7 +607,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                    transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
             transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -618,7 +624,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets);
+        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+                transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to