This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 71ec1eda49 [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode (#8484)
71ec1eda49 is described below

commit 71ec1eda49277a47815159cd3d118854c8dcb3c4
Author: yx9o <yangx_s...@163.com>
AuthorDate: Fri Aug 30 14:20:20 2024 +0800

    [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode (#8484)
    
    * [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode
    
    * Update
    
    * Update test
    
    * Update test
---
 .../rocketmq/broker/client/ConsumerManager.java    | 15 ++--
 .../broker/client/ConsumerManagerTest.java         | 93 +++++++++++-----------
 2 files changed, 57 insertions(+), 51 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 9f838b5154..b1057e2a8d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -145,8 +145,9 @@ public class ConsumerManager {
                         callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
                     }
                 }
-
-                callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
+                if (!isBroadcastMode(info.getMessageModel())) {
+                    callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
+                }
             }
         }
         return removed;
@@ -196,7 +197,7 @@ public class ConsumerManager {
         }
 
         if (r1 || r2) {
-            if (isNotifyConsumerIdsChangedEnable) {
+            if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
                 callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
             }
         }
@@ -219,7 +220,7 @@ public class ConsumerManager {
             consumerGroupInfo = prev != null ? prev : tmp;
         }
         boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
-        if (updateChannelRst && isNotifyConsumerIdsChangedEnable) {
+        if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
             callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
         }
         if (null != this.brokerStatsManager) {
@@ -244,7 +245,7 @@ public class ConsumerManager {
                     callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
                 }
             }
-            if (isNotifyConsumerIdsChangedEnable) {
+            if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
                 callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
             }
         }
@@ -334,4 +335,8 @@ public class ConsumerManager {
             }
         }
     }
