This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 57642bc63 [ISSUE #6691] Support reentrant pop orderly for broker
(#6692)
57642bc63 is described below
commit 57642bc630d5ee42cca026ae389ae3016a61bb9c
Author: lk <[email protected]>
AuthorDate: Fri May 5 15:06:11 2023 +0800
[ISSUE #6691] Support reentrant pop orderly for broker (#6692)
---
.../broker/offset/ConsumerOrderInfoManager.java | 37 ++++++++++----
.../broker/processor/AckMessageProcessor.java | 2 +-
.../broker/processor/NotificationProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 14 +++---
...ConsumerOrderInfoManagerLockFreeNotifyTest.java | 5 ++
.../offset/ConsumerOrderInfoManagerTest.java | 57 ++++++++++++++++++----
.../protocol/header/PopMessageRequestHeader.java | 11 +++++
.../rocketmq/test/client/rmq/RMQPopClient.java | 7 +++
.../test/client/consumer/pop/BasePopOrderly.java | 19 +++++++-
.../test/client/consumer/pop/PopOrderlyIT.java | 38 +++++++++++++++
.../rocketmq/test/offset/OffsetResetForPopIT.java | 8 +--
11 files changed, 167 insertions(+), 33 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 29bbe9970..2e2850dbb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -91,7 +91,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
* @param msgQueueOffsetList the queue offsets of messages
* @param orderInfoBuilder will append order info to this builder
*/
- public void update(boolean isRetry, String topic, String group, int
queueId, long popTime, long invisibleTime,
+ public void update(String attemptId, boolean isRetry, String topic, String
group, int queueId, long popTime, long invisibleTime,
List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -106,12 +106,12 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
OrderInfo orderInfo = qs.get(queueId);
if (orderInfo != null) {
- OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime,
msgQueueOffsetList, System.currentTimeMillis(), 0);
- newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList,
orderInfo.offsetConsumedCount);
+ OrderInfo newOrderInfo = new OrderInfo(attemptId, popTime,
invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
+ newOrderInfo.mergeOffsetConsumedCount(orderInfo.attemptId,
orderInfo.offsetList, orderInfo.offsetConsumedCount);
orderInfo = newOrderInfo;
} else {
- orderInfo = new OrderInfo(popTime, invisibleTime,
msgQueueOffsetList, System.currentTimeMillis(), 0);
+ orderInfo = new OrderInfo(attemptId, popTime, invisibleTime,
msgQueueOffsetList, System.currentTimeMillis(), 0);
}
qs.put(queueId, orderInfo);
@@ -140,7 +140,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}
- public boolean checkBlock(String topic, String group, int queueId, long
invisibleTime) {
+ public boolean checkBlock(String attemptId, String topic, String group,
int queueId, long invisibleTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
@@ -156,7 +156,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
if (orderInfo == null) {
return false;
}
- return orderInfo.needBlock(invisibleTime);
+ return orderInfo.needBlock(attemptId, invisibleTime);
}
public void clearBlock(String topic, String group, int queueId) {
@@ -391,17 +391,20 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
*/
@JSONField(name = "cm")
private long commitOffsetBit;
+ @JSONField(name = "a")
+ private String attemptId;
public OrderInfo() {
}
- public OrderInfo(long popTime, long invisibleTime, List<Long>
queueOffsetList, long lastConsumeTimestamp,
+ public OrderInfo(String attemptId, long popTime, long invisibleTime,
List<Long> queueOffsetList, long lastConsumeTimestamp,
long commitOffsetBit) {
this.popTime = popTime;
this.invisibleTime = invisibleTime;
this.offsetList = buildOffsetList(queueOffsetList);
this.lastConsumeTimestamp = lastConsumeTimestamp;
this.commitOffsetBit = commitOffsetBit;
+ this.attemptId = attemptId;
}
public List<Long> getOffsetList() {
@@ -460,6 +463,14 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
this.offsetConsumedCount = offsetConsumedCount;
}
+ public String getAttemptId() {
+ return attemptId;
+ }
+
+ public void setAttemptId(String attemptId) {
+ this.attemptId = attemptId;
+ }
+
public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
List<Long> simple = new ArrayList<>();
if (queueOffsetList.size() == 1) {
@@ -475,10 +486,13 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
}
@JSONField(serialize = false, deserialize = false)
- public boolean needBlock(long currentInvisibleTime) {
+ public boolean needBlock(String attemptId, long currentInvisibleTime) {
if (offsetList == null || offsetList.isEmpty()) {
return false;
}
+ if (this.attemptId != null && this.attemptId.equals(attemptId)) {
+ return false;
+ }
int num = offsetList.size();
int i = 0;
if (this.invisibleTime == null || this.invisibleTime <= 0) {
@@ -586,11 +600,15 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
* @param prevOffsetConsumedCount the offset list of message
*/
@JSONField(serialize = false, deserialize = false)
- public void mergeOffsetConsumedCount(List<Long> preOffsetList,
Map<Long, Integer> prevOffsetConsumedCount) {
+ public void mergeOffsetConsumedCount(String preAttemptId, List<Long>
preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
Map<Long, Integer> offsetConsumedCount = new HashMap<>();
if (prevOffsetConsumedCount == null) {
prevOffsetConsumedCount = new HashMap<>();
}
+ if (preAttemptId != null && preAttemptId.equals(this.attemptId)) {
+ this.offsetConsumedCount = prevOffsetConsumedCount;
+ return;
+ }
Set<Long> preQueueOffsetSet = new HashSet<>();
for (int i = 0; i < preOffsetList.size(); i++) {
preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
@@ -619,6 +637,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
.add("offsetConsumedCount", offsetConsumedCount)
.add("lastConsumeTimestamp", lastConsumeTimestamp)
.add("commitOffsetBit", commitOffsetBit)
+ .add("attemptId", attemptId)
.toString();
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 824ba48fc..fa1c0793e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -178,7 +178,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
}
- if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
+ if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null,
requestHeader.getTopic(),
requestHeader.getConsumerGroup(),
requestHeader.getQueueId(), invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 3b306ca2d..4be77468f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -264,7 +264,7 @@ public class NotificationProcessor implements
NettyRequestProcessor {
}
private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader
requestHeader, int queueId) {
- if
(this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), queueId, 0)) {
+ if
(this.brokerController.getConsumerOrderInfoManager().checkBlock(null,
requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
return false;
}
String topic = isRetry ?
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5fa4c586a..a89bbb156 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -416,7 +416,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
@@ -425,12 +425,12 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
// read all queue
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
} else {
int queueId = requestHeader.getQueueId();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
// if not full , fetch retry again
@@ -440,7 +440,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) %
retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult,
requestHeader, queueId, restNum, reviveQid, channel, popTime,
finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
@@ -523,7 +523,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return null;
}
- private CompletableFuture<Long> popMsgFromQueue(boolean isRetry,
GetMessageResult getMessageResult,
+ private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean
isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int
reviveQid,
Channel channel, long popTime, ExpressionMessageFilter messageFilter,
StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
@@ -545,7 +545,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
true, lockKey, true);
- if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
+ if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
requestHeader.getConsumerGroup(), queueId,
requestHeader.getInvisibleTime())) {
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - offset + restNum);
return future;
@@ -618,7 +618,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(),
attributes);
if (isOrder) {
-
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
+
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(),
isRetry, topic,
requestHeader.getConsumerGroup(),
queueId, popTime,
requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
orderCountInfo);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
index e5033a05d..93689efa5 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
@@ -67,6 +67,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeMessageThenNoAck() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -83,6 +84,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeMessageThenAck() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -106,6 +108,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeTheChangeInvisibleLonger() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -130,6 +133,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeTheChangeInvisibleShorter() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -155,6 +159,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
public void testRecover() {
ConsumerOrderInfoManager savedConsumerOrderInfoManager = new
ConsumerOrderInfoManager();
savedConsumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index f260632c6..25b418c93 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.offset;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,7 @@ public class ConsumerOrderInfoManagerTest {
@Test
public void testCommitAndNext() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -82,6 +84,7 @@ public class ConsumerOrderInfoManagerTest {
));
assertEncodeAndDecode();
assertTrue(consumerOrderInfoManager.checkBlock(
+ null,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -97,6 +100,7 @@ public class ConsumerOrderInfoManagerTest {
));
assertEncodeAndDecode();
assertFalse(consumerOrderInfoManager.checkBlock(
+ null,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -110,6 +114,7 @@ public class ConsumerOrderInfoManagerTest {
// consume three new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -129,6 +134,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume same messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -151,6 +157,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume last two message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -173,6 +180,7 @@ public class ConsumerOrderInfoManagerTest {
// consume a new message and reconsume last message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -193,6 +201,7 @@ public class ConsumerOrderInfoManagerTest {
// consume two new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -215,6 +224,7 @@ public class ConsumerOrderInfoManagerTest {
// consume two new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -225,6 +235,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -244,6 +255,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume two message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -254,6 +266,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -275,6 +288,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume with a new message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -285,6 +299,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -311,6 +326,7 @@ public class ConsumerOrderInfoManagerTest {
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -329,10 +345,11 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP,
QUEUE_ID_0, 3L, popTime));
assertEncodeAndDecode();
- await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() ->
!consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+ await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() ->
!consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0,
invisibleTime));
orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -350,11 +367,11 @@ public class ConsumerOrderInfoManagerTest {
assertEncodeAndDecode();
assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP,
QUEUE_ID_0, 4L, popTime));
assertEncodeAndDecode();
- assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP,
QUEUE_ID_0, invisibleTime));
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP,
QUEUE_ID_0, invisibleTime));
assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP,
QUEUE_ID_0, 2L, popTime));
assertEncodeAndDecode();
- assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP,
QUEUE_ID_0, invisibleTime));
+ assertFalse(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP,
QUEUE_ID_0, invisibleTime));
}
@Test
@@ -377,7 +394,7 @@ public class ConsumerOrderInfoManagerTest {
ConsumerOrderInfoManager consumerOrderInfoManager = new
ConsumerOrderInfoManager(brokerController);
{
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
"errTopic",
"errGroup",
QUEUE_ID_0,
@@ -390,7 +407,7 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(0, consumerOrderInfoManager.getTable().size());
}
{
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
"errGroup",
QUEUE_ID_0,
@@ -404,7 +421,7 @@ public class ConsumerOrderInfoManagerTest {
}
{
topicConfig.setReadQueueNums(0);
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -420,7 +437,7 @@ public class ConsumerOrderInfoManagerTest {
}
{
topicConfig.setReadQueueNums(8);
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -461,7 +478,7 @@ public class ConsumerOrderInfoManagerTest {
@Test
public void testLoadFromOldVersionOrderInfoData() {
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -479,10 +496,10 @@ public class ConsumerOrderInfoManagerTest {
String dataEncoded = consumerOrderInfoManager.encode();
consumerOrderInfoManager.decode(dataEncoded);
- assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP,
QUEUE_ID_0, 3000));
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP,
QUEUE_ID_0, 3000));
StringBuilder orderInfoBuilder = new StringBuilder();
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -497,4 +514,24 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(1,
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0,
3)).intValue());
assertEquals(1,
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0,
4)).intValue());
}
+
+ @Test
+ public void testReentrant() {
+ StringBuilder orderInfoBuilder = new StringBuilder();
+ String attemptId = UUID.randomUUID().toString();
+ consumerOrderInfoManager.update(
+ attemptId,
+ false,
+ TOPIC,
+ GROUP,
+ QUEUE_ID_0,
+ popTime,
+ 3000,
+ Lists.newArrayList(1L, 2L, 3L),
+ orderInfoBuilder
+ );
+
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP,
QUEUE_ID_0, 3000));
+ assertFalse(consumerOrderInfoManager.checkBlock(attemptId, TOPIC,
GROUP, QUEUE_ID_0, 3000));
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
index 2460a4f2e..34b97987d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
@@ -50,6 +50,8 @@ public class PopMessageRequestHeader extends
TopicQueueRequestHeader {
*/
private Boolean order = Boolean.FALSE;
+ private String attemptId;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -154,6 +156,14 @@ public class PopMessageRequestHeader extends
TopicQueueRequestHeader {
return this.order != null && this.order.booleanValue();
}
+ public String getAttemptId() {
+ return attemptId;
+ }
+
+ public void setAttemptId(String attemptId) {
+ this.attemptId = attemptId;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -168,6 +178,7 @@ public class PopMessageRequestHeader extends
TopicQueueRequestHeader {
.add("expType", expType)
.add("exp", exp)
.add("order", order)
+ .add("attemptId", attemptId)
.toString();
}
}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index b0c8c3250..85dfa7b49 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -69,6 +69,12 @@ public class RMQPopClient implements MQConsumer {
public CompletableFuture<PopResult> popMessageAsync(String brokerAddr,
MessageQueue mq, long invisibleTime,
int maxNums, String consumerGroup, long timeout, boolean poll, int
initMode, boolean order,
String expressionType, String expression) {
+ return popMessageAsync(brokerAddr, mq, invisibleTime, maxNums,
consumerGroup, timeout, poll, initMode, order, expressionType, expression,
null);
+ }
+
+ public CompletableFuture<PopResult> popMessageAsync(String brokerAddr,
MessageQueue mq, long invisibleTime,
+ int maxNums, String consumerGroup, long timeout, boolean poll, int
initMode, boolean order,
+ String expressionType, String expression, String attemptId) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(mq.getTopic());
@@ -79,6 +85,7 @@ public class RMQPopClient implements MQConsumer {
requestHeader.setExpType(expressionType);
requestHeader.setExp(expression);
requestHeader.setOrder(order);
+ requestHeader.setAttemptId(attemptId);
if (poll) {
requestHeader.setPollTime(timeout);
requestHeader.setBornTime(System.currentTimeMillis());
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
index ecd70c134..acf70f7f9 100644
---
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
@@ -95,6 +95,19 @@ public class BasePopOrderly extends BasePop {
}
}
+ protected void assertMsgRecv(int seqId, int expectNum, List<Integer>
expectReconsumeTimes) {
+ String msgId = msgRecvSequence.get(seqId);
+ List<MsgRcv> msgRcvList = msgRecv.get(msgId);
+ assertEquals(expectNum, msgRcvList.size());
+ assertConsumeTimes(msgRcvList, expectReconsumeTimes);
+ }
+
+ protected void assertConsumeTimes(List<MsgRcv> msgRcvList, List<Integer>
expectReconsumeTimes) {
+ for (int i = 0; i < msgRcvList.size(); i++) {
+ assertEquals(expectReconsumeTimes.get(i).intValue(),
msgRcvList.get(i).messageExt.getReconsumeTimes());
+ }
+ }
+
protected void onRecvNewMessage(MessageExt messageExt) {
msgDataRecv.add(new String(messageExt.getBody()));
msgRecvSequence.add(messageExt.getMsgId());
@@ -108,9 +121,13 @@ public class BasePopOrderly extends BasePop {
}
protected CompletableFuture<PopResult> popMessageOrderlyAsync(long
invisibleTime, int maxNums, long timeout) {
+ return popMessageOrderlyAsync(invisibleTime, maxNums, timeout, null);
+ }
+
+ protected CompletableFuture<PopResult> popMessageOrderlyAsync(long
invisibleTime, int maxNums, long timeout, String attemptId) {
return client.popMessageAsync(
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout,
true,
- ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", attemptId);
}
protected CompletableFuture<AckResult> ackMessageAsync(MessageExt
messageExt) {
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
index 04c7f4a34..efb12a321 100644
---
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.message.MessageExt;
+import org.assertj.core.util.Lists;
import org.junit.Test;
import static org.awaitility.Awaitility.await;
@@ -248,4 +249,41 @@ public class PopOrderlyIT extends BasePopOrderly {
});
return resultFuture;
}
+
+ @Test
+ public void testReentrant() {
+ producer.send(1);
+
+ popMessageForReentrant(null).join();
+ assertMsgRecv(0, 1, Lists.newArrayList(0));
+
+ String attemptId01 = "attemptId-01";
+ popMessageForReentrant(attemptId01).join();
+ assertMsgRecv(0, 2, Lists.newArrayList(0, 1));
+ popMessageForReentrant(attemptId01).join();
+ assertMsgRecv(0, 3, Lists.newArrayList(0, 1, 1));
+
+ String attemptId02 = "attemptId-02";
+
await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(()
-> {
+ popMessageForReentrant(attemptId02).join();
+ return msgRecvSequence.size() == 4;
+ });
+ popMessageForReentrant(attemptId02).join();
+ assertMsgRecv(0, 5, Lists.newArrayList(0, 1, 1, 2, 2));
+
+
await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(()
-> {
+ popMessageForReentrant(null).join();
+ return msgRecvSequence.size() == 6;
+ });
+ assertMsgRecv(0, 6, Lists.newArrayList(0, 1, 1, 2, 2, 3));
+ }
+
+ private CompletableFuture<Void> popMessageForReentrant(String attemptId) {
+ return popMessageOrderlyAsync(TimeUnit.SECONDS.toMillis(10), 3,
TimeUnit.SECONDS.toMillis(30), attemptId)
+ .thenAccept(popResult -> {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+ onRecvNewMessage(messageExt);
+ }
+ });
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
index cedc0fe2a..b9798cfd5 100644
---
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -155,12 +155,12 @@ public class OffsetResetForPopIT extends BaseConf {
// ack old msg, expect has no effect
ackMessageSync(popResult1.getMsgFoundList());
Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
// ack new msg
ackMessageSync(popResult2.getMsgFoundList());
Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
}
@Test
@@ -176,12 +176,12 @@ public class OffsetResetForPopIT extends BaseConf {
PopResult popResult =
consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
Assert.assertEquals(messageCount - resetOffset,
popResult.getMsgFoundList().size());
Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
ackMessageSync(popResult.getMsgFoundList());
TimeUnit.SECONDS.sleep(1);
Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
}
@Test