originalHeaed opened a new issue, #6163:
URL: https://github.com/apache/rocketmq/issues/6163
你好!
版本:RocketMQ 5.0.0
情况概述:消费端只有一个消费者,设置 MaxReconsumeTimes 值为 1,调用DefaultMQPushConsumer
的顺序消费端口,在消费重试次数达到 1 后,消息被消费端投入延迟队列,随后 broker 重新将消息投入了重试队列,并没有投入死信队列;
个人源码走查:
消息在重试超过指定次数后,由消费端投递至延迟队列时,消息的重试次数 == 最大重新重试次数;
消息在延迟时间到达后,重新投递时调用的时 SendMessageProcessor.sendMessage 方法,内部对消息是否投递死信队列调用了
SendMessageProcessor.handleRetryAndDLQ 方法进行判断(且没有将消息的重试次数进行增加)
` private boolean handleRetryAndDLQ(SendMessageRequestHeader
requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig, Map<String, String>
properties) {
String newTopic = requestHeader.getTopic();
if (null != newTopic &&
newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName =
newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " +
FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
int maxReconsumeTimes =
subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()
&& requestHeader.getMaxReconsumeTimes() != null) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ?
0 : requestHeader.getReconsumeTimes();
// Using '>' instead of '>=' to compatible with the case that
reconsumeTimes here are increased by client.
if (reconsumeTimes > maxReconsumeTimes) {
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
msg.setDelayTimeLevel(0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}
`
其中进入死信队列的条件是 reconsumeTimes >
maxReconsumeTimes,因此投入了重试队列,随后消费端拉取该消息,重复上面的流程(消息的 reconsumeTimes 并没有增加)。
我想问下这里是需要消费端在消费时自行增加 reconsumeTimes 吗?且只有消息从重试队列中那消息后才需要自行增加
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]