This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0e3f8b4d7 Fix the issue that TransactionMessageIT can not pass (#5394)
0e3f8b4d7 is described below
commit 0e3f8b4d7265b17b3592ec0432859d482bc6a9d1
Author: rongtong <[email protected]>
AuthorDate: Mon Oct 24 18:45:05 2022 +0800
Fix the issue that TransactionMessageIT can not pass (#5394)
---
.../test/container/TransactionMessageIT.java | 35 ++++++++--------------
1 file changed, 13 insertions(+), 22 deletions(-)
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
index 06566e46f..e2e020d8c 100644
---
a/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
@@ -46,11 +46,11 @@ import static org.awaitility.Awaitility.await;
public class TransactionMessageIT extends ContainerIntegrationTestBase {
private static final String MESSAGE_STRING =
RandomStringUtils.random(1024);
- private static byte[] MESSAGE_BODY;
+ private static byte[] messageBody;
static {
try {
- MESSAGE_BODY =
MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+ messageBody =
MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException ignored) {
}
}
@@ -83,7 +83,7 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message msg = new Message(topic, MESSAGE_BODY);
+ Message msg = new Message(topic, messageBody);
TransactionSendResult result =
producer.sendMessageInTransaction(msg, null);
assertThat(result.getLocalTransactionState()).isEqualTo(LocalTransactionState.COMMIT_MESSAGE);
}
@@ -106,7 +106,6 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
public void consumeTransactionMsgLocalEscape() throws Exception {
final String topic = generateTopic();
createTopicTo(master1With3Replicas, topic, 1, 1);
- System.out.println("topic " + topic + " created");
final String group = generateGroup();
DefaultMQPushConsumer pushConsumer = createPushConsumer(group);
@@ -115,7 +114,6 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
Map<String, Message> msgSentMap = new HashMap<>();
pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
for (MessageExt msg : msgs) {
- System.out.println("receive trans msgId=" + msg.getMsgId() +
", transactionId=" + msg.getTransactionId());
if (msgSentMap.containsKey(msg.getMsgId())) {
receivedMsgCount.incrementAndGet();
}
@@ -130,11 +128,10 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message msg = new Message(topic, MESSAGE_BODY);
+ Message msg = new Message(topic, messageBody);
msg.setKeys(UUID.randomUUID().toString());
SendResult result = producer.sendMessageInTransaction(msg, null);
String msgId = result.getMsgId();
- System.out.println("Sent trans msgid=" + msgId + ",
transactionId=" + result.getTransactionId() + ", key=" + msg.getKeys());
msgSentMap.put(msgId, msg);
}
@@ -143,8 +140,8 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
brokerContainer1.removeBroker(new
BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(),
master1With3Replicas.getBrokerIdentity().getBrokerName(),
master1With3Replicas.getBrokerIdentity().getBrokerId()));
- System.out.println("=========" +
master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
- + master1With3Replicas.getBrokerIdentity().getBrokerId() + "
removed");
+ System.out.printf("=========" +
master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
+ + master1With3Replicas.getBrokerIdentity().getBrokerId() + "
removed%n");
createTopicTo(master2With3Replicas, topic, 1, 1);
transactionCheckListener.setShouldReturnUnknownState(false);
@@ -169,7 +166,6 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
pushConsumer2.subscribe(topic, "*");
pushConsumer2.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
for (MessageExt msg : msgs) {
- System.out.println("[After master recovered] receive trans
msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
if (msgSentMap.containsKey(msg.getMsgId())) {
receivedMsgCount.incrementAndGet();
}
@@ -178,17 +174,15 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
pushConsumer2.start();
- System.out.println("Wait for checking...");
+ System.out.printf("Wait for checking...%n");
Thread.sleep(10000L);
-
}
@Test
public void consumeTransactionMsgRemoteEscape() throws Exception {
final String topic = generateTopic();
createTopicTo(master1With3Replicas, topic, 1, 1);
- System.out.println("topic " + topic + " created");
final String group = generateGroup();
@@ -198,7 +192,6 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
pushConsumer.subscribe(topic, "*");
pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
for (MessageExt msg : msgs) {
- System.out.println("receive trans msgId=" + msg.getMsgId() +
", transactionId=" + msg.getTransactionId());
if (msgSentMap.containsKey(msg.getMsgId())) {
receivedMsgCount.incrementAndGet();
}
@@ -213,11 +206,10 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message msg = new Message(topic, MESSAGE_BODY);
+ Message msg = new Message(topic, messageBody);
msg.setKeys(UUID.randomUUID().toString());
SendResult result = producer.sendMessageInTransaction(msg, null);
String msgId = result.getMsgId();
- System.out.println("Sent trans msgid=" + msgId + ",
transactionId=" + result.getTransactionId() + ", key=" + msg.getKeys());
msgSentMap.put(msgId, msg);
}
@@ -226,8 +218,8 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
brokerContainer1.removeBroker(new
BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(),
master1With3Replicas.getBrokerIdentity().getBrokerName(),
master1With3Replicas.getBrokerIdentity().getBrokerId()));
- System.out.println("=========" +
master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
- + master1With3Replicas.getBrokerIdentity().getBrokerId() + "
removed");
+ System.out.printf("=========" +
master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
+ + master1With3Replicas.getBrokerIdentity().getBrokerId() + "
removed%n");
createTopicTo(master2With3Replicas, topic, 1, 1);
createTopicTo(master3With3Replicas, topic, 1, 1);
@@ -235,9 +227,9 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
brokerContainer2.removeBroker(new
BrokerIdentity(master2With3Replicas.getBrokerIdentity().getBrokerClusterName(),
master2With3Replicas.getBrokerIdentity().getBrokerName(),
master2With3Replicas.getBrokerIdentity().getBrokerId()));
- System.out.println("=========" +
master2With3Replicas.getBrokerIdentity().getBrokerClusterName() + "-"
+ System.out.printf("=========" +
master2With3Replicas.getBrokerIdentity().getBrokerClusterName() + "-"
+ master2With3Replicas.getBrokerIdentity().getBrokerName()
- + "-" + master2With3Replicas.getBrokerIdentity().getBrokerId() + "
removed");
+ + "-" + master2With3Replicas.getBrokerIdentity().getBrokerId() + "
removed%n");
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().doRebalance(false);
transactionCheckListener.setShouldReturnUnknownState(false);
@@ -268,7 +260,6 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
pushConsumer2.subscribe(topic, "*");
pushConsumer2.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
for (MessageExt msg : msgs) {
- System.out.println("[After master recovered] receive trans
msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
if (msgSentMap.containsKey(msg.getMsgId())) {
receivedMsgCount.incrementAndGet();
}
@@ -277,7 +268,7 @@ public class TransactionMessageIT extends
ContainerIntegrationTestBase {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
pushConsumer2.start();
- System.out.println("Wait for checking...");
+ System.out.printf("Wait for checking...%n");
Thread.sleep(10000L);
assertThat(receivedMsgCount.get()).isEqualTo(0);
pushConsumer2.shutdown();