This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5c34a59cd9 [ISSUE #9184] Optimize QueueLockManager#tryLock method 
(#9185)
5c34a59cd9 is described below

commit 5c34a59cd90074a544525de6609f8ef400dfe2b3
Author: mxsm <m...@apache.org>
AuthorDate: Fri Mar 7 13:46:52 2025 +0800

    [ISSUE #9184] Optimize QueueLockManager#tryLock method (#9185)
---
 .../broker/processor/PopMessageProcessor.java      | 40 ++++++++--------------
 1 file changed, 14 insertions(+), 26 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 9355af319e..b84afe2194 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.CommandCallback;
@@ -150,11 +151,11 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
 
     public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
         return batchAckMsg.getTopic()
-                + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
-                + PopAckConstants.SPLIT + 
batchAckMsg.getAckOffsetList().toString()
-                + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
-                + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
-                + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
+            + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+            + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+            + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+            + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+            + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
     }
 
     public static String genCkUniqueId(PopCheckPoint ck) {
@@ -861,7 +862,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
 
     private boolean isPopShouldStop(String topic, String group, int queueId) {
         return 
brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
-                
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
 group, queueId) > 
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
+            
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
 group, queueId) > 
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
     }
 
     private long getPopOffset(String topic, String group, int queueId, int 
initMode, boolean init, String lockKey,
@@ -908,7 +909,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
         if (init) { // whichever initMode
             this.brokerController.getConsumerOffsetManager().commitOffset(
-                    "getPopOffset", group, topic, queueId, offset);
+                "getPopOffset", group, topic, queueId, offset);
         }
         return offset;
     }
@@ -1002,12 +1003,13 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         private volatile long lockTime;
 
         public TimedLock() {
-            this.lock = new AtomicBoolean(true);
+            // init lock status, false means not locked
+            this.lock = new AtomicBoolean(false);
             this.lockTime = System.currentTimeMillis();
         }
 
         public boolean tryLock() {
-            boolean ret = lock.compareAndSet(true, false);
+            boolean ret = lock.compareAndSet(false, true);
             if (ret) {
                 this.lockTime = System.currentTimeMillis();
                 return true;
@@ -1017,11 +1019,11 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
 
         public void unLock() {
-            lock.set(true);
+            lock.set(false);
         }
 
         public boolean isLock() {
-            return !lock.get();
+            return lock.get();
         }
 
         public long getLockTime() {
@@ -1041,21 +1043,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
 
         public boolean tryLock(String key) {
-            TimedLock timedLock = expiredLocalCache.get(key);
-
-            if (timedLock == null) {
-                TimedLock old = expiredLocalCache.putIfAbsent(key, new 
TimedLock());
-                if (old != null) {
-                    return false;
-                } else {
-                    timedLock = expiredLocalCache.get(key);
-                }
-            }
-
-            if (timedLock == null) {
-                return false;
-            }
-
+            TimedLock timedLock = 
ConcurrentHashMapUtils.computeIfAbsent(expiredLocalCache, key, k -> new 
TimedLock());
             return timedLock.tryLock();
         }
 

Reply via email to