This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 5c34a59cd9 [ISSUE #9184] Optimize QueueLockManager#tryLock method (#9185)
5c34a59cd9 is described below

commit 5c34a59cd90074a544525de6609f8ef400dfe2b3
Author: mxsm <m...@apache.org>
AuthorDate: Fri Mar 7 13:46:52 2025 +0800

    [ISSUE #9184] Optimize QueueLockManager#tryLock method (#9185)
---
 .../broker/processor/PopMessageProcessor.java | 40 ++++++++--------------
 1 file changed, 14 insertions(+), 26 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 9355af319e..b84afe2194 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.CommandCallback;
@@ -150,11 +151,11 @@ public class PopMessageProcessor implements NettyRequestProcessor {

     public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
         return batchAckMsg.getTopic()
-            + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
-            + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
-            + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
-            + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
-            + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
+            + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+            + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+            + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+            + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+            + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
     }

     public static String genCkUniqueId(PopCheckPoint ck) {
@@ -861,7 +862,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {

     private boolean isPopShouldStop(String topic, String group, int queueId) {
         return brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
-            brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold();
+            brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold();
     }

     private long getPopOffset(String topic, String group, int queueId, int initMode, boolean init, String lockKey,
@@ -908,7 +909,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }
         if (init) { // whichever initMode
             this.brokerController.getConsumerOffsetManager().commitOffset(
-                "getPopOffset", group, topic, queueId, offset);
+                "getPopOffset", group, topic, queueId, offset);
         }
         return offset;
     }
@@ -1002,12 +1003,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         private volatile long lockTime;

         public TimedLock() {
-            this.lock = new AtomicBoolean(true);
+            // init lock status, false means not locked
+            this.lock = new AtomicBoolean(false);
             this.lockTime = System.currentTimeMillis();
         }

         public boolean tryLock() {
-            boolean ret = lock.compareAndSet(true, false);
+            boolean ret = lock.compareAndSet(false, true);
             if (ret) {
                 this.lockTime = System.currentTimeMillis();
                 return true;
@@ -1017,11 +1019,11 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }

         public void unLock() {
-            lock.set(true);
+            lock.set(false);
         }

         public boolean isLock() {
-            return !lock.get();
+            return lock.get();
         }

         public long getLockTime() {
@@ -1041,21 +1043,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }

         public boolean tryLock(String key) {
-            TimedLock timedLock = expiredLocalCache.get(key);
-
-            if (timedLock == null) {
-                TimedLock old = expiredLocalCache.putIfAbsent(key, new TimedLock());
-                if (old != null) {
-                    return false;
-                } else {
-                    timedLock = expiredLocalCache.get(key);
-                }
-            }
-
-            if (timedLock == null) {
-                return false;
-            }
-
+            TimedLock timedLock = ConcurrentHashMapUtils.computeIfAbsent(expiredLocalCache, key, k -> new TimedLock());
             return timedLock.tryLock();
         }