crazy-pizza opened a new issue #3731:
URL: https://github.com/apache/rocketmq/issues/3731
1: try内的发送没有校验broker是否是只读,这对运维来说很重要,运维会发现broker设置只读后还是有流量打入。
2:catch内的兜底发送没有使用用户设置的delayLevelWhenNextConsume,不过如果使用delayLevelWhenNextConsume,也需要考虑delayLevelWhenNextConsume是否超过了消费组的最大重试次数。
``` java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
try {
String brokerAddr = (null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel,
5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " +
this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg = new
Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()),
msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg,
UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg,
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg,
String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg,
MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
this.defaultMQPushConsumer.getNamespace()));
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]