This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f8acd81a68f [fix](transaction) commit txn check txn status (#40064)
(#41229)
f8acd81a68f is described below
commit f8acd81a68f4513362df2cfa813b10fa3380391c
Author: meiyi <[email protected]>
AuthorDate: Wed Sep 25 10:57:51 2024 +0800
[fix](transaction) commit txn check txn status (#40064) (#41229)
pick https://github.com/apache/doris/pull/40064
---
.../apache/doris/service/FrontendServiceImpl.java | 16 ++++-----------
.../doris/transaction/DatabaseTransactionMgr.java | 18 +++++++----------
.../doris/transaction/GlobalTransactionMgr.java | 5 ++++-
.../transaction/DatabaseTransactionMgrTest.java | 2 +-
.../transaction/GlobalTransactionMgrTest.java | 23 ++++++++++++++--------
5 files changed, 31 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ff25e8c1934..884065baf30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1730,13 +1730,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new UserException("transaction [" + request.getTxnId() + "]
not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
- List<Table> tableList = new ArrayList<>();
- List<String> tables = new ArrayList<>();
// if table was dropped, transaction must be aborted
- tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
- for (Table table : tableList) {
- tables.add(table.getName());
- }
+ List<Table> tableList =
db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
@@ -1744,6 +1739,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
+ List<String> tables =
tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(cluster, request.getUser(),
request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
@@ -1912,12 +1908,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new UserException("transaction [" + request.getTxnId() + "]
not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
- List<Table> tableList = new ArrayList<>();
- List<String> tables = new ArrayList<>();
- tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
- for (Table table : tableList) {
- tables.add(table.getName());
- }
+ List<Table> tableList =
db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
@@ -1925,6 +1916,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
+ List<String> tables =
tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(cluster, request.getUser(),
request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 3bc676a1b42..2852c1de893 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -84,7 +84,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -119,14 +118,6 @@ public class DatabaseTransactionMgr {
// transactionId -> running TransactionState
private final Map<Long, TransactionState> idToRunningTransactionState =
Maps.newHashMap();
- /**
- * the multi table ids that are in transaction, used to check whether a
table is in transaction
- * multi table transaction state
- * txnId -> tableId list
- */
- private final ConcurrentHashMap<Long, List<Long>>
multiTableRunningTransactionTableIdMaps =
- new ConcurrentHashMap<>();
-
// transactionId -> final status TransactionState
private final Map<Long, TransactionState> idToFinalStatusTransactionState
= Maps.newHashMap();
@@ -436,8 +427,13 @@ public class DatabaseTransactionMgr {
checkCommitStatus(tableList, transactionState, tabletCommitInfos,
txnCommitAttachment, errorReplicaIds,
tableToPartition, totalInvolvedBackends);
- unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds,
tableToPartition,
- totalInvolvedBackends, db);
+ writeLock();
+ try {
+ unprotectedPreCommitTransaction2PC(transactionState,
errorReplicaIds, tableToPartition,
+ totalInvolvedBackends, db);
+ } finally {
+ writeUnlock();
+ }
LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 35c2195eb33..614843dcbbd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -207,7 +207,7 @@ public class GlobalTransactionMgr implements Writable {
}
}
- public void preCommitTransaction2PC(long dbId, List<Table> tableList, long
transactionId,
+ private void preCommitTransaction2PC(long dbId, List<Table> tableList,
long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -219,6 +219,7 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
}
+ @Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
@@ -675,6 +676,7 @@ public class GlobalTransactionMgr implements Writable {
TransactionState transactionState =
dbTransactionMgr.getTransactionState(txnInfo.second);
long coordStartTime =
transactionState.getCoordinator().startTime;
if (coordStartTime < beStartTime) {
+ // does not hold table write lock
dbTransactionMgr.abortTransaction(txnInfo.second,
"coordinate BE restart", null);
}
} catch (UserException e) {
@@ -692,6 +694,7 @@ public class GlobalTransactionMgr implements Writable {
= getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
+ // does not hold table write lock
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(txnInfo.first);
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate
BE is down", null);
} catch (UserException e) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ea63a5e18b1..9a1a934e0d5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -110,7 +110,7 @@ public class DatabaseTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId1, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId1, transTablets, null);
TransactionState transactionState1 =
fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index cc00237a4c4..09421d0849c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -170,7 +170,7 @@ public class GlobalTransactionMgrTest {
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets);
+ transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -210,7 +210,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ transTablets, null);
// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
@@ -231,7 +232,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -260,7 +262,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -465,7 +468,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
slaveTransMgr.replayUpsertTransactionState(transactionState);
@@ -519,7 +523,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ transTablets, null);
// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
@@ -582,7 +587,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -598,7 +604,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets);
+ masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]