This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0fea748f8ba5595049867651479325eac85d7af1 Author: zhouxiang <[email protected]> AuthorDate: Tue Jan 10 16:07:28 2023 +0800 [ISSUE #5847] Fix wake up in NotificationProcessor --- .../broker/longpolling/NotificationRequest.java | 2 +- .../broker/processor/NotificationProcessor.java | 62 ++++++++++++++-------- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java index fdae88128..2ff9a73a4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java @@ -43,7 +43,7 @@ public class NotificationRequest { } public boolean isTimeout() { - return System.currentTimeMillis() > (expired - 3000); + return System.currentTimeMillis() > (expired - 500); } public boolean complete() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 0b580df0f..b5e611a98 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -63,7 +63,7 @@ public class NotificationProcessor implements NettyRequestProcessor { break; } try { - Thread.sleep(2000L); + Thread.sleep(200L); Collection<ArrayBlockingQueue<NotificationRequest>> pops = pollingMap.values(); for (ArrayBlockingQueue<NotificationRequest> popQ : pops) { NotificationRequest tmPopRequest = popQ.peek(); @@ -73,15 +73,9 @@ public class NotificationProcessor implements NettyRequestProcessor { if (tmPopRequest == null) { break; } - if (!tmPopRequest.isTimeout()) { - POP_LOGGER.info("not timeout , but wakeUp Notification in advance: {}", tmPopRequest); - wakeUp(tmPopRequest, false); - break; - } else { - POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest); - wakeUp(tmPopRequest, false); - tmPopRequest = popQ.peek(); - } + POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest); + wakeUp(tmPopRequest, false); + tmPopRequest = popQ.peek(); } else { break; } @@ -146,17 +140,26 @@ public class NotificationProcessor implements NettyRequestProcessor { return; } Runnable run = () -> { - final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg); - if (response != null) { - response.setOpaque(request.getRemotingCommand().getOpaque()); - response.markResponseType(); - NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> { - if (!future.isSuccess()) { - POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause()); - POP_LOGGER.error(request.toString()); - POP_LOGGER.error(response.toString()); - } - }); + try { + final RemotingCommand response; + if (hasMsg) { + response = NotificationProcessor.this.responseNotification(request.getChannel(), true); + } else { + response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand()); + } + if (response != null) { + response.setOpaque(request.getRemotingCommand().getOpaque()); + response.markResponseType(); + NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> { + if (!future.isSuccess()) { + POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause()); + POP_LOGGER.error(request.toString()); + POP_LOGGER.error(response.toString()); + } + }); + } + } catch (RemotingCommandException e) { + POP_LOGGER.error("ExecuteRequestWhenWakeup run", e); } }; this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand())); @@ -228,6 +231,9 @@ public class NotificationProcessor implements NettyRequestProcessor { for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); hasMsg = hasMsgFromQueue(true, requestHeader, queueId); + if (hasMsg) { + break; + } } } } @@ -244,6 +250,20 @@ public class NotificationProcessor implements NettyRequestProcessor { int queueId = requestHeader.getQueueId(); hasMsg = hasMsgFromQueue(false, requestHeader, queueId); } + // if it has message, fetch retry again + if (!needRetry && !hasMsg) { + TopicConfig retryTopicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfig != null) { + for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); + hasMsg = hasMsgFromQueue(true, requestHeader, queueId); + if (hasMsg) { + break; + } + } + } + } if (!hasMsg) { if (polling(channel, request, requestHeader)) {
