This is an automated email from the ASF dual-hosted git repository. dongyuanpan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 9ce83452a6 [ISSUE #9105] Fix the issue of duplicate consumption in LMQ (#9101) 9ce83452a6 is described below commit 9ce83452a62f3fb910454bab92c092c83d561bdb Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Mon Jan 6 10:51:58 2025 +0800 [ISSUE #9105] Fix the issue of duplicate consumption in LMQ (#9101) * Fix the issue of duplicate consumption in LMQ * Pass the checkstyle * Pass the UTs * Pass the check style --- .../broker/longpolling/PopLongPollingService.java | 17 ++++----- .../broker/offset/ConsumerOrderInfoManager.java | 2 +- .../broker/processor/AdminBrokerProcessor.java | 6 ++-- .../broker/processor/PopBufferMergeService.java | 6 ++-- .../longpolling/PopLongPollingServiceTest.java | 42 +++++++++++----------- .../offset/ConsumerOrderInfoManagerTest.java | 6 +--- 6 files changed, 39 insertions(+), 40 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 91185fbe94..e87a8e803f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread { LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; private final NettyRequestProcessor processor; - private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap; + private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap; private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap; private long lastCleanTime = 0; @@ -63,7 +63,8 @@ public class PopLongPollingService extends ServiceThread { this.brokerController = brokerController; this.processor = processor; // 100000 topic default, 100000 lru topic + cid + qid - this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize()); + this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>() + .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build(); this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>() .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); this.notifyLast = notifyLast; @@ -350,7 +351,7 @@ public class PopLongPollingService extends ServiceThread { Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next(); String topic = entry.getKey(); if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic); topicCidMapIter.remove(); continue; } @@ -358,8 +359,8 @@ public class PopLongPollingService extends ServiceThread { while (cidMapIter.hasNext()) { Map.Entry<String, Byte> cidEntry = cidMapIter.next(); String cid = cidEntry.getKey(); - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic); cidMapIter.remove(); } } @@ -380,12 +381,12 @@ public class PopLongPollingService extends ServiceThread { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic); pollingMapIter.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic); pollingMapIter.remove(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 4eccc6c037..120f5b104c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -281,7 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager { continue; } - if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) { + if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) { iterator.remove(); log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs); continue; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6bcf9aaa0f..6fb7584aa9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -424,7 +424,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup()); + SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (groupConfig == null) { LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup()); response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); @@ -2444,7 +2444,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } // groupSysFlag if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) { - SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup()); + SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (groupConfig != null) { request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag())); } @@ -2933,7 +2933,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic()); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (topicConfig == null) { LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic()); //be care of the response code, should set "not-exist" explicitly diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 05a92c54b1..820388b18d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -197,12 +197,12 @@ public class PopBufferMergeService extends ServiceThread { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic); + POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic); iterator.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic); iterator.remove(); continue; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java index 1f064ec05d..003bf09842 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java @@ -55,20 +55,20 @@ public class PopLongPollingServiceTest { @Mock private BrokerController brokerController; - + @Mock private NettyRequestProcessor processor; - + @Mock private ChannelHandlerContext ctx; - + @Mock private ExecutorService pullMessageExecutor; - + private PopLongPollingService popLongPollingService; - + private final String defaultTopic = "defaultTopic"; - + @Before public void init() { BrokerConfig brokerConfig = new BrokerConfig(); @@ -76,7 +76,7 @@ public class PopLongPollingServiceTest { when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); popLongPollingService = spy(new PopLongPollingService(brokerController, processor, true)); } - + @Test public void testNotifyMessageArrivingWithRetryTopic() { int queueId = 0; @@ -84,31 +84,32 @@ public class PopLongPollingServiceTest { popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId); verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null); } - + @Test public void testNotifyMessageArriving() { int queueId = 0; Long tagsCode = 123L; long offset = 123L; long msgStoreTime = System.currentTimeMillis(); - byte[] filterBitMap = new byte[]{0x01}; + byte[] filterBitMap = new byte[] {0x01}; Map<String, String> properties = new ConcurrentHashMap<>(); doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); verify(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } - + @Test public void testNotifyMessageArrivingValidRequest() throws Exception { String cid = "CID_1"; int queueId = 0; - ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentHashMap<>(); + ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>() + .maximumWeightedCapacity(10).build(); ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids); popLongPollingService = new PopLongPollingService(brokerController, processor, true); ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap = - new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); + new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); Channel channel = mock(Channel.class); when(channel.isActive()).thenReturn(true); PopRequest popRequest = mock(PopRequest.class); @@ -126,19 +127,19 @@ public class PopLongPollingServiceTest { boolean actual = popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null, 0, null, null); assertFalse(actual); } - + @Test public void testWakeUpNullRequest() { assertFalse(popLongPollingService.wakeUp(null)); } - + @Test public void testWakeUpIncompleteRequest() { PopRequest request = mock(PopRequest.class); when(request.complete()).thenReturn(false); assertFalse(popLongPollingService.wakeUp(request)); } - + @Test public void testWakeUpInactiveChannel() { PopRequest request = mock(PopRequest.class); @@ -150,7 +151,7 @@ public class PopLongPollingServiceTest { when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor); assertTrue(popLongPollingService.wakeUp(request)); } - + @Test public void testWakeUpValidRequestWithException() throws Exception { PopRequest request = mock(PopRequest.class); @@ -168,7 +169,7 @@ public class PopLongPollingServiceTest { captor.getValue().run(); verify(processor).processRequest(any(), any()); } - + @Test public void testPollingNotPolling() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); @@ -180,7 +181,7 @@ public class PopLongPollingServiceTest { PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter); assertEquals(PollingResult.NOT_POLLING, result); } - + @Test public void testPollingServicePollingTimeout() throws IllegalAccessException { String cid = "CID_1"; @@ -194,7 +195,8 @@ public class PopLongPollingServiceTest { when(requestHeader.getPollTime()).thenReturn(1000L); when(requestHeader.getTopic()).thenReturn(defaultTopic); when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup"); - ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentHashMap<>(); + ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>() + .maximumWeightedCapacity(10).build(); ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids); @@ -202,7 +204,7 @@ public class PopLongPollingServiceTest { PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter); assertEquals(PollingResult.POLLING_TIMEOUT, result); } - + @Test public void testPollingPollingSuc() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java index 25b418c934..4414eda54e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java @@ -21,7 +21,6 @@ import java.time.Duration; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; @@ -29,7 +28,6 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; -import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.assertj.core.util.Lists; import org.junit.Before; import org.junit.Test; @@ -384,9 +382,7 @@ public class ConsumerOrderInfoManagerTest { SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); - ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>(); - subscriptionGroupConfigConcurrentMap.put(GROUP, new SubscriptionGroupConfig()); - when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap); + when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true); TopicConfig topicConfig = new TopicConfig(TOPIC); when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);