This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f55055096bf branch-2.1: [improve](binlog) Allow commit txn without 
waiting txn publish #48961 (#49266)
f55055096bf is described below

commit f55055096bfdea24b2fd15186b6b1ba71c092af0
Author: walter <[email protected]>
AuthorDate: Sat Mar 29 08:59:23 2025 +0800

    branch-2.1: [improve](binlog) Allow commit txn without waiting txn publish 
#48961 (#49266)
    
    cherry pick from #48961
---
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  2 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  2 +-
 .../apache/doris/service/FrontendServiceImpl.java  | 20 ++++++++++----
 .../doris/transaction/GlobalTransactionMgr.java    | 24 +++++++++++++----
 .../org/apache/doris/load/DeleteHandlerTest.java   |  3 ++-
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  3 ++-
 .../transaction/DatabaseTransactionMgrTest.java    |  3 ++-
 .../transaction/GlobalTransactionMgrTest.java      | 31 ++++++++++++++--------
 gensrc/thrift/FrontendService.thrift               |  2 ++
 9 files changed, 64 insertions(+), 26 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 3b8428a4351..1b76ce42bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -321,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                     .add("txn_id", transactionId)
                     .add("msg", "Load job try to commit txn")
                     .build());
-            Env.getCurrentGlobalTransactionMgr().commitTransaction(
+            Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
                     dbId, tableList, transactionId, commitInfos,
                     new LoadJobFinalOperation(id, loadingStatus, progress, 
loadStartTimestamp,
                             finishTimestamp, state, failMsg));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 9aac4b36557..f7accb126e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -653,7 +653,7 @@ public class SparkLoadJob extends BulkLoadJob {
                 Lists.newArrayList(tableToLoadPartitions.keySet()));
         MetaLockUtils.writeLockTablesOrMetaException(tableList);
         try {
-            Env.getCurrentGlobalTransactionMgr().commitTransaction(
+            Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
                     dbId, tableList, transactionId, commitInfos,
                     new LoadJobFinalOperation(id, loadingStatus, progress, 
loadStartTimestamp,
                                               finishTimestamp, state, 
failMsg));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7c1b6a45b4c..d48418c7f25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1747,11 +1747,21 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() / 2 : 5000;
 
         // Step 5: commit and publish
-        return Env.getCurrentGlobalTransactionMgr()
-                .commitAndPublishTransaction(db, tableList,
-                        request.getTxnId(),
-                        TabletCommitInfo.fromThrift(request.getCommitInfos()), 
timeoutMs,
-                        
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+        if (!request.isOnlyCommit()) {
+            return Env.getCurrentGlobalTransactionMgr()
+                    .commitAndPublishTransaction(db, tableList,
+                            request.getTxnId(),
+                            
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+                            
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+        } else {
+            // single table commit, so don't need to wait for publish.
+            Env.getCurrentGlobalTransactionMgr()
+                    .commitTransaction(db, tableList,
+                            request.getTxnId(),
+                            
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+                            
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+            return true;
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d6daf30d6..3bee819a1c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -226,10 +226,10 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     @Deprecated
-    public void commitTransaction(long dbId, List<Table> tableList,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
-        commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
null);
+        commitTransactionWithoutLock(dbId, tableList, transactionId, 
tabletCommitInfos, null);
     }
 
     /**
@@ -241,7 +241,7 @@ public class GlobalTransactionMgr implements Writable {
      * @note it is necessary to optimize the `lock` mechanism and `lock` scope 
resulting from wait lock long time
      * @note callers should get all tables' write locks before call this api
      */
-    public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         if (Config.disable_load_job) {
@@ -255,6 +255,20 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment, false);
     }
 
+    public void commitTransaction(DatabaseIf db, List<Table> tableList, long 
transactionId,
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, 
TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 
timeoutMillis, TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=("
+                    + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
+    }
+
     private void commitTransaction2PC(long dbId, long transactionId)
             throws UserException {
         if (Config.disable_load_job) {
@@ -282,7 +296,7 @@ public class GlobalTransactionMgr implements Writable {
                     + StringUtils.join(tableList, ",") + ")");
         }
         try {
-            commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
         }
@@ -313,7 +327,7 @@ public class GlobalTransactionMgr implements Writable {
         }
         stopWatch.stop();
         LOG.info("stream load tasks are committed successfully. txns: {}. time 
cost: {} ms."
-                + " data will be visable later.", transactionId, 
stopWatch.getTime());
+                + " data will be visible later.", transactionId, 
stopWatch.getTime());
     }
 
     public void abortTransaction(long dbId, long transactionId, String reason) 
throws UserException {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2..bf1b82019eb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public class DeleteHandlerTest {
         new Expectations(globalTransactionMgr) {
             {
                 try {
-                    globalTransactionMgr.commitTransaction(anyLong, 
(List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) 
any);
+                    globalTransactionMgr.commitTransactionWithoutLock(
+                            anyLong, (List<Table>) any, anyLong, 
(List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
                 } catch (UserException e) {
                     // CHECKSTYLE IGNORE THIS LINE
                 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d..f1e9942c7e3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public class SparkLoadJobTest {
                 AgentTaskExecutor.submit((AgentBatchTask) any);
                 Env.getCurrentGlobalTransactionMgr();
                 result = transactionMgr;
-                transactionMgr.commitTransaction(dbId, (List<Table>) any, 
transactionId, (List<TabletCommitInfo>) any,
+                transactionMgr.commitTransactionWithoutLock(
+                        dbId, (List<Table>) any, transactionId, 
(List<TabletCommitInfo>) any,
                         (LoadJobFinalOperation) any);
             }
         };
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 5f182ef50b7..76ae778153d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId1,
                 transTablets, null);
         TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
         setTransactionFinishPublish(transactionState1,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 7910df48be2..2779a5d5107 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -172,8 +172,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                transTablets, null);
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId, transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId,
                 transTablets, null);
 
         // follower catalog replay the transaction
@@ -235,7 +236,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2,
                     transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
@@ -265,7 +267,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2,
                 transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
@@ -366,7 +369,8 @@ public class GlobalTransactionMgrTest {
         
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
 "idToRunningTransactionState", idToTransactionState);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 
1L, transTablets, txnCommitAttachment);
+        masterTransMgr.commitTransactionWithoutLock(
+                1L, Lists.newArrayList(testTable1), 1L, transTablets, 
txnCommitAttachment);
         RoutineLoadStatistic jobStatistic =  
Deencapsulation.getField(routineLoadJob, "jobStatistic");
 
         Assert.assertEquals(Long.valueOf(101), 
Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -439,7 +443,8 @@ public class GlobalTransactionMgrTest {
         
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
 "idToRunningTransactionState", idToTransactionState);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 
1L, transTablets, txnCommitAttachment);
+        masterTransMgr.commitTransactionWithoutLock(
+                1L, Lists.newArrayList(testTable1), 1L, transTablets, 
txnCommitAttachment);
 
         // current total rows and error rows will be reset after job pause, so 
here they should be 0.
         RoutineLoadStatistic jobStatistic =  
Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -471,7 +476,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId,
                 transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -537,7 +543,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId,
                 transTablets, null);
 
         // follower catalog replay the transaction
@@ -607,7 +614,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo3);
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2,
                     transTablets, null);
             Assert.fail();
         } catch (TabletQuorumFailedException e) {
@@ -624,7 +632,8 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo1);
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2,
                 transTablets, null);
         transactionState = fakeEditLog.getTransaction(transactionId2);
         // check status is commit
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 3018924064b..210f1ada0a9 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -824,6 +824,8 @@ struct TCommitTxnRequest {
     10: optional i64 thrift_rpc_timeout_ms
     11: optional string token
     12: optional i64 db_id
+    // used for ccr
+    15: optional bool only_commit   // only commit txn, without waiting txn 
publish
 }
 
 struct TCommitTxnResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to