This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3808387e13 [ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy
3808387e13 is described below

commit 3808387e1389278edbe4ef023d200ecb3015622b
Author: lk <xdk...@outlook.com>
AuthorDate: Mon Oct 9 16:07:56 2023 +0800

    [ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy
---
 .../proxy/service/sysmessage/HeartbeatSyncer.java  | 31 ++++++----
 .../service/sysmessage/HeartbeatSyncerTest.java    | 68 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 11 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index f70c06b8f4..fee3ea87d2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.proxy.service.sysmessage;
 
 import com.alibaba.fastjson.JSON;
+import io.netty.channel.Channel;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
@@ -73,16 +74,8 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         );
         this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
             @Override
-            public void handle(ConsumerGroupEvent event, String s, Object... args) {
-                if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
-                    if (args == null || args.length < 1) {
-                        return;
-                    }
-                    if (args[0] instanceof ClientChannelInfo) {
-                        ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                        remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
-                    }
-                }
+            public void handle(ConsumerGroupEvent event, String group, Object... args) {
+                processConsumerGroupEvent(event, group, args);
             }
 
             @Override
@@ -98,6 +91,18 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         super.shutdown();
     }
 
+    protected void processConsumerGroupEvent(ConsumerGroupEvent event, String group, Object... args) {
+        if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+            if (args == null || args.length < 1) {
+                return;
+            }
+            if (args[0] instanceof ClientChannelInfo) {
+                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                remoteChannelMap.remove(buildKey(group, clientChannelInfo.getChannel()));
+            }
+        }
+    }
+
     public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
         Set<SubscriptionData> subList) {
@@ -189,7 +194,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
                 }
 
                 RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
-                RemoteChannel channel = remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + decodedChannel.id().asLongText(), key -> decodedChannel);
+                RemoteChannel channel = remoteChannelMap.computeIfAbsent(buildKey(data.getGroup(), decodedChannel), key -> decodedChannel);
                 channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
                 ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                     channel,
@@ -228,4 +233,8 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
         // use local address, remoting port and grpc port to build unique local proxy Id
         return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
     }
+
+    private static String buildKey(String group, Channel channel) {
+        return group + "@" + channel.id().asLongText();
+    }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 43fba3d03c..9a2c5e3437 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -320,6 +322,72 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         }
     }
 
+    @Test
+    public void testProcessConsumerGroupEventForRemoting() {
+        String consumerGroup = "consumerGroup";
+        Channel channel = createMockChannel();
+        RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
+        RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, Collections.emptySet());
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            remotingChannel,
+            clientId,
+            LanguageCode.JAVA,
+            4
+        );
+
+        testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
+    }
+
+    @Test
+    public void testProcessConsumerGroupEventForGrpcV2() {
+        String consumerGroup = "consumerGroup";
+        GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class);
+        GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
+            proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+            ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+            clientId);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            grpcClientChannel,
+            clientId,
+            LanguageCode.JAVA,
+            5
+        );
+
+        testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
+    }
+
+    private void testProcessConsumerGroupEvent(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+        HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory, null);
+        SendResult okSendResult = new SendResult();
+        okSendResult.setSendStatus(SendStatus.SEND_OK);
+
+        ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
+        doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+            .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());
+
+        heartbeatSyncer.onConsumerRegister(
+            consumerGroup,
+            clientChannelInfo,
+            ConsumeType.CONSUME_PASSIVELY,
+            MessageModel.CLUSTERING,
+            ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+            Collections.emptySet()
+        );
+        await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 1);
+
+        // change local serve addr, to simulate other proxy receive messages
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
+        ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
+        doReturn(true).when(consumerManager).registerConsumer(anyString(), channelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+
+        heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
+        assertEquals(1, heartbeatSyncer.remoteChannelMap.size());
+
+        heartbeatSyncer.processConsumerGroupEvent(ConsumerGroupEvent.CLIENT_UNREGISTER, consumerGroup, channelInfoArgumentCaptor.getValue());
+        assertTrue(heartbeatSyncer.remoteChannelMap.isEmpty());
+    }
+
     private MessageExt convertFromMessage(Message message) {
         MessageExt messageExt = new MessageExt();
         messageExt.setTopic(message.getTopic());

Reply via email to