This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 04683ec058 [ISSUE 7117] check message is in memory or not when init consumer offset for pop (#7118) 04683ec058 is described below commit 04683ec05808d63f742f8702a9bd3a2fb846c154 Author: lk <xdk...@outlook.com> AuthorDate: Wed Aug 9 19:08:33 2023 +0800 [ISSUE 7117] check message is in memory or not when init consumer offset for pop (#7118) --- .../broker/processor/AckMessageProcessor.java | 1 - .../broker/processor/PopMessageProcessor.java | 40 ++++++++++++++-------- .../org/apache/rocketmq/common/BrokerConfig.java | 9 +++++ .../proxy/service/route/TopicRouteService.java | 2 +- 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 2140aa881c..687811409e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -308,7 +308,6 @@ public class AckMessageProcessor implements NettyRequestProcessor { && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("put ack msg error:" + putMessageResult); } - System.out.printf("put ack to store %s", ackMsg); PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 53e1725614..441f7de08a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -639,20 +639,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); if (offset < 0) { - if (ConsumeInitMode.MIN == initMode) { - offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); - } else { - // pop last one,then commit offset. - offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; - // max & no consumer offset - if (offset < 0) { - offset = 0; - } - if (init) { - this.brokerController.getConsumerOffsetManager().commitOffset( - "getPopOffset", group, topic, queueId, offset); - } - } + offset = this.getInitOffset(topic, group, queueId, initMode, init); } if (checkResetOffset) { @@ -670,6 +657,31 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } + private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) { + long offset; + if (ConsumeInitMode.MIN == initMode) { + return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); + } else { + if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && + this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 && + this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, queueId, 0, 1)) { + offset = 0; + } else { + // pop last one,then commit offset. + offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; + // max & no consumer offset + if (offset < 0) { + offset = 0; + } + } + if (init) { + this.brokerController.getConsumerOffsetManager().commitOffset( + "getPopOffset", group, topic, queueId, offset); + } + } + return offset; + } + public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 02c692e2b2..a815636b18 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity { private int popCkOffsetMaxQueueSize = 20000; private boolean enablePopBatchAck = false; private boolean enableNotifyAfterPopOrderLockRelease = true; + private boolean initPopOffsetByCheckMsgInMem = true; private boolean realTimeNotifyConsumerChange = true; @@ -1264,6 +1265,14 @@ public class BrokerConfig extends BrokerIdentity { this.enableNotifyAfterPopOrderLockRelease = enableNotifyAfterPopOrderLockRelease; } + public boolean isInitPopOffsetByCheckMsgInMem() { + return initPopOffsetByCheckMsgInMem; + } + + public void setInitPopOffsetByCheckMsgInMem(boolean initPopOffsetByCheckMsgInMem) { + this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem; + } + public boolean isRealTimeNotifyConsumerChange() { return realTimeNotifyConsumerChange; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index b6b14faa49..e012a5465a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -133,7 +133,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); - log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); + log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE;