This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3808387e13 [ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy 3808387e13 is described below commit 3808387e1389278edbe4ef023d200ecb3015622b Author: lk <xdk...@outlook.com> AuthorDate: Mon Oct 9 16:07:56 2023 +0800 [ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy --- .../proxy/service/sysmessage/HeartbeatSyncer.java | 31 ++++++---- .../service/sysmessage/HeartbeatSyncerTest.java | 68 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 11 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java index f70c06b8f4..fee3ea87d2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.proxy.service.sysmessage; import com.alibaba.fastjson.JSON; +import io.netty.channel.Channel; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -73,16 +74,8 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { ); this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { @Override - public void handle(ConsumerGroupEvent event, String s, Object... args) { - if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) { - if (args == null || args.length < 1) { - return; - } - if (args[0] instanceof ClientChannelInfo) { - ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText()); - } - } + public void handle(ConsumerGroupEvent event, String group, Object... 
args) { + processConsumerGroupEvent(event, group, args); } @Override @@ -98,6 +91,18 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { super.shutdown(); } + protected void processConsumerGroupEvent(ConsumerGroupEvent event, String group, Object... args) { + if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) { + if (args == null || args.length < 1) { + return; + } + if (args[0] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + remoteChannelMap.remove(buildKey(group, clientChannelInfo.getChannel())); + } + } + } + public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> subList) { @@ -189,7 +194,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { } RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData()); - RemoteChannel channel = remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + decodedChannel.id().asLongText(), key -> decodedChannel); + RemoteChannel channel = remoteChannelMap.computeIfAbsent(buildKey(data.getGroup(), decodedChannel), key -> decodedChannel); channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute()); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( channel, @@ -228,4 +233,8 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { // use local address, remoting port and grpc port to build unique local proxy Id return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort(); } + + private static String buildKey(String group, Channel channel) { + return group + "@" + channel.id().asLongText(); + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 
43fba3d03c..9a2c5e3437 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -35,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; @@ -320,6 +322,72 @@ public class HeartbeatSyncerTest extends InitConfigTest { } } + @Test + public void testProcessConsumerGroupEventForRemoting() { + String consumerGroup = "consumerGroup"; + Channel channel = createMockChannel(); + RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class); + RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, Collections.emptySet()); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + remotingChannel, + clientId, + LanguageCode.JAVA, + 4 + ); + + testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo); + } + + @Test + public void testProcessConsumerGroupEventForGrpcV2() { + String consumerGroup = "consumerGroup"; + GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class); + GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); + GrpcClientChannel grpcClientChannel = new GrpcClientChannel( + proxyRelayService, grpcClientSettingsManager, 
grpcChannelManager, + ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), + clientId); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + grpcClientChannel, + clientId, + LanguageCode.JAVA, + 5 + ); + + testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo); + } + + private void testProcessConsumerGroupEvent(String consumerGroup, ClientChannelInfo clientChannelInfo) { + HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory, null); + SendResult okSendResult = new SendResult(); + okSendResult.setSendStatus(SendStatus.SEND_OK); + + ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class); + doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt) + .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong()); + + heartbeatSyncer.onConsumerRegister( + consumerGroup, + clientChannelInfo, + ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, + ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, + Collections.emptySet() + ); + await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 1); + + // change local serve addr, to simulate other proxy receive messages + heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10); + ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); + doReturn(true).when(consumerManager).registerConsumer(anyString(), channelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean()); + + heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null); + assertEquals(1, heartbeatSyncer.remoteChannelMap.size()); + + heartbeatSyncer.processConsumerGroupEvent(ConsumerGroupEvent.CLIENT_UNREGISTER, consumerGroup, channelInfoArgumentCaptor.getValue()); + 
assertTrue(heartbeatSyncer.remoteChannelMap.isEmpty()); + } + private MessageExt convertFromMessage(Message message) { MessageExt messageExt = new MessageExt(); messageExt.setTopic(message.getTopic());