This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 5f2642391d [ISSUE #8877] Refactor lock in ReceiptHandleGroup to make the lock can be properly released when future can not be completed (#8916) 5f2642391d is described below commit 5f2642391dad0ec4043c6984a9b8b038f10f89b9 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Thu Nov 21 17:53:46 2024 +0800 [ISSUE #8877] Refactor lock in ReceiptHandleGroup to make the lock can be properly released when future can not be completed (#8916) --- .../rocketmq/proxy/common/ReceiptHandleGroup.java | 59 +++++++++++++++++----- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java index 6fee38d117..15da628dc3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java @@ -25,14 +25,19 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; public class ReceiptHandleGroup { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); // The messages having the same messageId will be deduplicated based on the parameters of broker, queueId, and 
offset protected final Map<String /* msgID */, Map<HandleKey, HandleData>> receiptHandleMap = new ConcurrentHashMap<>(); @@ -98,6 +103,7 @@ public class ReceiptHandleGroup { public static class HandleData { private final Semaphore semaphore = new Semaphore(1); + private final AtomicLong lastLockTimeMs = new AtomicLong(-1L); private volatile boolean needRemove = false; private volatile MessageReceiptHandle messageReceiptHandle; @@ -105,15 +111,40 @@ public class ReceiptHandleGroup { this.messageReceiptHandle = messageReceiptHandle; } - public boolean lock(long timeoutMs) { + public Long lock(long timeoutMs) { try { - return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + long currentTimeMs = System.currentTimeMillis(); + if (result) { + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } else { + // if the lock is expired, can be acquired again + long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3; + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + synchronized (this) { + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + log.warn("HandleData lock expired, acquire lock success and reset lock time. " + + "MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs); + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } + } + } + } + return null; } catch (InterruptedException e) { - return false; + return null; } } - public void unlock() { + public void unlock(long lockTimeMs) { + // if the lock is expired, we don't need to unlock it + if (System.currentTimeMillis() - lockTimeMs > ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 2) { + log.warn("HandleData lock expired, unlock fail. 
MessageReceiptHandle={}, lockTime={}, now={}", + messageReceiptHandle, lockTimeMs, System.currentTimeMillis()); + return; + } this.semaphore.release(); } @@ -149,7 +180,8 @@ public class ReceiptHandleGroup { if (handleData == null || handleData.needRemove) { return new HandleData(value); } - if (!handleData.lock(timeout)) { + Long lockTimeMs = handleData.lock(timeout); + if (lockTimeMs == null) { throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle failed"); } try { @@ -158,7 +190,7 @@ public class ReceiptHandleGroup { } handleData.messageReceiptHandle = value; } finally { - handleData.unlock(); + handleData.unlock(lockTimeMs); } return handleData; }); @@ -176,7 +208,8 @@ public class ReceiptHandleGroup { long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup(); AtomicReference<MessageReceiptHandle> res = new AtomicReference<>(); handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> { - if (!handleData.lock(timeout)) { + Long lockTimeMs = handleData.lock(timeout); + if (lockTimeMs == null) { throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle failed"); } try { @@ -185,7 +218,7 @@ public class ReceiptHandleGroup { } res.set(handleData.messageReceiptHandle); } finally { - handleData.unlock(); + handleData.unlock(lockTimeMs); } return handleData; }); @@ -200,7 +233,8 @@ public class ReceiptHandleGroup { long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup(); AtomicReference<MessageReceiptHandle> res = new AtomicReference<>(); handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> { - if (!handleData.lock(timeout)) { + Long lockTimeMs = handleData.lock(timeout); + if (lockTimeMs == null) { throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get handle failed"); } try { @@ -210,7 +244,7 @@ public class ReceiptHandleGroup { } return null; } finally { - 
handleData.unlock(); + handleData.unlock(lockTimeMs); } }); removeHandleMapKeyIfNeed(msgID); @@ -240,7 +274,8 @@ public class ReceiptHandleGroup { } long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup(); handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> { - if (!handleData.lock(timeout)) { + Long lockTimeMs = handleData.lock(timeout); + if (lockTimeMs == null) { throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute failed"); } CompletableFuture<MessageReceiptHandle> future = function.apply(handleData.messageReceiptHandle); @@ -255,7 +290,7 @@ public class ReceiptHandleGroup { handleData.messageReceiptHandle = messageReceiptHandle; } } finally { - handleData.unlock(); + handleData.unlock(lockTimeMs); } if (handleData.needRemove) { handleMap.remove(handleKey, handleData);