This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 23cc24cc6e [ISSUE #8947] Notify pop request before calculate consumer lag (#8949) 23cc24cc6e is described below commit 23cc24cc6e2fa33b9be2c434c42dee3a54e726a4 Author: lizhimins <707364...@qq.com> AuthorDate: Wed Nov 20 14:57:35 2024 +0800 [ISSUE #8947] Notify pop request before calculate consumer lag (#8949) --- .../longpolling/NotifyMessageArrivingListener.java | 4 +- .../broker/longpolling/PopCommandCallback.java | 49 +++++++++++++++++ .../broker/longpolling/PopLongPollingService.java | 49 +++++++++++++---- .../broker/metrics/ConsumerLagCalculator.java | 62 ++++++++++++++-------- .../broker/processor/NotificationProcessor.java | 4 +- .../broker/processor/PopMessageProcessor.java | 18 +++++-- .../longpolling/PopLongPollingServiceTest.java | 11 ++-- .../org/apache/rocketmq/common/BrokerConfig.java | 19 +++++++ .../apache/rocketmq/remoting/CommandCallback.java | 22 ++++++++ .../remoting/protocol/RemotingCommand.java | 11 ++++ 10 files changed, 206 insertions(+), 43 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index 1ddb9f4f8e..9c0ee89e4d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -40,8 +40,8 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener { this.pullRequestHoldService.notifyMessageArriving( topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); this.popMessageProcessor.notifyMessageArriving( - topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); this.notificationProcessor.notifyMessageArriving( - topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java new file mode 100644 index 0000000000..2e190e20f9 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.longpolling; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator; +import org.apache.rocketmq.remoting.CommandCallback; + +public class PopCommandCallback implements CommandCallback { + + private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo, + Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer; + + private final ConsumerLagCalculator.ProcessGroupInfo info; + private final Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder; + + + public PopCommandCallback( + BiConsumer<ConsumerLagCalculator.ProcessGroupInfo, + Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer, + ConsumerLagCalculator.ProcessGroupInfo info, + Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) { + + this.biConsumer = biConsumer; + this.info = info; + this.lagRecorder = lagRecorder; + } + + @Override + public void accept() { + biConsumer.accept(info, lagRecorder); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index b5179114f3..91185fbe94 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,6 +32,7 @@ import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.CommandCallback; import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; @@ -45,6 +47,7 @@ import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC; import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT; public class PopLongPollingService extends ServiceThread { + private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; @@ -150,10 +153,10 @@ public class PopLongPollingService extends ServiceThread { } public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) { - this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L, null, null); + this.notifyMessageArrivingWithRetryTopic(topic, queueId, -1L, null, 0L, null, null); } - public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, + public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String notifyTopic; if (KeyBuilder.isPopRetryTopicV2(topic)) { @@ -161,25 +164,37 @@ public class PopLongPollingService extends ServiceThread { } else { notifyTopic = topic; } - notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } - public void notifyMessageArriving(final String topic, final int queueId, + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic); if (cids == null) { return; } + long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval(); + boolean force = interval > 0L && offset % interval == 0L; for (Map.Entry<String, Byte> cid : cids.entrySet()) { if (queueId >= 0) { - notifyMessageArriving(topic, -1, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties); + notifyMessageArriving(topic, -1, cid.getKey(), force, tagsCode, msgStoreTime, filterBitMap, properties); } - notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties); + notifyMessageArriving(topic, queueId, cid.getKey(), force, tagsCode, msgStoreTime, filterBitMap, properties); } } public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + return notifyMessageArriving(topic, queueId, cid, false, tagsCode, msgStoreTime, filterBitMap, properties, null); + } + + public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + return notifyMessageArriving(topic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties, null); + } + + public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties, CommandCallback callback) { ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId)); if (remotingCommands == null || remotingCommands.isEmpty()) { return false; @@ -190,7 +205,7 @@ public class PopLongPollingService extends ServiceThread { return false; } - if (popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) { + if (!force && popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) { boolean match = popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); if (match && properties != null) { @@ -206,16 +221,30 @@ public class PopLongPollingService extends ServiceThread { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", popRequest); } - return wakeUp(popRequest); + + return wakeUp(popRequest, callback); } public boolean wakeUp(final PopRequest request) { + return wakeUp(request, null); + } + + public boolean wakeUp(final PopRequest request, CommandCallback callback) { if (request == null || !request.complete()) { return false; } + + if (callback != null && request.getRemotingCommand() != null) { + if (request.getRemotingCommand().getCallbackList() == null) { + request.getRemotingCommand().setCallbackList(new ArrayList<>()); + } + request.getRemotingCommand().getCallbackList().add(callback); + } + if (!request.getCtx().channel().isActive()) { return false; } + Runnable run = () -> { try { final RemotingCommand response = processor.processRequest(request.getCtx(), request.getRemotingCommand()); @@ -234,7 +263,9 @@ public class PopLongPollingService extends ServiceThread { POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1); } }; - this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand())); + + this.brokerController.getPullMessageExecutor().submit( + new RequestTask(run, request.getChannel(), request.getRemotingCommand())); return true; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java index 3ac6528b2a..1b898f95de 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java @@ -26,6 +26,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.broker.longpolling.PopCommandCallback; +import org.apache.rocketmq.broker.longpolling.PopLongPollingService; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.processor.PopBufferMergeService; import org.apache.rocketmq.broker.processor.PopInflightMessageCounter; @@ -51,6 +53,7 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.exception.ConsumeQueueException; public class ConsumerLagCalculator { + private final BrokerConfig brokerConfig; private final TopicConfigManager topicConfigManager; private final ConsumerManager consumerManager; @@ -59,6 +62,7 @@ public class ConsumerLagCalculator { private final SubscriptionGroupManager subscriptionGroupManager; private final MessageStore messageStore; private final PopBufferMergeService popBufferMergeService; + private final PopLongPollingService popLongPollingService; private final PopInflightMessageCounter popInflightMessageCounter; private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -72,10 +76,11 @@ public class ConsumerLagCalculator { this.subscriptionGroupManager = brokerController.getSubscriptionGroupManager(); this.messageStore = brokerController.getMessageStore(); this.popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService(); + this.popLongPollingService = brokerController.getPopMessageProcessor().getPopLongPollingService(); this.popInflightMessageCounter = brokerController.getPopInflightMessageCounter(); } - private static class ProcessGroupInfo { + public static class ProcessGroupInfo { public String group; public String topic; public boolean isPop; @@ -211,34 +216,44 @@ public class ConsumerLagCalculator { return; } - CalculateLagResult result = new CalculateLagResult(info.group, info.topic, false); + if (info.isPop && brokerConfig.isEnableNotifyBeforePopCalculateLag()) { + if (popLongPollingService.notifyMessageArriving(info.topic, -1, info.group, + true, null, 0, null, null, new PopCommandCallback(this::calculate, info, lagRecorder))) { + return; + } + } + + calculate(info, lagRecorder); + }); + } + public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult> lagRecorder) { + CalculateLagResult result = new CalculateLagResult(info.group, info.topic, false); + try { + Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, info.isPop); + if (lag != null) { + result.lag = lag.getObject1(); + result.earliestUnconsumedTimestamp = lag.getObject2(); + } + lagRecorder.accept(result); + } catch (ConsumeQueueException e) { + LOGGER.error("Failed to get lag stats", e); + } + + if (info.isPop) { try { - Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, info.isPop); - if (lag != null) { - result.lag = lag.getObject1(); - result.earliestUnconsumedTimestamp = lag.getObject2(); + Pair<Long, Long> retryLag = getConsumerLagStats(info.group, info.retryTopic, true); + + result = new CalculateLagResult(info.group, info.topic, true); + if (retryLag != null) { + result.lag = retryLag.getObject1(); + result.earliestUnconsumedTimestamp = retryLag.getObject2(); } lagRecorder.accept(result); } catch (ConsumeQueueException e) { LOGGER.error("Failed to get lag stats", e); } - - if (info.isPop) { - try { - Pair<Long, Long> retryLag = getConsumerLagStats(info.group, info.retryTopic, true); - - result = new CalculateLagResult(info.group, info.topic, true); - if (retryLag != null) { - result.lag = retryLag.getObject1(); - result.earliestUnconsumedTimestamp = retryLag.getObject2(); - } - lagRecorder.accept(result); - } catch (ConsumeQueueException e) { - LOGGER.error("Failed to get lag stats", e); - } - } - }); + } } public void calculateInflight(Consumer<CalculateInflightResult> inflightRecorder) { @@ -320,6 +335,9 @@ public class ConsumerLagCalculator { earliestUnconsumedTimestamp = 0L; } + LOGGER.debug("GetConsumerLagStats, topic={}, group={}, lag={}, latency={}", topic, group, total, + earliestUnconsumedTimestamp > 0 ? System.currentTimeMillis() - earliestUnconsumedTimestamp : 0); + return new Pair<>(total, earliestUnconsumedTimestamp); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 75c77b6d79..b4ebd9c4a9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -62,10 +62,10 @@ public class NotificationProcessor implements NettyRequestProcessor { // When a new message is written to CommitLog, this method would be called. // Suspended long polling will receive notification and be wakeup. - public void notifyMessageArriving(final String topic, final int queueId, + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.popLongPollingService.notifyMessageArrivingWithRetryTopic( - topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } public void notifyMessageArriving(final String topic, final int queueId) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index fe8ccb03dc..e0454afa3c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.CommandCallback; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager; @@ -97,8 +98,10 @@ import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; public class PopMessageProcessor implements NettyRequestProcessor { + private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private final BrokerController brokerController; private final Random random = new Random(System.currentTimeMillis()); String reviveTopic; @@ -196,15 +199,15 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } - public void notifyMessageArriving(final String topic, final int queueId, + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { popLongPollingService.notifyMessageArrivingWithRetryTopic( - topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } public void notifyMessageArriving(final String topic, final int queueId, final String cid) { popLongPollingService.notifyMessageArriving( - topic, queueId, cid, null, 0L, null, null); + topic, queueId, cid, false, null, 0L, null, null); } @Override @@ -419,6 +422,15 @@ public class PopMessageProcessor implements NettyRequestProcessor { final RemotingCommand finalResponse = response; SubscriptionData finalSubscriptionData = subscriptionData; getMessageFuture.thenApply(restNum -> { + try { + if (request.getCallbackList() != null) { + request.getCallbackList().forEach(CommandCallback::accept); + request.getCallbackList().clear(); + } + } catch (Throwable t) { + POP_LOGGER.error("PopProcessor execute callback error", t); + } + if (!getMessageResult.getMessageBufferList().isEmpty()) { finalResponse.setCode(ResponseCode.SUCCESS); getMessageResult.setStatus(GetMessageStatus.FOUND); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java index 6527beeb68..1f064ec05d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java @@ -80,21 +80,22 @@ public class PopLongPollingServiceTest { @Test public void testNotifyMessageArrivingWithRetryTopic() { int queueId = 0; - doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, null, 0L, null, null); + doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null); popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId); - verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, null, 0L, null, null); + verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null); } @Test public void testNotifyMessageArriving() { int queueId = 0; Long tagsCode = 123L; + long offset = 123L; long msgStoreTime = System.currentTimeMillis(); byte[] filterBitMap = new byte[]{0x01}; Map<String, String> properties = new ConcurrentHashMap<>(); - doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); - popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); - verify(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); + popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); + verify(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } @Test diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index c651047661..f459abf0db 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -227,6 +227,9 @@ public class BrokerConfig extends BrokerIdentity { private int popCkMaxBufferSize = 200000; private int popCkOffsetMaxQueueSize = 20000; private boolean enablePopBatchAck = false; + // set the interval to the maxFilterMessageSize in MessageStoreConfig divided by the cq unit size + private long popLongPollingForceNotifyInterval = 800; + private boolean enableNotifyBeforePopCalculateLag = true; private boolean enableNotifyAfterPopOrderLockRelease = true; private boolean initPopOffsetByCheckMsgInMem = true; // read message from pop retry topic v1, for the compatibility, will be removed in the future version @@ -1326,6 +1329,22 @@ public class BrokerConfig extends BrokerIdentity { this.enableNetWorkFlowControl = enableNetWorkFlowControl; } + public long getPopLongPollingForceNotifyInterval() { + return popLongPollingForceNotifyInterval; + } + + public void setPopLongPollingForceNotifyInterval(long popLongPollingForceNotifyInterval) { + this.popLongPollingForceNotifyInterval = popLongPollingForceNotifyInterval; + } + + public boolean isEnableNotifyBeforePopCalculateLag() { + return enableNotifyBeforePopCalculateLag; + } + + public void setEnableNotifyBeforePopCalculateLag(boolean enableNotifyBeforePopCalculateLag) { + this.enableNotifyBeforePopCalculateLag = enableNotifyBeforePopCalculateLag; + } + public boolean isEnableNotifyAfterPopOrderLockRelease() { return enableNotifyAfterPopOrderLockRelease; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java new file mode 100644 index 0000000000..884f3d9e5d --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting; + +public interface CommandCallback { + + void accept(); +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 5de48350cf..9b2b0f07b4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +39,7 @@ import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.CommandCallback; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -96,6 +98,7 @@ public class RemotingCommand { private transient byte[] body; private boolean suspended; private transient Stopwatch processTimer; + private transient List<CommandCallback> callbackList; protected RemotingCommand() { } @@ -639,4 +642,12 @@ public class RemotingCommand { public void setProcessTimer(Stopwatch processTimer) { this.processTimer = processTimer; } + + public List<CommandCallback> getCallbackList() { + return callbackList; + } + + public void setCallbackList(List<CommandCallback> callbackList) { + this.callbackList = callbackList; + } }