This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 639d20e8ea2 [fix](cloud) fix routine load job stuck if commit transaction failed (#40539) 639d20e8ea2 is described below commit 639d20e8ea2fe0fe40fb077b3e0393357de2fcc8 Author: hui lai <1353307...@qq.com> AuthorDate: Wed Sep 11 14:43:48 2024 +0800 [fix](cloud) fix routine load job stuck if commit transaction failed (#40539) In the before-commit stage, a write lock is acquired. If the commit transaction then fails, the thread returns directly and the write lock is never released, which causes the job to get stuck. --- .../cloud/transaction/CloudGlobalTransactionMgr.java | 18 +++++++++++++++++- .../apache/doris/load/routineload/RoutineLoadJob.java | 2 +- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index f224d2929a6..e52a4c62957 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -505,7 +505,17 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } final CommitTxnRequest commitTxnRequest = builder.build(); - commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); + try { + commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); + } catch (UserException e) { + // For routine load, it is necessary to release the write lock when commit transaction fails, + // otherwise it will cause the lock added in beforeCommitted to not be released. 
+ if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); + } + throw e; + } } private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId, @@ -1035,6 +1045,12 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { Preconditions.checkNotNull(abortTxnResponse.getStatus()); } catch (RpcException e) { LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e); + // For routine load, it is necessary to release the write lock when abort transaction fails, + // otherwise it will cause the lock added in beforeAborted to not be released. + if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); + } throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8e3ed8c4682..ac4a548c62f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -485,7 +485,7 @@ public abstract class RoutineLoadJob lock.writeLock().lock(); } - protected void writeUnlock() { + public void writeUnlock() { lock.writeLock().unlock(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: 
commits-h...@doris.apache.org