This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e3c2111fe [ISSUE #4821] Add some integration tests for POP consumption
in slave-acting-master mode (#4822)
e3c2111fe is described below
commit e3c2111febcc90e64c3860cef539bb6b9780c12c
Author: caigy <[email protected]>
AuthorDate: Tue Aug 16 21:39:41 2022 +0800
[ISSUE #4821] Add some integration tests for POP consumption in
slave-acting-master mode (#4822)
* fix #4821
* add license header and ignore executing integration tests in automatic
flow
* fix unit test
* fix unit test
---
.../processor/ChangeInvisibleTimeProcessor.java | 2 +-
.../broker/processor/PopBufferMergeService.java | 13 +-
.../broker/processor/PopMessageProcessor.java | 4 +-
.../broker/processor/PopReviveService.java | 2 +-
.../ChangeInvisibleTimeProcessorTest.java | 11 +-
.../broker/processor/PopMessageProcessorTest.java | 1 -
.../test/container/PopSlaveActingMasterIT.java | 588 +++++++++++++++++++++
7 files changed, 613 insertions(+), 8 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index bb68badc1..76c1b908e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -189,7 +189,7 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
msgInner.setDeliverTimeMs(ck.getReviveTime() -
PopAckConstants.ackTimeInterval);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genCkUniqueId(ck));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
+ PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("change Invisible , appendCheckPoint, topic {},
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}",
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(),
offset,
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4b2df0875..bb432a851 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -538,7 +538,7 @@ public class PopBufferMergeService extends ServiceThread {
return;
}
MessageExtBrokerInner msgInner =
popMessageProcessor.buildCkMsg(pointWrapper.getCk(),
pointWrapper.getReviveQueueId());
- PutMessageResult putMessageResult =
brokerController.getMessageStore().putMessage(msgInner);
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
@@ -547,7 +547,14 @@ public class PopBufferMergeService extends ServiceThread {
return;
}
pointWrapper.setCkStored(true);
-
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+ if (putMessageResult.isRemotePut()) {
+ //No AppendMessageResult when escaping remotely
+ pointWrapper.setReviveQueueOffset(0);
+ } else {
+
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+ }
+
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}",
pointWrapper, putMessageResult);
}
@@ -575,7 +582,7 @@ public class PopBufferMergeService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- PutMessageResult putMessageResult =
brokerController.getMessageStore().putMessage(msgInner);
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 842d0d98b..0d2c5f9b5 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -477,7 +477,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return restNum;
}
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
- GetMessageResult getMessageTmpResult;
+ GetMessageResult getMessageTmpResult = null;
try {
if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId,
requestHeader.getInvisibleTime())) {
@@ -544,6 +544,8 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
//
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
+ } catch (Exception e) {
+ POP_LOGGER.error("Exception in popMsgFromQueue", e);
} finally {
queueLockManager.unLock(lockKey);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 1b95cce4e..1a6c52ec3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -106,7 +106,7 @@ public class PopReviveService extends ServiceThread {
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
- PutMessageResult putMessageResult =
brokerController.getMessageStore().putMessage(msgInner);
+ PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId
{}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(),
messageExt.getQueueOffset(),
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 2a009e951..811913a26 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
@@ -75,12 +76,20 @@ public class ChangeInvisibleTimeProcessorTest {
@Mock
private Broker2Client broker2Client;
+ @Mock
+ private EscapeBridge escapeBridge = new
EscapeBridge(this.brokerController);
+
@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
brokerController.setMessageStore(messageStore);
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);
+
+ Field ebField =
BrokerController.class.getDeclaredField("escapeBridge");
+ ebField.setAccessible(true);
+ ebField.set(brokerController, this.escapeBridge);
+
Channel mockChannel = mock(Channel.class);
when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
@@ -99,7 +108,7 @@ public class ChangeInvisibleTimeProcessorTest {
@Test
public void testProcessRequest_Success() throws RemotingCommandException,
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
int queueId = 0;
long queueOffset = 0;
long popTime = System.currentTimeMillis() - 1_000;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index af6b4bb55..7ea20ceff 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -78,7 +78,6 @@ public class PopMessageProcessorTest {
public void init() {
brokerController.setMessageStore(messageStore);
popMessageProcessor = new PopMessageProcessor(brokerController);
- when(messageStore.putMessage(any())).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new
InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
new file mode 100644
index 000000000..17d5e0cc3
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerBrokerController;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@Ignore
+public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase {
+ private static final String CONSUME_GROUP =
PopSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
+ private final static int MESSAGE_COUNT = 16;
+ private final static Random random = new Random();
+ private static DefaultMQProducer producer;
+ private final static String MESSAGE_STRING =
RandomStringUtils.random(1024);
+ private static byte[] MESSAGE_BODY;
+
+ public PopSlaveActingMasterIT() {
+ }
+
+ static {
+ try {
+ MESSAGE_BODY =
MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+
+ void createTopic(String topic) {
+ createTopicTo(master1With3Replicas, topic, 1, 1);
+ createTopicTo(master2With3Replicas, topic, 1, 1);
+ createTopicTo(master3With3Replicas, topic, 1, 1);
+ System.out.println("Topic [" + topic + "] created");
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ producer = createProducer(PopSlaveActingMasterIT.class.getSimpleName()
+ "_PRODUCER");
+ producer.setSendMsgTimeout(5000);
+ producer.start();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ producer.shutdown();
+ }
+
+
+ @Test
+ public void testLocalActing_ackSlave() throws Exception {
+ String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
+ createTopic(retryTopic);
+
+ this.switchPop(topic);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ System.out.println("send message id: " +
sendResult.getMsgId());
+ sendSuccess++;
+ }
+ }
+
+ System.out.printf("send success %d%n", sendSuccess);
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+
+ isolateBroker(master1With3Replicas);
+ System.out.printf("isolate master1%n");
+
+ DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+ consumer.subscribe(topic, "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ List<String> consumedMessages = new CopyOnWriteArrayList<>();
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ msgs.forEach(msg -> {
+ System.out.println("receive msg id: " + msg.getMsgId());
+ consumedMessages.add(msg.getMsgId());
+ });
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
consumedMessages.size() >= MESSAGE_COUNT);
+ System.out.printf("%s pop receive msg count: %d%n",
LocalDateTime.now(), consumedMessages.size());
+
+ consumer.shutdown();
+
+ List<String> retryMsgList = new CopyOnWriteArrayList<>();
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(retryTopic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf("receive retry msg: %s %s%n", new
String(msg.getBody()), msg);
+ retryMsgList.add(new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ System.out.printf("wait for ack revive%n");
+ Thread.sleep(10000L);
+
+ assertThat(retryMsgList.size()).isEqualTo(0);
+
+ cancelIsolatedBroker(master1With3Replicas);
+ awaitUntilSlaveOK();
+
+ pushConsumer.shutdown();
+ }
+
+ @Test
+ public void testLocalActing_notAckSlave() throws Exception {
+// master1With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+// master1With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//
//master1With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+//
+// master2With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+// master2With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//
//master2With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+//
+// master3With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+// master3With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//
//master3With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+
+ String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
+ createTopic(retryTopic);
+
+ this.switchPop(topic);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+ Set<String> sendToIsolateMsgSet = new HashSet<>();
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendToIsolateMsgSet.add(new String(msg.getBody()));
+ sendSuccess++;
+ }
+ }
+
+ System.out.printf("send success %d%n", sendSuccess);
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+
+ isolateBroker(master1With3Replicas);
+ System.out.printf("isolate master1%n");
+
+ DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+ consumer.subscribe(topic, "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.setPopInvisibleTime(5000L);
+ List<String> consumedMessages = new CopyOnWriteArrayList<>();
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ msgs.forEach(msg -> {
+ System.out.println("receive msg id: " + msg.getMsgId());
+
+ msg.setReconsumeTimes(0);
+
+ consumedMessages.add(msg.getMsgId());
+ });
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
consumedMessages.size() >= MESSAGE_COUNT);
+ consumer.shutdown();
+
+ List<String> retryMsgList = new CopyOnWriteArrayList<>();
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(retryTopic, "*");
+
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf("receive retry msg: %s%n",
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ retryMsgList.add(new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ System.out.printf(LocalDateTime.now() + ": wait for ack revive%n");
+
+ AtomicInteger failCnt = new AtomicInteger(0);
+
await().atMost(Duration.ofMinutes(3)).pollInterval(Duration.ofSeconds(10)).until(()
-> {
+ if (retryMsgList.size() < MESSAGE_COUNT) {
+ System.out.println("check FAILED" + failCnt.incrementAndGet()
+ ": retryMsgList.size=" + retryMsgList.size() + " less than " + MESSAGE_COUNT);
+ return false;
+ }
+
+ for (String msgBodyString : retryMsgList) {
+ if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+ System.out.println("check FAILED: sendToIsolateMsgSet
doesn't contain " + msgBodyString);
+ return false;
+ }
+ }
+ return true;
+ });
+
+ System.out.printf(LocalDateTime.now() + ": receive retry msg
size=%d%n", retryMsgList.size());
+
+ cancelIsolatedBroker(master1With3Replicas);
+ awaitUntilSlaveOK();
+
+ pushConsumer.shutdown();
+ }
+
+ @Test
+ public void testRemoteActing_ackSlave() throws Exception {
+ String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
+ createTopic(retryTopic);
+
+ switchPop(topic);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ System.out.println("Send message id: " +
sendResult.getMsgId());
+ sendSuccess++;
+ }
+ }
+
+ System.out.printf("%s send success %d%n", LocalDateTime.now(),
sendSuccess);
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+
+ isolateBroker(master1With3Replicas);
+ System.out.printf("%s isolate master1%n", LocalDateTime.now());
+
+ isolateBroker(master2With3Replicas);
+ brokerContainer2.removeBroker(new BrokerIdentity(
+ master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master2With3Replicas.getBrokerConfig().getBrokerName(),
+ master2With3Replicas.getBrokerConfig().getBrokerId()));
+ System.out.printf("%s Remove master2%n", LocalDateTime.now());
+
+
+ DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+ consumer.subscribe(topic, "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ List<String> consumedMessages = new CopyOnWriteArrayList<>();
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ msgs.forEach(msg -> {
+ System.out.println("receive msg id: " + msg.getMsgId());
+ consumedMessages.add(msg.getMsgId());
+ });
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+
+ await().atMost(Duration.ofMinutes(2)).until(() ->
consumedMessages.size() >= MESSAGE_COUNT);
+ consumer.shutdown();
+ System.out.printf("%s %d messages consumed%n", LocalDateTime.now(),
consumedMessages.size());
+
+ List<String> retryMsgList = new CopyOnWriteArrayList<>();
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(retryTopic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf("receive retry msg: %s %n",
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ retryMsgList.add(new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
+ Thread.sleep(10000);
+
+ assertThat(retryMsgList.size()).isEqualTo(0);
+
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("%s Cancel isolate master1%n", LocalDateTime.now());
+
+ //Add back master
+ master2With3Replicas =
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(),
master2With3Replicas.getMessageStoreConfig());
+ master2With3Replicas.start();
+ cancelIsolatedBroker(master2With3Replicas);
+ System.out.printf("%s Add back master2%n", LocalDateTime.now());
+
+ awaitUntilSlaveOK();
+
+ System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
+ Thread.sleep(10000);
+
+ assertThat(retryMsgList.size()).isEqualTo(0);
+
+ System.out.printf("%s shutting down pushConsumer%n",
LocalDateTime.now());
+ pushConsumer.shutdown();
+ }
+
+ @Test
+ public void testRemoteActing_notAckSlave_getFromLocal() throws Exception {
+ String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ this.switchPop(topic);
+
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
+ createTopic(retryTopic);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+ Set<String> sendToIsolateMsgSet = new HashSet<>();
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendToIsolateMsgSet.add(new String(msg.getBody()));
+ sendSuccess++;
+ }
+ }
+
+ System.out.printf("send success %d%n", sendSuccess);
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+
+ isolateBroker(master1With3Replicas);
+ System.out.printf("isolate master1%n");
+
+ isolateBroker(master2With3Replicas);
+ brokerContainer2.removeBroker(new BrokerIdentity(
+ master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master2With3Replicas.getBrokerConfig().getBrokerName(),
+ master2With3Replicas.getBrokerConfig().getBrokerId()));
+ System.out.printf("Remove master2%n");
+
+
+ DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+ consumer.subscribe(topic, "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ List<String> consumedMessages = new CopyOnWriteArrayList<>();
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ msgs.forEach(msg -> {
+ System.out.println("receive msg id: " + msg.getMsgId());
+ consumedMessages.add(msg.getMsgId());
+ });
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+
+ await().atMost(Duration.ofMinutes(3)).until(() ->
consumedMessages.size() >= MESSAGE_COUNT);
+ consumer.shutdown();
+
+
+ List<String> retryMsgList = new CopyOnWriteArrayList<>();
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(retryTopic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf("receive retry msg: %s%n",
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ retryMsgList.add(new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ System.out.printf("wait for ack revive%n");
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ if (retryMsgList.size() < MESSAGE_COUNT) {
+ System.out.println("check FAILED: retryMsgList.size=" +
retryMsgList.size() + " less than " + MESSAGE_COUNT);
+ return false;
+ }
+
+ for (String msgBodyString : retryMsgList) {
+ if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+ System.out.println("check FAILED: sendToIsolateMsgSet
doesn't contain: " + msgBodyString);
+ return false;
+ }
+ }
+ return true;
+ });
+
+ System.out.printf("receive retry msg as expected%n");
+
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Cancel isolate master1%n");
+
+ //Add back master
+ master2With3Replicas =
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(),
master2With3Replicas.getMessageStoreConfig());
+ master2With3Replicas.start();
+ cancelIsolatedBroker(master2With3Replicas);
+ System.out.printf("Add back master2%n");
+
+ awaitUntilSlaveOK();
+ pushConsumer.shutdown();
+ }
+
+ @Test
+ public void testRemoteActing_notAckSlave_getFromRemote() throws Exception {
+ String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ this.switchPop(topic);
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
+ createTopic(retryTopic);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+ Set<String> sendToIsolateMsgSet = new HashSet<>();
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendToIsolateMsgSet.add(new String(msg.getBody()));
+ sendSuccess++;
+ }
+ }
+
+ System.out.printf("send success %d%n", sendSuccess);
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+
+ isolateBroker(master1With3Replicas);
+ System.out.printf("isolate master1%n");
+
+ isolateBroker(master2With3Replicas);
+ brokerContainer2.removeBroker(new BrokerIdentity(
+ master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master2With3Replicas.getBrokerConfig().getBrokerName(),
+ master2With3Replicas.getBrokerConfig().getBrokerId()));
+ System.out.printf("Remove master2%n");
+
+ BrokerController slave1InBrokerContainer3 =
getSlaveFromContainerByName(brokerContainer3,
master1With3Replicas.getBrokerConfig().getBrokerName());
+ isolateBroker(slave1InBrokerContainer3);
+ brokerContainer3.removeBroker(new BrokerIdentity(
+
slave1InBrokerContainer3.getBrokerConfig().getBrokerClusterName(),
+ slave1InBrokerContainer3.getBrokerConfig().getBrokerName(),
+ slave1InBrokerContainer3.getBrokerConfig().getBrokerId()));
+ System.out.printf("Remove slave1 form container3%n");
+
+ DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+ consumer.subscribe(topic, "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ List<String> consumedMessages = new CopyOnWriteArrayList<>();
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ msgs.forEach(msg -> {
+ System.out.println("receive msg id: " + msg.getMsgId());
+ consumedMessages.add(msg.getMsgId());
+ });
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ });
+ consumer.setClientRebalance(false);
+ consumer.start();
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
consumedMessages.size() >= MESSAGE_COUNT);
+ System.out.printf("%s pop receive msg count: %d%n",
LocalDateTime.now(), consumedMessages.size());
+ consumer.shutdown();
+
+
+ List<String> retryMsgList = new CopyOnWriteArrayList<>();
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(retryTopic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf("receive retry msg: %s%n",
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ retryMsgList.add(new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ System.out.printf("wait for ack revive%n");
+ Thread.sleep(10000);
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ if (retryMsgList.size() < MESSAGE_COUNT) {
+ return false;
+ }
+
+ for (String msgBodyString : retryMsgList) {
+ if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+ return false;
+ }
+ }
+ return true;
+ });
+
+ System.out.printf("receive retry msg as expected%n");
+
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Cancel isolate master1%n");
+
+ //Add back master
+ master2With3Replicas =
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(),
master2With3Replicas.getMessageStoreConfig());
+ master2With3Replicas.start();
+ cancelIsolatedBroker(master2With3Replicas);
+ System.out.printf("Add back master2%n");
+
+ //Add back slave1 to container3
+ slave1InBrokerContainer3 =
brokerContainer3.addBroker(slave1InBrokerContainer3.getBrokerConfig(),
slave1InBrokerContainer3.getMessageStoreConfig());
+ slave1InBrokerContainer3.start();
+ cancelIsolatedBroker(slave1InBrokerContainer3);
+ System.out.printf("Add back slave1 to container3%n");
+
+ awaitUntilSlaveOK();
+ pushConsumer.shutdown();
+ }
+
+ private void switchPop(String topic) throws Exception {
+ for (BrokerContainer brokerContainer : brokerContainerList) {
+ for (InnerBrokerController master :
brokerContainer.getMasterBrokers()) {
+ String brokerAddr = master.getBrokerAddr();
+ defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic,
CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000);
+ }
+ for (InnerSalveBrokerController slave :
brokerContainer.getSlaveBrokers()) {
+ defaultMQAdminExt.setMessageRequestMode(slave.getBrokerAddr(),
topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000);
+ }
+ }
+
+ }
+
+}
\ No newline at end of file