This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 71ec1eda49 [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode (#8484) 71ec1eda49 is described below commit 71ec1eda49277a47815159cd3d118854c8dcb3c4 Author: yx9o <yangx_s...@163.com> AuthorDate: Fri Aug 30 14:20:20 2024 +0800 [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode (#8484) * [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode * Update * Update test * Update test --- .../rocketmq/broker/client/ConsumerManager.java | 15 ++-- .../broker/client/ConsumerManagerTest.java | 93 +++++++++++----------- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 9f838b5154..b1057e2a8d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -145,8 +145,9 @@ public class ConsumerManager { callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey()); } } - - callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel()); + if (!isBroadcastMode(info.getMessageModel())) { + callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel()); + } } } return removed; @@ -196,7 +197,7 @@ public class ConsumerManager { } if (r1 || r2) { - if (isNotifyConsumerIdsChangedEnable) { + if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } @@ -219,7 +220,7 @@ public class ConsumerManager { consumerGroupInfo = prev != null ? prev : tmp; } boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); - if (updateChannelRst && isNotifyConsumerIdsChangedEnable) { + if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } if (null != this.brokerStatsManager) { @@ -244,7 +245,7 @@ public class ConsumerManager { callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group); } } - if (isNotifyConsumerIdsChangedEnable) { + if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } @@ -334,4 +335,8 @@ public class ConsumerManager { } } } + + private boolean isBroadcastMode(final MessageModel messageModel) { + return MessageModel.BROADCASTING.equals(messageModel); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index 8c90982434..a23ad20037 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -18,10 +18,7 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; -import java.util.HashSet; -import java.util.Set; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -30,13 +27,25 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -49,19 +58,10 @@ public class ConsumerManagerTest { private ConsumerManager consumerManager; - private DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener; - @Mock private BrokerController brokerController; - @Mock - private ConsumerFilterManager consumerFilterManager; - - private BrokerConfig brokerConfig = new BrokerConfig(); - - private Broker2Client broker2Client; - - private BrokerStatsManager brokerStatsManager; + private final BrokerConfig brokerConfig = new BrokerConfig(); private static final String GROUP = "DEFAULT_GROUP"; @@ -74,40 +74,38 @@ public class ConsumerManagerTest { @Before public void before() { clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION); - defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController); - brokerStatsManager = new BrokerStatsManager(brokerConfig); - consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig); - broker2Client = new Broker2Client(brokerController); + DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController); + BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig); + consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig)); + ConsumerFilterManager consumerFilterManager = mock(ConsumerFilterManager.class); when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager); - when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); - when(brokerController.getBroker2Client()).thenReturn(broker2Client); } @Test public void compensateBasicConsumerInfoTest() { ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true); - Assertions.assertThat(consumerGroupInfo).isNull(); + assertThat(consumerGroupInfo).isNull(); consumerManager.compensateBasicConsumerInfo(GROUP, ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING); consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true); - Assertions.assertThat(consumerGroupInfo).isNotNull(); - Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY); - Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING); + assertThat(consumerGroupInfo).isNotNull(); + assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY); + assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING); } @Test public void compensateSubscribeDataTest() { ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true); - Assertions.assertThat(consumerGroupInfo).isNull(); + assertThat(consumerGroupInfo).isNull(); consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL)); consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true); - Assertions.assertThat(consumerGroupInfo).isNotNull(); - Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1); + assertThat(consumerGroupInfo).isNotNull(); + assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1); SubscriptionData subscriptionData = consumerGroupInfo.getSubscriptionTable().get(TOPIC); - Assertions.assertThat(subscriptionData).isNotNull(); - Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC); - Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL); + assertThat(subscriptionData).isNotNull(); + assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC); + assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL); } @Test @@ -118,7 +116,8 @@ public class ConsumerManagerTest { subList.add(subscriptionData); consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true); - Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull(); + verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any()); + assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull(); } @Test @@ -128,63 +127,65 @@ public class ConsumerManagerTest { // unregister consumerManager.unregisterConsumer(GROUP, clientChannelInfo, true); - Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull(); + verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any()); + assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull(); } @Test public void findChannelTest() { register(); final ClientChannelInfo consumerManagerChannel = consumerManager.findChannel(GROUP, CLIENT_ID); - Assertions.assertThat(consumerManagerChannel).isNotNull(); + assertThat(consumerManagerChannel).isNotNull(); } @Test public void findSubscriptionDataTest() { register(); final SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC); - Assertions.assertThat(subscriptionData).isNotNull(); + assertThat(subscriptionData).isNotNull(); } @Test public void findSubscriptionDataCountTest() { register(); final int count = consumerManager.findSubscriptionDataCount(GROUP); - assert count > 0; + assertTrue(count > 0); } @Test public void findSubscriptionTest() { SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true); - Assertions.assertThat(subscriptionData).isNull(); + assertThat(subscriptionData).isNull(); consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL)); subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true); - Assertions.assertThat(subscriptionData).isNotNull(); - Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC); - Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL); + assertThat(subscriptionData).isNotNull(); + assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC); + assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL); subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, false); - Assertions.assertThat(subscriptionData).isNull(); + assertThat(subscriptionData).isNull(); } @Test public void scanNotActiveChannelTest() { clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - brokerConfig.getChannelExpiredTimeout() * 2); consumerManager.scanNotActiveChannel(); - Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0); + assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0); } @Test public void queryTopicConsumeByWhoTest() { register(); final HashSet<String> consumeGroup = consumerManager.queryTopicConsumeByWho(TOPIC); - assert consumeGroup.size() > 0; + assertFalse(consumeGroup.isEmpty()); } @Test public void doChannelCloseEventTest() { consumerManager.doChannelCloseEvent("127.0.0.1", channel); - assert consumerManager.getConsumerTable().size() == 0; + verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any()); + assertEquals(0, consumerManager.getConsumerTable().size()); } private void register() { @@ -203,8 +204,8 @@ public class ConsumerManagerTest { consumerManager.compensateSubscribeData(GROUP, TOPIC, subscriptionData); consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL)); consumerManager.removeExpireConsumerGroupInfo(); - Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull(); - Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull(); - Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull(); + assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull(); + assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull(); + assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull(); } }