This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new eef581b464 [ISSUE #6785] Isolate the remoteChannel by group (#6786)
eef581b464 is described below
commit eef581b464d0144a3ec400a20087196f7eefd764
Author: lk <[email protected]>
AuthorDate: Tue May 23 16:41:46 2023 +0800
[ISSUE #6785] Isolate the remoteChannel by group (#6786)
---
.../proxy/service/sysmessage/HeartbeatSyncer.java | 4 +-
.../service/sysmessage/HeartbeatSyncerTest.java | 145 ++++++++++++++-------
2 files changed, 99 insertions(+), 50 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index fb3903697b..3333ebd2d9 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -49,7 +49,7 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
protected ThreadPoolExecutor threadPoolExecutor;
protected ConsumerManager consumerManager;
- protected final Map<String /* channelId as longText */, RemoteChannel>
remoteChannelMap = new ConcurrentHashMap<>();
+ protected final Map<String /* group @ channelId as longText */,
RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
protected String localProxyId;
public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService
adminService,
@@ -188,7 +188,7 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
}
RemoteChannel decodedChannel =
RemoteChannel.decode(data.getChannelData());
- RemoteChannel channel =
remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key ->
decodedChannel);
+ RemoteChannel channel =
remoteChannelMap.computeIfAbsent(data.getGroup() + "@" +
decodedChannel.id().asLongText(), key -> decodedChannel);
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 078a1bc997..6540523598 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -32,9 +32,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -50,8 +53,6 @@ import
org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
@@ -74,7 +75,9 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -215,64 +218,106 @@ public class HeartbeatSyncerTest extends InitConfigTest {
@Test
public void testSyncRemotingChannel() throws Exception {
String consumerGroup = "consumerGroup";
+ String consumerGroup2 = "consumerGroup2";
+ Channel channel = createMockChannel();
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic",
"tagSub"));
+ Set<SubscriptionData> subscriptionDataSet2 = new HashSet<>();
+ subscriptionDataSet2.add(FilterAPI.buildSubscriptionData("topic2",
"tagSub2"));
RemotingProxyOutClient remotingProxyOutClient =
mock(RemotingProxyOutClient.class);
- RemotingChannel remotingChannel = new
RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(),
clientId, subscriptionDataSet);
+ RemotingChannel remotingChannel = new
RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId,
subscriptionDataSet);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
remotingChannel,
clientId,
LanguageCode.JAVA,
4
);
-
- ArgumentCaptor<Message> messageArgumentCaptor =
ArgumentCaptor.forClass(Message.class);
- SendResult sendResult = new SendResult();
- sendResult.setSendStatus(SendStatus.SEND_OK);
-
doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt)
- .sendMessageAsync(anyString(), anyString(),
messageArgumentCaptor.capture(), any(), anyLong());
-
- HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory);
- heartbeatSyncer.onConsumerRegister(
- consumerGroup,
- clientChannelInfo,
- ConsumeType.CONSUME_PASSIVELY,
- MessageModel.CLUSTERING,
- ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
- subscriptionDataSet
+ RemotingChannel remotingChannel2 = new
RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId,
subscriptionDataSet2);
+ ClientChannelInfo clientChannelInfo2 = new ClientChannelInfo(
+ remotingChannel2,
+ clientId,
+ LanguageCode.JAVA,
+ 4
);
- await().atMost(Duration.ofSeconds(3)).until(() ->
!messageArgumentCaptor.getAllValues().isEmpty());
-
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
null);
- verify(consumerManager, never()).registerConsumer(anyString(), any(),
any(), any(), any(), any(), anyBoolean());
-
- String localServeAddr =
ConfigurationManager.getProxyConfig().getLocalServeAddr();
- // change local serve addr, to simulate other proxy receive messages
- heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
- ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
- doReturn(true).when(consumerManager).registerConsumer(anyString(),
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(),
anyBoolean());
-
-
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
null);
-
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())),
null);
- assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size());
- List<ClientChannelInfo> channelInfoList =
syncChannelInfoArgumentCaptor.getAllValues();
- assertSame(channelInfoList.get(0).getChannel(),
channelInfoList.get(1).getChannel());
- assertEquals(subscriptionDataSet,
RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
- assertEquals(subscriptionDataSet,
RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
-
- // start test sync client unregister
- // reset localServeAddr
-
ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr);
- heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo);
- await().atMost(Duration.ofSeconds(3)).until(() ->
messageArgumentCaptor.getAllValues().size() == 2);
-
- ArgumentCaptor<ClientChannelInfo>
syncUnRegisterChannelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
- doNothing().when(consumerManager).unregisterConsumer(anyString(),
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+ HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory);
+ SendResult okSendResult = new SendResult();
+ okSendResult.setSendStatus(SendStatus.SEND_OK);
+ {
+ ArgumentCaptor<Message> messageArgumentCaptor =
ArgumentCaptor.forClass(Message.class);
+
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+ .sendMessageAsync(anyString(), anyString(),
messageArgumentCaptor.capture(), any(), anyLong());
+
+ heartbeatSyncer.onConsumerRegister(
+ consumerGroup,
+ clientChannelInfo,
+ ConsumeType.CONSUME_PASSIVELY,
+ MessageModel.CLUSTERING,
+ ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+ subscriptionDataSet
+ );
+ heartbeatSyncer.onConsumerRegister(
+ consumerGroup2,
+ clientChannelInfo2,
+ ConsumeType.CONSUME_PASSIVELY,
+ MessageModel.CLUSTERING,
+ ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+ subscriptionDataSet2
+ );
+
+ await().atMost(Duration.ofSeconds(3)).until(() ->
messageArgumentCaptor.getAllValues().size() == 2);
+
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
null);
+ verify(consumerManager, never()).registerConsumer(anyString(),
any(), any(), any(), any(), any(), anyBoolean());
+
+ // change local serve addr, to simulate other proxy receive
messages
+ heartbeatSyncer.localProxyId =
RandomStringUtils.randomAlphabetic(10);
+ ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doReturn(true).when(consumerManager).registerConsumer(anyString(),
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(),
anyBoolean());
+
+
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
null);
+
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
null);
+ /*
+ data in syncChannelInfoArgumentCaptor will be like:
+ 1st, data of group1
+ 2nd, data of group2
+ 3rd, data of group1
+ 4th, data of group2
+ */
+ assertEquals(4,
syncChannelInfoArgumentCaptor.getAllValues().size());
+ List<ClientChannelInfo> channelInfoList =
syncChannelInfoArgumentCaptor.getAllValues();
+ assertSame(channelInfoList.get(0).getChannel(),
channelInfoList.get(2).getChannel());
+ assertNotSame(channelInfoList.get(0).getChannel(),
channelInfoList.get(1).getChannel());
+ Set<Set<SubscriptionData>> checkSubscriptionDatas = new
HashSet<>();
+
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+ assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
+ assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
+ }
- // change local serve addr, to simulate other proxy receive messages
- heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
-
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))),
null);
- assertSame(channelInfoList.get(0).getChannel(),
syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
+ {
+ // start test sync client unregister
+ // reset localServeAddr
+ ArgumentCaptor<Message> messageArgumentCaptor =
ArgumentCaptor.forClass(Message.class);
+
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+ .sendMessageAsync(anyString(), anyString(),
messageArgumentCaptor.capture(), any(), anyLong());
+ heartbeatSyncer.onConsumerUnRegister(consumerGroup,
clientChannelInfo);
+ heartbeatSyncer.onConsumerUnRegister(consumerGroup2,
clientChannelInfo2);
+ await().atMost(Duration.ofSeconds(3)).until(() ->
messageArgumentCaptor.getAllValues().size() == 2);
+
+ ArgumentCaptor<ClientChannelInfo>
syncUnRegisterChannelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doNothing().when(consumerManager).unregisterConsumer(anyString(),
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
+
+ // change local serve addr, to simulate other proxy receive
messages
+ heartbeatSyncer.localProxyId =
RandomStringUtils.randomAlphabetic(10);
+
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
null);
+ List<ClientChannelInfo> channelInfoList =
syncUnRegisterChannelInfoArgumentCaptor.getAllValues();
+ assertNotSame(channelInfoList.get(0).getChannel(),
channelInfoList.get(1).getChannel());
+ Set<Set<SubscriptionData>> checkSubscriptionDatas = new
HashSet<>();
+
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel()));
+
checkSubscriptionDatas.add(RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel()));
+ assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet));
+ assertTrue(checkSubscriptionDatas.contains(subscriptionDataSet2));
+ }
}
private MessageExt convertFromMessage(Message message) {
@@ -282,6 +327,10 @@ public class HeartbeatSyncerTest extends InitConfigTest {
return messageExt;
}
+ private List<MessageExt> convertFromMessage(List<Message> message) {
+ return
message.stream().map(this::convertFromMessage).collect(Collectors.toList());
+ }
+
private Channel createMockChannel() {
return new MockChannel(RandomStringUtils.randomAlphabetic(10));
}