This is an automated email from the ASF dual-hosted git repository.

dongyuanpan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9ce83452a6 [ISSUE #9105] Fix the issue of duplicate consumption in LMQ 
(#9101)
9ce83452a6 is described below

commit 9ce83452a62f3fb910454bab92c092c83d561bdb
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Mon Jan 6 10:51:58 2025 +0800

    [ISSUE #9105] Fix the issue of duplicate consumption in LMQ (#9101)
    
    * Fix the issue of duplicate consumption in LMQ
    
    * Pass the checkstyle
    
    * Pass the UTs
    
    * Pass the check style
---
 .../broker/longpolling/PopLongPollingService.java  | 17 ++++-----
 .../broker/offset/ConsumerOrderInfoManager.java    |  2 +-
 .../broker/processor/AdminBrokerProcessor.java     |  6 ++--
 .../broker/processor/PopBufferMergeService.java    |  6 ++--
 .../longpolling/PopLongPollingServiceTest.java     | 42 +++++++++++-----------
 .../offset/ConsumerOrderInfoManagerTest.java       |  6 +---
 6 files changed, 39 insertions(+), 40 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 91185fbe94..e87a8e803f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
         LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
     private final NettyRequestProcessor processor;
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap;
+    private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, 
Byte>> topicCidMap;
     private final ConcurrentLinkedHashMap<String, 
ConcurrentSkipListSet<PopRequest>> pollingMap;
     private long lastCleanTime = 0;
 
@@ -63,7 +63,8 @@ public class PopLongPollingService extends ServiceThread {
         this.brokerController = brokerController;
         this.processor = processor;
         // 100000 topic default,  100000 lru topic + cid + qid
-        this.topicCidMap = new 
ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
+        this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
+            
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()
 * 2L).build();
         this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>()
             
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
         this.notifyLast = notifyLast;
@@ -350,7 +351,7 @@ public class PopLongPollingService extends ServiceThread {
                     Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = 
topicCidMapIter.next();
                     String topic = entry.getKey();
                     if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                        POP_LOGGER.info("remove not exit topic {} in 
topicCidMap!", topic);
+                        POP_LOGGER.info("remove nonexistent topic {} in 
topicCidMap!", topic);
                         topicCidMapIter.remove();
                         continue;
                     }
@@ -358,8 +359,8 @@ public class PopLongPollingService extends ServiceThread {
                     while (cidMapIter.hasNext()) {
                         Map.Entry<String, Byte> cidEntry = cidMapIter.next();
                         String cid = cidEntry.getKey();
-                        if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
-                            POP_LOGGER.info("remove not exit sub {} of topic 
{} in topicCidMap!", cid, topic);
+                        if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
+                            POP_LOGGER.info("remove nonexistent subscription 
group {} of topic {} in topicCidMap!", cid, topic);
                             cidMapIter.remove();
                         }
                     }
@@ -380,12 +381,12 @@ public class PopLongPollingService extends ServiceThread {
                     String topic = keyArray[0];
                     String cid = keyArray[1];
                     if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                        POP_LOGGER.info("remove not exit topic {} in 
pollingMap!", topic);
+                        POP_LOGGER.info("remove nonexistent topic {} in 
pollingMap!", topic);
                         pollingMapIter.remove();
                         continue;
                     }
-                    if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
-                        POP_LOGGER.info("remove not exit sub {} of topic {} in 
pollingMap!", cid, topic);
+                    if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
+                        POP_LOGGER.info("remove nonexistent subscription group 
{} of topic {} in pollingMap!", cid, topic);
                         pollingMapIter.remove();
                     }
                 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 4eccc6c037..120f5b104c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -281,7 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
                 continue;
             }
 
-            if 
(this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group)
 == null) {
+            if 
(!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group))
 {
                 iterator.remove();
                 log.info("Group not exist, Clean order info, {}:{}", 
topicAtGroup, qs);
                 continue;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6bcf9aaa0f..6fb7584aa9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -424,7 +424,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         GetSubscriptionGroupConfigRequestHeader requestHeader = 
(GetSubscriptionGroupConfigRequestHeader) 
request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
-        SubscriptionGroupConfig groupConfig = 
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
+        SubscriptionGroupConfig groupConfig = 
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
         if (groupConfig == null) {
             LOGGER.error("No group in this broker, client: {} group: {}", 
ctx.channel().remoteAddress(), requestHeader.getGroup());
             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
@@ -2444,7 +2444,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         }
         // groupSysFlag
         if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
-            SubscriptionGroupConfig groupConfig = 
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
+            SubscriptionGroupConfig groupConfig = 
brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
             if (groupConfig != null) {
                 request.addExtField("groupSysFlag", 
String.valueOf(groupConfig.getGroupSysFlag()));
             }
@@ -2933,7 +2933,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         GetTopicConfigRequestHeader requestHeader = 
(GetTopicConfigRequestHeader) 
request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
-        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
         if (topicConfig == null) {
             LOGGER.error("No topic in this broker, client: {} topic: {}", 
ctx.channel().remoteAddress(), requestHeader.getTopic());
             //be care of the response code, should set "not-exist" explicitly
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 05a92c54b1..820388b18d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -197,12 +197,12 @@ public class PopBufferMergeService extends ServiceThread {
             String topic = keyArray[0];
             String cid = keyArray[1];
             if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                POP_LOGGER.info("[PopBuffer]remove not exit topic {} in 
buffer!", topic);
+                POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in 
buffer!", topic);
                 iterator.remove();
                 continue;
             }
-            if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
-                POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} 
in buffer!", cid, topic);
+            if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
+                POP_LOGGER.info("[PopBuffer]remove nonexistent subscription 
group {} of topic {} in buffer!", cid, topic);
                 iterator.remove();
                 continue;
             }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 1f064ec05d..003bf09842 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -55,20 +55,20 @@ public class PopLongPollingServiceTest {
     
     @Mock
     private BrokerController brokerController;
-    
+
     @Mock
     private NettyRequestProcessor processor;
-    
+
     @Mock
     private ChannelHandlerContext ctx;
-    
+
     @Mock
     private ExecutorService pullMessageExecutor;
-    
+
     private PopLongPollingService popLongPollingService;
-    
+
     private final String defaultTopic = "defaultTopic";
-    
+
     @Before
     public void init() {
         BrokerConfig brokerConfig = new BrokerConfig();
@@ -76,7 +76,7 @@ public class PopLongPollingServiceTest {
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         popLongPollingService = spy(new 
PopLongPollingService(brokerController, processor, true));
     }
-    
+
     @Test
     public void testNotifyMessageArrivingWithRetryTopic() {
         int queueId = 0;
@@ -84,31 +84,32 @@ public class PopLongPollingServiceTest {
         
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId);
         verify(popLongPollingService, 
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 
0L, null, null);
     }
-    
+
     @Test
     public void testNotifyMessageArriving() {
         int queueId = 0;
         Long tagsCode = 123L;
         long offset = 123L;
         long msgStoreTime = System.currentTimeMillis();
-        byte[] filterBitMap = new byte[]{0x01};
+        byte[] filterBitMap = new byte[] {0x01};
         Map<String, String> properties = new ConcurrentHashMap<>();
         
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
         
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
         verify(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
     }
-    
+
     @Test
     public void testNotifyMessageArrivingValidRequest() throws Exception {
         String cid = "CID_1";
         int queueId = 0;
-        ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap 
= new ConcurrentHashMap<>();
+        ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
+            .maximumWeightedCapacity(10).build();
         ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
         cids.put(cid, (byte) 1);
         topicCidMap.put(defaultTopic, cids);
         popLongPollingService = new PopLongPollingService(brokerController, 
processor, true);
         ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> 
pollingMap =
-                new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+            new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
         Channel channel = mock(Channel.class);
         when(channel.isActive()).thenReturn(true);
         PopRequest popRequest = mock(PopRequest.class);
@@ -126,19 +127,19 @@ public class PopLongPollingServiceTest {
         boolean actual = 
popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null, 
0, null, null);
         assertFalse(actual);
     }
-    
+
     @Test
     public void testWakeUpNullRequest() {
         assertFalse(popLongPollingService.wakeUp(null));
     }
-    
+
     @Test
     public void testWakeUpIncompleteRequest() {
         PopRequest request = mock(PopRequest.class);
         when(request.complete()).thenReturn(false);
         assertFalse(popLongPollingService.wakeUp(request));
     }
-    
+
     @Test
     public void testWakeUpInactiveChannel() {
         PopRequest request = mock(PopRequest.class);
@@ -150,7 +151,7 @@ public class PopLongPollingServiceTest {
         
when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor);
         assertTrue(popLongPollingService.wakeUp(request));
     }
-    
+
     @Test
     public void testWakeUpValidRequestWithException() throws Exception {
         PopRequest request = mock(PopRequest.class);
@@ -168,7 +169,7 @@ public class PopLongPollingServiceTest {
         captor.getValue().run();
         verify(processor).processRequest(any(), any());
     }
-    
+
     @Test
     public void testPollingNotPolling() {
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
@@ -180,7 +181,7 @@ public class PopLongPollingServiceTest {
         PollingResult result = popLongPollingService.polling(ctx, 
remotingCommand, requestHeader, subscriptionData, messageFilter);
         assertEquals(PollingResult.NOT_POLLING, result);
     }
-    
+
     @Test
     public void testPollingServicePollingTimeout() throws 
IllegalAccessException {
         String cid = "CID_1";
@@ -194,7 +195,8 @@ public class PopLongPollingServiceTest {
         when(requestHeader.getPollTime()).thenReturn(1000L);
         when(requestHeader.getTopic()).thenReturn(defaultTopic);
         when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
-        ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap 
= new ConcurrentHashMap<>();
+        ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
+            .maximumWeightedCapacity(10).build();
         ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
         cids.put(cid, (byte) 1);
         topicCidMap.put(defaultTopic, cids);
@@ -202,7 +204,7 @@ public class PopLongPollingServiceTest {
         PollingResult result = popLongPollingService.polling(ctx, 
remotingCommand, requestHeader, subscriptionData, messageFilter);
         assertEquals(PollingResult.POLLING_TIMEOUT, result);
     }
-    
+
     @Test
     public void testPollingPollingSuc() {
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index 25b418c934..4414eda54e 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -21,7 +21,6 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -29,7 +28,6 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
-import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.assertj.core.util.Lists;
 import org.junit.Before;
 import org.junit.Test;
@@ -384,9 +382,7 @@ public class ConsumerOrderInfoManagerTest {
 
         SubscriptionGroupManager subscriptionGroupManager = 
mock(SubscriptionGroupManager.class);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
-        ConcurrentMap<String, SubscriptionGroupConfig> 
subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
-        subscriptionGroupConfigConcurrentMap.put(GROUP, new 
SubscriptionGroupConfig());
-        
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
+        
when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true);
 
         TopicConfig topicConfig = new TopicConfig(TOPIC);
         
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);

Reply via email to