This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f55055096bf branch-2.1: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49266)
f55055096bf is described below
commit f55055096bfdea24b2fd15186b6b1ba71c092af0
Author: walter <[email protected]>
AuthorDate: Sat Mar 29 08:59:23 2025 +0800
branch-2.1: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49266)
cherry pick from #48961
---
.../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 2 +-
.../apache/doris/service/FrontendServiceImpl.java | 20 ++++++++++----
.../doris/transaction/GlobalTransactionMgr.java | 24 +++++++++++++----
.../org/apache/doris/load/DeleteHandlerTest.java | 3 ++-
.../apache/doris/load/loadv2/SparkLoadJobTest.java | 3 ++-
.../transaction/DatabaseTransactionMgrTest.java | 3 ++-
.../transaction/GlobalTransactionMgrTest.java | 31 ++++++++++++++--------
gensrc/thrift/FrontendService.thrift | 2 ++
9 files changed, 64 insertions(+), 26 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 3b8428a4351..1b76ce42bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -321,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 9aac4b36557..f7accb126e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -653,7 +653,7 @@ public class SparkLoadJob extends BulkLoadJob {
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp,
finishTimestamp, state,
failMsg));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7c1b6a45b4c..d48418c7f25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1747,11 +1747,21 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ?
request.getThriftRpcTimeoutMs() / 2 : 5000;
// Step 5: commit and publish
- return Env.getCurrentGlobalTransactionMgr()
- .commitAndPublishTransaction(db, tableList,
- request.getTxnId(),
- TabletCommitInfo.fromThrift(request.getCommitInfos()),
timeoutMs,
-
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ if (!request.isOnlyCommit()) {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, tableList,
+ request.getTxnId(),
+
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ } else {
+ // single table commit, so don't need to wait for publish.
+ Env.getCurrentGlobalTransactionMgr()
+ .commitTransaction(db, tableList,
+ request.getTxnId(),
+
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ return true;
+ }
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d6daf30d6..3bee819a1c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -226,10 +226,10 @@ public class GlobalTransactionMgr implements Writable {
}
@Deprecated
- public void commitTransaction(long dbId, List<Table> tableList,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos,
null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId,
tabletCommitInfos, null);
}
/**
@@ -241,7 +241,7 @@ public class GlobalTransactionMgr implements Writable {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope
resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -255,6 +255,20 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.commitTransaction(tableList, transactionId,
tabletCommitInfos, txnCommitAttachment, false);
}
+ public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList,
timeoutMillis, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout,
tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
+
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@@ -282,7 +296,7 @@ public class GlobalTransactionMgr implements Writable {
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -313,7 +327,7 @@ public class GlobalTransactionMgr implements Writable {
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time
cost: {} ms."
- + " data will be visable later.", transactionId,
stopWatch.getTime());
+ + " data will be visible later.", transactionId,
stopWatch.getTime());
}
public void abortTransaction(long dbId, long transactionId, String reason)
throws UserException {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2..bf1b82019eb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public class DeleteHandlerTest {
new Expectations(globalTransactionMgr) {
{
try {
- globalTransactionMgr.commitTransaction(anyLong,
(List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment)
any);
+ globalTransactionMgr.commitTransactionWithoutLock(
+ anyLong, (List<Table>) any, anyLong,
(List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d..f1e9942c7e3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public class SparkLoadJobTest {
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
- transactionMgr.commitTransaction(dbId, (List<Table>) any,
transactionId, (List<TabletCommitInfo>) any,
+ transactionMgr.commitTransactionWithoutLock(
+ dbId, (List<Table>) any, transactionId,
(List<TabletCommitInfo>) any,
(LoadJobFinalOperation) any);
}
};
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 5f182ef50b7..76ae778153d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId1,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId1,
transTablets, null);
TransactionState transactionState1 =
fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 7910df48be2..2779a5d5107 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -172,8 +172,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -235,7 +236,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -265,7 +267,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
@@ -366,7 +369,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
"idToRunningTransactionState", idToTransactionState);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1),
1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets,
txnCommitAttachment);
RoutineLoadStatistic jobStatistic =
Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101),
Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -439,7 +443,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
"idToRunningTransactionState", idToTransactionState);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1),
1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets,
txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so
here they should be 0.
RoutineLoadStatistic jobStatistic =
Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -471,7 +476,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId,
transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -537,7 +543,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -607,7 +614,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -624,7 +632,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 3018924064b..210f1ada0a9 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -824,6 +824,8 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
+ // used for ccr
+ 15: optional bool only_commit // only commit txn, without waiting txn
publish
}
struct TCommitTxnResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]