This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new eef581b464 [ISSUE #6785] Isolate the remoteChannel by group (#6786)
eef581b464 is described below

commit eef581b464d0144a3ec400a20087196f7eefd764
Author: lk <[email protected]>
AuthorDate: Tue May 23 16:41:46 2023 +0800

    [ISSUE #6785] Isolate the remoteChannel by group (#6786)
---
 .../proxy/service/sysmessage/HeartbeatSyncer.java  |   4 +-
 .../service/sysmessage/HeartbeatSyncerTest.java    | 145 ++++++++++++++-------
 2 files changed, 99 insertions(+), 50 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index fb3903697b..3333ebd2d9 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -49,7 +49,7 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
 
     protected ThreadPoolExecutor threadPoolExecutor;
     protected ConsumerManager consumerManager;
-    protected final Map<String /* channelId as longText */, RemoteChannel> 
remoteChannelMap = new ConcurrentHashMap<>();
+    protected final Map<String /* group @ channelId as longText */, 
RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
     protected String localProxyId;
 
     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService 
adminService,
@@ -188,7 +188,7 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
                 }
 
                 RemoteChannel decodedChannel = 
RemoteChannel.decode(data.getChannelData());
-                RemoteChannel channel = 
remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> 
decodedChannel);
+                RemoteChannel channel = 
remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + 
decodedChannel.id().asLongText(), key -> decodedChannel);
                 
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                     channel,
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 078a1bc997..6540523598 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -32,9 +32,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -50,8 +53,6 @@ import 
org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
 import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
@@ -74,7 +75,9 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -215,64 +218,106 @@ public class HeartbeatSyncerTest extends InitConfigTest {
     @Test
     public void testSyncRemotingChannel() throws Exception {
         String consumerGroup = "consumerGroup";
+        String consumerGroup2 = "consumerGroup2";
+        Channel channel = createMockChannel();
         Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
         subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", 
"tagSub"));
+        Set<SubscriptionData> subscriptionDataSet2 = new HashSet<>();
+        subscriptionDataSet2.add(FilterAPI.buildSubscriptionData("topic2", 
"tagSub2"));
         RemotingProxyOutClient remotingProxyOutClient = 
mock(RemotingProxyOutClient.class);
-        RemotingChannel remotingChannel = new 
RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), 
clientId, subscriptionDataSet);
+        RemotingChannel remotingChannel = new 
RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, 
subscriptionDataSet);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
             remotingChannel,
             clientId,
             LanguageCode.JAVA,
             4
         );
-
-        ArgumentCaptor<Message> messageArgumentCaptor = 
ArgumentCaptor.forClass(Message.class);
-        SendResult sendResult = new SendResult();
-        sendResult.setSendStatus(SendStatus.SEND_OK);
-        
doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
-            .sendMessageAsync(anyString(), anyString(), 
messageArgumentCaptor.capture(), any(), anyLong());
-
-        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory);
-        heartbeatSyncer.onConsumerRegister(
-            consumerGroup,
-            clientChannelInfo,
-            ConsumeType.CONSUME_PASSIVELY,
-            MessageModel.CLUSTERING,
-            ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
-            subscriptionDataSet
+        RemotingChannel remotingChannel2 = new 
RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, 
subscriptionDataSet2);
+        ClientChannelInfo clientChannelInfo2 = new ClientChannelInfo(
+            remotingChannel2,
+            clientId,
+            LanguageCode.JAVA,
+            4
         );
 
-        await().atMost(Duration.ofSeconds(3)).until(() -> 
!messageArgumentCaptor.getAllValues().isEmpty());
-        
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
 null);
-        verify(consumerManager, never()).registerConsumer(anyString(), any(), 
any(), any(), any(), any(), anyBoolean());
-
-        String localServeAddr = 
ConfigurationManager.getProxyConfig().getLocalServeAddr();
-        // change local serve addr, to simulate other proxy receive messages
-        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
-        ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
-        doReturn(true).when(consumerManager).registerConsumer(anyString(), 
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), 
anyBoolean());
-
-        
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
 null);
-        
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
 null);
