This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5f2642391d [ISSUE #8877] Refactor lock in ReceiptHandleGroup  to make the lock can be properly released when future can not be completed (#8916)
5f2642391d is described below

commit 5f2642391dad0ec4043c6984a9b8b038f10f89b9
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Thu Nov 21 17:53:46 2024 +0800

    [ISSUE #8877] Refactor lock in ReceiptHandleGroup  to make the lock can be properly released when future can not be completed (#8916)
---
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  | 59 +++++++++++++++++-----
 1 file changed, 47 insertions(+), 12 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 6fee38d117..15da628dc3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -25,14 +25,19 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 
 public class ReceiptHandleGroup {
+    protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
     // The messages having the same messageId will be deduplicated based on 
the parameters of broker, queueId, and offset
     protected final Map<String /* msgID */, Map<HandleKey, HandleData>> 
receiptHandleMap = new ConcurrentHashMap<>();
@@ -98,6 +103,7 @@ public class ReceiptHandleGroup {
 
     public static class HandleData {
         private final Semaphore semaphore = new Semaphore(1);
+        private final AtomicLong lastLockTimeMs = new AtomicLong(-1L);
         private volatile boolean needRemove = false;
         private volatile MessageReceiptHandle messageReceiptHandle;
 
@@ -105,15 +111,40 @@ public class ReceiptHandleGroup {
             this.messageReceiptHandle = messageReceiptHandle;
         }
 
-        public boolean lock(long timeoutMs) {
+        public Long lock(long timeoutMs) {
             try {
-                return this.semaphore.tryAcquire(timeoutMs, 
TimeUnit.MILLISECONDS);
+                boolean result = this.semaphore.tryAcquire(timeoutMs, 
TimeUnit.MILLISECONDS);
+                long currentTimeMs = System.currentTimeMillis();
+                if (result) {
+                    this.lastLockTimeMs.set(currentTimeMs);
+                    return currentTimeMs;
+                } else {
+                    // if the lock is expired, can be acquired again
+                    long expiredTimeMs = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3;
+                    if (currentTimeMs - this.lastLockTimeMs.get() > 
expiredTimeMs) {
+                        synchronized (this) {
+                            if (currentTimeMs - this.lastLockTimeMs.get() > 
expiredTimeMs) {
+                                log.warn("HandleData lock expired, acquire 
lock success and reset lock time. " +
+                                    "MessageReceiptHandle={}, lockTime={}", 
messageReceiptHandle, currentTimeMs);
+                                this.lastLockTimeMs.set(currentTimeMs);
+                                return currentTimeMs;
+                            }
+                        }
+                    }
+                }
+                return null;
             } catch (InterruptedException e) {
-                return false;
+                return null;
             }
         }
 
-        public void unlock() {
+        public void unlock(long lockTimeMs) {
+            // if the lock is expired, we don't need to unlock it
+            if (System.currentTimeMillis() - lockTimeMs > 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 2) {
+                log.warn("HandleData lock expired, unlock fail. 
MessageReceiptHandle={}, lockTime={}, now={}",
+                    messageReceiptHandle, lockTimeMs, 
System.currentTimeMillis());
+                return;
+            }
             this.semaphore.release();
         }
 
@@ -149,7 +180,8 @@ public class ReceiptHandleGroup {
             if (handleData == null || handleData.needRemove) {
                 return new HandleData(value);
             }
-            if (!handleData.lock(timeout)) {
+            Long lockTimeMs = handleData.lock(timeout);
+            if (lockTimeMs == null) {
                 throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle 
failed");
             }
             try {
@@ -158,7 +190,7 @@ public class ReceiptHandleGroup {
                 }
                 handleData.messageReceiptHandle = value;
             } finally {
-                handleData.unlock();
+                handleData.unlock(lockTimeMs);
             }
             return handleData;
         });
@@ -176,7 +208,8 @@ public class ReceiptHandleGroup {
         long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
         AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
         handleMap.computeIfPresent(new HandleKey(handle), (handleKey, 
handleData) -> {
-            if (!handleData.lock(timeout)) {
+            Long lockTimeMs = handleData.lock(timeout);
+            if (lockTimeMs == null) {
                 throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle 
failed");
             }
             try {
@@ -185,7 +218,7 @@ public class ReceiptHandleGroup {
                 }
                 res.set(handleData.messageReceiptHandle);
             } finally {
-                handleData.unlock();
+                handleData.unlock(lockTimeMs);
             }
             return handleData;
         });
@@ -200,7 +233,8 @@ public class ReceiptHandleGroup {
         long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
         AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
         handleMap.computeIfPresent(new HandleKey(handle), (handleKey, 
handleData) -> {
-            if (!handleData.lock(timeout)) {
+            Long lockTimeMs = handleData.lock(timeout);
+            if (lockTimeMs == null) {
                 throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get 
handle failed");
             }
             try {
@@ -210,7 +244,7 @@ public class ReceiptHandleGroup {
                 }
                 return null;
             } finally {
-                handleData.unlock();
+                handleData.unlock(lockTimeMs);
             }
         });
         removeHandleMapKeyIfNeed(msgID);
@@ -240,7 +274,8 @@ public class ReceiptHandleGroup {
         }
         long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
         handleMap.computeIfPresent(new HandleKey(handle), (handleKey, 
handleData) -> {
-            if (!handleData.lock(timeout)) {
+            Long lockTimeMs = handleData.lock(timeout);
+            if (lockTimeMs == null) {
                 throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute 
failed");
             }
             CompletableFuture<MessageReceiptHandle> future = 
function.apply(handleData.messageReceiptHandle);
@@ -255,7 +290,7 @@ public class ReceiptHandleGroup {
                         handleData.messageReceiptHandle = messageReceiptHandle;
                     }
                 } finally {
-                    handleData.unlock();
+                    handleData.unlock(lockTimeMs);
                 }
                 if (handleData.needRemove) {
                     handleMap.remove(handleKey, handleData);

Reply via email to