+
+    private boolean isBroadcastMode(final MessageModel messageModel) {
+        return MessageModel.BROADCASTING.equals(messageModel);
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
index 8c90982434..a23ad20037 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
@@ -18,10 +18,7 @@
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,13 +27,25 @@ import 
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -49,19 +58,10 @@ public class ConsumerManagerTest {
 
     private ConsumerManager consumerManager;
 
-    private DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener;
-
     @Mock
     private BrokerController brokerController;
 
-    @Mock
-    private ConsumerFilterManager consumerFilterManager;
-
-    private BrokerConfig brokerConfig = new BrokerConfig();
-
-    private Broker2Client broker2Client;
-
-    private BrokerStatsManager brokerStatsManager;
+    private final BrokerConfig brokerConfig = new BrokerConfig();
 
     private static final String GROUP = "DEFAULT_GROUP";
 
@@ -74,40 +74,38 @@ public class ConsumerManagerTest {
     @Before
     public void before() {
         clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, 
LanguageCode.JAVA, VERSION);
-        defaultConsumerIdsChangeListener = new 
DefaultConsumerIdsChangeListener(brokerController);
-        brokerStatsManager = new BrokerStatsManager(brokerConfig);
-        consumerManager = new 
ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, 
brokerConfig);
-        broker2Client = new Broker2Client(brokerController);
+        DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener = 
new DefaultConsumerIdsChangeListener(brokerController);
+        BrokerStatsManager brokerStatsManager = new 
BrokerStatsManager(brokerConfig);
+        consumerManager = spy(new 
ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, 
brokerConfig));
+        ConsumerFilterManager consumerFilterManager = 
mock(ConsumerFilterManager.class);
         
when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager);
-        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-        when(brokerController.getBroker2Client()).thenReturn(broker2Client);
     }
 
     @Test
     public void compensateBasicConsumerInfoTest() {
         ConsumerGroupInfo consumerGroupInfo = 
consumerManager.getConsumerGroupInfo(GROUP, true);
-        Assertions.assertThat(consumerGroupInfo).isNull();
+        assertThat(consumerGroupInfo).isNull();
 
         consumerManager.compensateBasicConsumerInfo(GROUP, 
ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING);
         consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
-        Assertions.assertThat(consumerGroupInfo).isNotNull();
-        
Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
-        
Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
+        assertThat(consumerGroupInfo).isNotNull();
+        
assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
+        
assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
     }
 
     @Test
     public void compensateSubscribeDataTest() {
         ConsumerGroupInfo consumerGroupInfo = 
consumerManager.getConsumerGroupInfo(GROUP, true);
-        Assertions.assertThat(consumerGroupInfo).isNull();
+        assertThat(consumerGroupInfo).isNull();
 
         consumerManager.compensateSubscribeData(GROUP, TOPIC, new 
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
         consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
-        Assertions.assertThat(consumerGroupInfo).isNotNull();
-        
Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
+        assertThat(consumerGroupInfo).isNotNull();
+        
assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
         SubscriptionData subscriptionData = 
consumerGroupInfo.getSubscriptionTable().get(TOPIC);
-        Assertions.assertThat(subscriptionData).isNotNull();
-        Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
-        
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+        assertThat(subscriptionData).isNotNull();
+        assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+        
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
     }
 
     @Test
@@ -118,7 +116,8 @@ public class ConsumerManagerTest {
         subList.add(subscriptionData);
         consumerManager.registerConsumer(GROUP, clientChannelInfo, 
ConsumeType.CONSUME_PASSIVELY,
             MessageModel.BROADCASTING, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
-        
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
+        verify(consumerManager, 
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), 
any());
+        assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
     }
 
     @Test
@@ -128,63 +127,65 @@ public class ConsumerManagerTest {
 
         // unregister
         consumerManager.unregisterConsumer(GROUP, clientChannelInfo, true);
-        
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
+        verify(consumerManager, 
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), 
any());
+        assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
     }
 
     @Test
     public void findChannelTest() {
         register();
         final ClientChannelInfo consumerManagerChannel = 
consumerManager.findChannel(GROUP, CLIENT_ID);
-        Assertions.assertThat(consumerManagerChannel).isNotNull();
+        assertThat(consumerManagerChannel).isNotNull();
     }
 
     @Test
     public void findSubscriptionDataTest() {
         register();
         final SubscriptionData subscriptionData = 
consumerManager.findSubscriptionData(GROUP, TOPIC);
-        Assertions.assertThat(subscriptionData).isNotNull();
+        assertThat(subscriptionData).isNotNull();
     }
 
     @Test
     public void findSubscriptionDataCountTest() {
         register();
         final int count = consumerManager.findSubscriptionDataCount(GROUP);
-        assert count > 0;
+        assertTrue(count > 0);
     }
 
     @Test
     public void findSubscriptionTest() {
         SubscriptionData subscriptionData = 
consumerManager.findSubscriptionData(GROUP, TOPIC, true);
-        Assertions.assertThat(subscriptionData).isNull();
+        assertThat(subscriptionData).isNull();
 
         consumerManager.compensateSubscribeData(GROUP, TOPIC, new 
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
         subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, 
true);
-        Assertions.assertThat(subscriptionData).isNotNull();
-        Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
-        
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+        assertThat(subscriptionData).isNotNull();
+        assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+        
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
 
         subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, 
false);
-        Assertions.assertThat(subscriptionData).isNull();
+        assertThat(subscriptionData).isNull();
     }
 
     @Test
     public void scanNotActiveChannelTest() {
         clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - 
brokerConfig.getChannelExpiredTimeout() * 2);
         consumerManager.scanNotActiveChannel();
-        
Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
+        assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
     }
 
     @Test
     public void queryTopicConsumeByWhoTest() {
         register();
         final HashSet<String> consumeGroup = 
consumerManager.queryTopicConsumeByWho(TOPIC);
-        assert consumeGroup.size() > 0;
+        assertFalse(consumeGroup.isEmpty());
     }
 
     @Test
     public void doChannelCloseEventTest() {
         consumerManager.doChannelCloseEvent("127.0.0.1", channel);
-        assert consumerManager.getConsumerTable().size() == 0;
+        verify(consumerManager, 
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), 
any());
+        assertEquals(0, consumerManager.getConsumerTable().size());
     }
 
     private void register() {
@@ -203,8 +204,8 @@ public class ConsumerManagerTest {
         consumerManager.compensateSubscribeData(GROUP, TOPIC, 
subscriptionData);
         consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new 
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
         consumerManager.removeExpireConsumerGroupInfo();
-        Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP, 
true)).isNotNull();
-        Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, 
TOPIC)).isNull();
-        Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, 
TOPIC + "_1")).isNotNull();
+        assertThat(consumerManager.getConsumerGroupInfo(GROUP, 
true)).isNotNull();
+        assertThat(consumerManager.findSubscriptionData(GROUP, 
TOPIC)).isNull();
+        assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + 
"_1")).isNotNull();
     }
 }

Reply via email to