-        assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
-        List<ClientChannelInfo> channelInfoList = 
syncChannelInfoArgumentCaptor.getAllValues();
-        assertSame(channelInfoList.get(0).getChannel(), 
channelInfoList.get(1).getChannel());
-        assertEquals(subscriptionDataSet, 
RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
-        assertEquals(subscriptionDataSet, 
RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
-
-        // start test sync client unregister
-        // reset localServeAddr
-        
ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
-        heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
-        await().atMost(Duration.ofSeconds(3)).until(() -> 
messageArgumentCaptor.getAllValues().size() == 2);
-
-        ArgumentCaptor<ClientChannelInfo> 
syncUnRegisterChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
-        doNothing().when(consumerManager).unregisterConsumer(anyString(), 
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory);
+        SendResult okSendResult = new SendResult();
+        okSendResult.setSendStatus(SendStatus.SEND_OK);
+        {
+            ArgumentCaptor<Message> messageArgumentCaptor = 
ArgumentCaptor.forClass(Message.class);
+            
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+                .sendMessageAsync(anyString(), anyString(), 
messageArgumentCaptor.capture(), any(), anyLong());
+
+            heartbeatSyncer.onConsumerRegister(
+                consumerGroup,
+                clientChannelInfo,
+                ConsumeType.CONSUME_PASSIVELY,
+                MessageModel.CLUSTERING,
+                ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+                subscriptionDataSet
+            );
+            heartbeatSyncer.onConsumerRegister(
+                consumerGroup2,
+                clientChannelInfo2,
+                ConsumeType.CONSUME_PASSIVELY,
+                MessageModel.CLUSTERING,
+                ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+                subscriptionDataSet2
+            );
+
+            await().atMost(Duration.ofSeconds(3)).until(() -> 
messageArgumentCaptor.getAllValues().size() == 2);
+            
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
 null);
+            verify(consumerManager, never()).registerConsumer(anyString(), 
any(), any(), any(), any(), any(), anyBoolean());
+
+            // change local serve addr, to simulate other proxy receive 
messages
+            heartbeatSyncer.localProxyId = 
RandomStringUtils.randomAlphabetic(10);
+            ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doReturn(true).when(consumerManager).registerConsumer(anyString(), 
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), 
anyBoolean());
+
+            
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
 null);
+            
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
 null);
+            /*
+            data in syncChannelInfoArgumentCaptor will be like:
+            1st, data of group1
+            2nd, data of group2
+            3rd, data of group1
+            4th, data of group2
+             */
+            assertEquals(4, 
syncChannelInfoArgumentCaptor.getAllValues().size());
+            List<ClientChannelInfo> channelInfoList = 
syncChannelInfoArgumentCaptor.getAllValues();
+            assertSame(channelInfoList.get(0).getChannel(), 
channelInfoList.get(2).getChannel());
+            assertNotSame(channelInfoList.get(0).getChannel(), 
channelInfoList.get(1).getChannel());
+            Set<Set<SubscriptionData>> checkSubscriptionDatas = new 
HashSet<>();
+            
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+            
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+            assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
+            assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
+        }
 
-        // change local serve addr, to simulate other proxy receive messages
-        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
-        
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))),
 null);
-        assertSame(channelInfoList.get(0).getChannel(), 
syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
+        {
+            // start test sync client unregister
+            // reset localServeAddr
+            ArgumentCaptor<Message> messageArgumentCaptor = 
ArgumentCaptor.forClass(Message.class);
+            
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+                .sendMessageAsync(anyString(), anyString(), 
messageArgumentCaptor.capture(), any(), anyLong());
+            heartbeatSyncer.onConsumerUnRegister(consumerGroup, 
clientChannelInfo);
+            heartbeatSyncer.onConsumerUnRegister(consumerGroup2, 
clientChannelInfo2);
+            await().atMost(Duration.ofSeconds(3)).until(() -> 
messageArgumentCaptor.getAllValues().size() == 2);
+
+            ArgumentCaptor<ClientChannelInfo> 
syncUnRegisterChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
+            doNothing().when(consumerManager).unregisterConsumer(anyString(), 
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+
+            // change local serve addr, to simulate other proxy receive 
messages
+            heartbeatSyncer.localProxyId = 
RandomStringUtils.randomAlphabetic(10);
+            
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
 null);
+            List<ClientChannelInfo> channelInfoList = 
syncUnRegisterChannelInfoArgumentCaptor.getAllValues();
+            assertNotSame(channelInfoList.get(0).getChannel(), 
channelInfoList.get(1).getChannel());
+            Set<Set<SubscriptionData>> checkSubscriptionDatas = new 
HashSet<>();
+            
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+            
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+            assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
+            assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
+        }
     }
 
     private MessageExt convertFromMessage(Message message) {
@@ -282,6 +327,10 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         return messageExt;
     }
 
+    private List<MessageExt> convertFromMessage(List<Message> message) {
+        return 
message.stream().map(this::convertFromMessage).collect(Collectors.toList());
+    }
+
     private Channel createMockChannel() {
         return new MockChannel(RandomStringUtils.randomAlphabetic(10));
     }

Reply via email to