This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new aab646ce83 [ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker (#8475) aab646ce83 is described below commit aab646ce83930454d8b5956779aa28a80e326e24 Author: imzs <i...@foxmail.com> AuthorDate: Fri Aug 2 14:43:21 2024 +0800 [ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker (#8475) * [ISSUE #8460] part1: add extra information to the call chain of remote message reading * [ISSUE #8460] part2: add exponential backoff and ending condition of CK rewrite, and fix checkstyle * [ISSUE #8460] exclude test, BrokerOuterAPITest passed locally, but failed on bazel. --- broker/BUILD.bazel | 1 + .../rocketmq/broker/failover/EscapeBridge.java | 44 +++--- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 13 +- .../broker/processor/PopReviveService.java | 41 +++-- .../apache/rocketmq/broker/BrokerOuterAPITest.java | 173 ++++++++++++++++++++- .../rocketmq/broker/failover/EscapeBridgeTest.java | 150 +++++++++++++++++- .../broker/processor/PopReviveServiceTest.java | 166 +++++++++++++++++++- .../apache/rocketmq/store/pop/PopCheckPoint.java | 23 ++- 8 files changed, 554 insertions(+), 57 deletions(-) diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index 0dbc85f945..66e621e930 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -100,6 +100,7 @@ GenTestRules( exclude_tests = [ # These tests are extremely slow and flaky, exclude them before they are properly fixed. "src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest", + "src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest", ], deps = [ ":tests", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java index ededaf2c65..7df49f8c47 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java @@ -25,7 +25,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.client.consumer.PullStatus; @@ -34,7 +36,6 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; @@ -47,7 +48,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; @@ -263,34 +263,29 @@ public class EscapeBridge { } } - public Pair<GetMessageStatus, MessageExt> getMessage(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) { + public Triple<MessageExt, String, Boolean> getMessage(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) { return getMessageAsync(topic, offset, queueId, brokerName, deCompressBody).join(); } - public CompletableFuture<Pair<GetMessageStatus, MessageExt>> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) { + // Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null + public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) { MessageStore messageStore = brokerController.getMessageStoreByBrokerName(brokerName); if (messageStore != null) { return messageStore.getMessageAsync(innerConsumerGroupName, topic, queueId, offset, 1, null) .thenApply(result -> { if (result == null) { LOG.warn("getMessageResult is null , innerConsumerGroupName {}, topic {}, offset {}, queueId {}", innerConsumerGroupName, topic, offset, queueId); - return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null); + return Triple.of(null, "getMessageResult is null", false); // local store, so no retry } List<MessageExt> list = decodeMsgList(result, deCompressBody); if (list == null || list.isEmpty()) { LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result); - return new Pair<>(result.getStatus(), null); + return Triple.of(null, "Can not get msg", false); // local store, so no retry } - return new Pair<>(result.getStatus(), list.get(0)); + return Triple.of(list.get(0), "", false); }); } else { - return getMessageFromRemoteAsync(topic, offset, queueId, brokerName) - .thenApply(msg -> { - if (msg == null) { - return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null); - } - return new Pair<>(GetMessageStatus.FOUND, msg); - }); + return getMessageFromRemoteAsync(topic, offset, queueId, brokerName); } } @@ -322,11 +317,12 @@ public class EscapeBridge { return foundList; } - protected MessageExt getMessageFromRemote(String topic, long offset, int queueId, String brokerName) { + protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String topic, long offset, int queueId, String brokerName) { return getMessageFromRemoteAsync(topic, offset, queueId, brokerName).join(); } - protected CompletableFuture<MessageExt> getMessageFromRemoteAsync(String topic, long offset, int queueId, String brokerName) { + // Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null + protected CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageFromRemoteAsync(String topic, long offset, int queueId, String brokerName) { try { String brokerAddr = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false); if (null == brokerAddr) { @@ -334,23 +330,25 @@ public class EscapeBridge { brokerAddr = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false); if (null == brokerAddr) { - LOG.warn("can't find broker address for topic {}", topic); - return CompletableFuture.completedFuture(null); + LOG.warn("can't find broker address for topic {}, {}", topic, brokerName); + return CompletableFuture.completedFuture(Triple.of(null, "brokerAddress not found", true)); // maybe offline temporarily, so need retry } } return this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName, brokerAddr, this.innerConsumerGroupName, topic, queueId, offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS) .thenApply(pullResult -> { - if (pullResult.getPullStatus().equals(PullStatus.FOUND) && !pullResult.getMsgFoundList().isEmpty()) { - return pullResult.getMsgFoundList().get(0); + if (pullResult.getLeft() != null + && PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus()) + && CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) { + return Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false); } - return null; + return Triple.of(null, pullResult.getMiddle(), pullResult.getRight()); }); } catch (Exception e) { - LOG.error("Get message from remote failed.", e); + LOG.error("Get message from remote failed. {}, {}, {}, {}", topic, offset, queueId, brokerName, e); } - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(Triple.of(null, "Get message from remote failed", true)); // need retry } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index d5c80ce2ec..83edd88408 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.auth.config.AuthConfig; @@ -1378,7 +1379,8 @@ public class BrokerOuterAPI { }); } - public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr, + // Triple<PullResult, info, needRetry>, should check info and retry if and only if PullResult is null + public CompletableFuture<Triple<PullResult, String, Boolean>> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr, String consumerGroup, String topic, int queueId, long offset, int maxNums, long timeoutMillis) throws RemotingException, InterruptedException { PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); @@ -1397,7 +1399,7 @@ public class BrokerOuterAPI { requestHeader.setBrokerName(brokerName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); - CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>(); + CompletableFuture<Triple<PullResult, String, Boolean>> pullResultFuture = new CompletableFuture<>(); this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { @@ -1409,15 +1411,16 @@ public class BrokerOuterAPI { try { PullResultExt pullResultExt = processPullResponse(response, brokerAddr); processPullResult(pullResultExt, brokerName, queueId); - pullResultFuture.complete(pullResultExt); + pullResultFuture.complete(Triple.of(pullResultExt, pullResultExt.getPullStatus().name(), false)); // found or not found really, so no retry } catch (Exception e) { - pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>())); + // retry when NO_PERMISSION, SUBSCRIPTION_GROUP_NOT_EXIST etc. even when TOPIC_NOT_EXIST + pullResultFuture.complete(Triple.of(null, "Response Code:" + response.getCode(), true)); } } @Override public void operationFail(Throwable throwable) { - pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>())); + pullResultFuture.complete(Triple.of(null, throwable.getMessage(), true)); } }); return pullResultFuture; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 8074af23bf..114d094600 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.metrics.PopMetricsManager; @@ -34,6 +35,7 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; @@ -51,7 +53,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; @@ -63,6 +64,7 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOP public class PopReviveService extends ServiceThread { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 }; private int queueId; private BrokerController brokerController; @@ -196,7 +198,8 @@ public class PopReviveService extends ServiceThread { || pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && offset == pullResult.getMaxOffset(); } - private CompletableFuture<Pair<GetMessageStatus, MessageExt>> getBizMessage(String topic, long offset, int queueId, + // Triple<MessageExt, info, needRetry> + private CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(String topic, long offset, int queueId, String brokerName) { return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false); } @@ -491,6 +494,8 @@ public class PopReviveService extends ServiceThread { PopCheckPoint oldCK = inflightReviveRequestMap.firstKey(); rePutCK(oldCK, pair); inflightReviveRequestMap.remove(oldCK); + POP_LOGGER.warn("stay too long, remove from reviveRequestMap, {}, {}, {}, {}", popCheckPoint.getTopic(), + popCheckPoint.getBrokerName(), popCheckPoint.getQueueId(), popCheckPoint.getStartOffset()); } } @@ -524,22 +529,12 @@ public class PopReviveService extends ServiceThread { // retry msg long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j); CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName()) - .thenApply(resultPair -> { - GetMessageStatus getMessageStatus = resultPair.getObject1(); - MessageExt message = resultPair.getObject2(); + .thenApply(rst -> { + MessageExt message = rst.getLeft(); if (message == null) { - POP_LOGGER.debug("reviveQueueId={}, can not get biz msg topic is {}, offset is {}, then continue", - queueId, popCheckPoint.getTopic(), msgOffset); - switch (getMessageStatus) { - case MESSAGE_WAS_REMOVING: - case OFFSET_TOO_SMALL: - case NO_MATCHED_LOGIC_QUEUE: - case NO_MESSAGE_IN_QUEUE: - return new Pair<>(msgOffset, true); - default: - return new Pair<>(msgOffset, false); - - } + POP_LOGGER.info("reviveQueueId={}, can not get biz msg, topic:{}, qid:{}, offset:{}, brokerName:{}, info:{}, retry:{}, then continue", + queueId, popCheckPoint.getTopic(), popCheckPoint.getQueueId(), msgOffset, popCheckPoint.getBrokerName(), UtilAll.frontStringAtLeast(rst.getMiddle(), 60), rst.getRight()); + return new Pair<>(msgOffset, !rst.getRight()); // Pair.object2 means OK or not, Triple.right value means needRetry } boolean result = reviveRetry(popCheckPoint, message); return new Pair<>(msgOffset, result); @@ -572,6 +567,13 @@ public class PopReviveService extends ServiceThread { } private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) { + int rePutTimes = oldCK.parseRePutTimes(); + if (rePutTimes >= ckRewriteIntervalsInSeconds.length) { + POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(), + oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime()); + return; + } + PopCheckPoint newCk = new PopCheckPoint(); newCk.setBitMap(0); newCk.setNum((byte) 1); @@ -583,6 +585,11 @@ public class PopReviveService extends ServiceThread { newCk.setQueueId(oldCK.getQueueId()); newCk.setBrokerName(oldCK.getBrokerName()); newCk.addDiff(0); + newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap + if (oldCK.getReviveTime() <= System.currentTimeMillis()) { + // never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time + newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000); + } MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId); brokerController.getMessageStore().putMessage(ckMsg); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java index 8f89c14ae9..440ebf813b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -20,29 +20,43 @@ package org.apache.rocketmq.broker; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.DefaultEventExecutor; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.auth.config.AuthConfig; import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; @@ -56,9 +70,12 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -69,9 +86,11 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.AdditionalMatchers.or; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(NettyRemotingClient.class) public class BrokerOuterAPITest { @Mock private ChannelHandlerContext handlerContext; @@ -251,4 +270,154 @@ public class BrokerOuterAPITest { }); Assert.assertTrue(result.get()); } + + @Test + public void testPullMessageFromSpecificBrokerAsync_createChannel_null() throws Exception { + NettyRemotingClient mockClient = PowerMockito.spy(new NettyRemotingClient(new NettyClientConfig())); + PowerMockito.when(mockClient, "getAndCreateChannelAsync", any()).thenReturn(null); + BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new AuthConfig()); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(api, mockClient); + + Triple<PullResult, String, Boolean> rst = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertTrue(rst.getMiddle().contains("connect")); + Assert.assertTrue(rst.getRight()); // need retry + } + + @Test + public void testPullMessageFromSpecificBrokerAsync_createChannel_future_notSuccess() throws Exception { + NettyRemotingClient mockClient = PowerMockito.spy(new NettyRemotingClient(new NettyClientConfig())); + DefaultChannelPromise promise = PowerMockito.spy(new DefaultChannelPromise(PowerMockito.mock(Channel.class), new DefaultEventExecutor())); + PowerMockito.when(mockClient, "getAndCreateChannelAsync", any()).thenReturn(promise); + BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new AuthConfig()); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(api, mockClient); + + promise.tryFailure(new Throwable()); + Triple<PullResult, String, Boolean> rst + = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertTrue(rst.getMiddle().contains("connect")); + Assert.assertTrue(rst.getRight()); // need retry + } + + // skip other future status test + + @Test + public void testPullMessageFromSpecificBrokerAsync_timeout() throws Exception { + Channel channel = Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + NettyRemotingClient mockClient = PowerMockito.spy(new NettyRemotingClient(new NettyClientConfig())); + DefaultChannelPromise promise = PowerMockito.spy(new DefaultChannelPromise(PowerMockito.mock(Channel.class), new DefaultEventExecutor())); + PowerMockito.when(mockClient, "getAndCreateChannelAsync", any()).thenReturn(promise); + when(promise.channel()).thenReturn(channel); + BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new AuthConfig()); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(api, mockClient); + + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + doReturn(future).when(mockClient).invokeImpl(any(Channel.class), any(RemotingCommand.class), anyLong()); + promise.trySuccess(null); + future.completeExceptionally(new RemotingTimeoutException("wait response on the channel timeout")); + Triple<PullResult, String, Boolean> rst = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertTrue(rst.getMiddle().contains("timeout")); + Assert.assertTrue(rst.getRight()); // need retry + } + + @Test + public void testPullMessageFromSpecificBrokerAsync_brokerReturn_pullStatusCode() throws Exception { + Channel channel = Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + NettyRemotingClient mockClient = PowerMockito.spy(new NettyRemotingClient(new NettyClientConfig())); + DefaultChannelPromise promise = PowerMockito.spy(new DefaultChannelPromise(PowerMockito.mock(Channel.class), new DefaultEventExecutor())); + PowerMockito.when(mockClient, "getAndCreateChannelAsync", any()).thenReturn(promise); + when(promise.channel()).thenReturn(channel); + BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new AuthConfig()); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(api, mockClient); + + int[] respCodes = new int[] {ResponseCode.SUCCESS, ResponseCode.PULL_NOT_FOUND, ResponseCode.PULL_RETRY_IMMEDIATELY, ResponseCode.PULL_OFFSET_MOVED}; + PullStatus[] respStatus = new PullStatus[] {PullStatus.FOUND, PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG, PullStatus.OFFSET_ILLEGAL}; + for (int i = 0; i < respCodes.length; i++) { + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + doReturn(future).when(mockClient).invokeImpl(any(Channel.class), any(RemotingCommand.class), anyLong()); + RemotingCommand response = mockPullMessageResponse(respCodes[i]); + ResponseFuture responseFuture = new ResponseFuture(channel, 0, null, 1000, + resp -> { }, new SemaphoreReleaseOnlyOnce(new Semaphore(1))); + responseFuture.setResponseCommand(response); + promise.trySuccess(null); + future.complete(responseFuture); + + Triple<PullResult, String, Boolean> rst = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join(); + Assert.assertEquals(respStatus[i], rst.getLeft().getPullStatus()); + if (ResponseCode.SUCCESS == respCodes[i]) { + Assert.assertEquals(1, rst.getLeft().getMsgFoundList().size()); + } else { + Assert.assertNull(rst.getLeft().getMsgFoundList()); + } + Assert.assertEquals(respStatus[i].name(), rst.getMiddle()); + Assert.assertFalse(rst.getRight()); // no retry + } + } + + @Test + public void testPullMessageFromSpecificBrokerAsync_brokerReturn_allOtherResponseCode() throws Exception { + Channel channel = Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + NettyRemotingClient mockClient = PowerMockito.spy(new NettyRemotingClient(new NettyClientConfig())); + DefaultChannelPromise promise = PowerMockito.spy(new DefaultChannelPromise(PowerMockito.mock(Channel.class), new DefaultEventExecutor())); + PowerMockito.when(mockClient, "getAndCreateChannelAsync", any()).thenReturn(promise); + when(promise.channel()).thenReturn(channel); + BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new AuthConfig()); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(api, mockClient); + + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + doReturn(future).when(mockClient).invokeImpl(any(Channel.class), any(RemotingCommand.class), anyLong()); + // test one code here, skip others + RemotingCommand response = mockPullMessageResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST); + ResponseFuture responseFuture = new ResponseFuture(channel, 0, null, 1000, + resp -> { }, new SemaphoreReleaseOnlyOnce(new Semaphore(1))); + responseFuture.setResponseCommand(response); + promise.trySuccess(null); + future.complete(responseFuture); + + Triple<PullResult, String, Boolean> rst = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertTrue(rst.getMiddle().contains(ResponseCode.SUBSCRIPTION_NOT_EXIST + "")); + Assert.assertTrue(rst.getRight()); // need retry + } + + private RemotingCommand mockPullMessageResponse(int responseCode) throws Exception { + RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); + response.setCode(responseCode); + if (responseCode == ResponseCode.SUCCESS) { + MessageExt msg = new MessageExt(); + msg.setBody("HW".getBytes()); + msg.setTopic("topic"); + msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000)); + msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000)); + byte[] encode = MessageDecoder.encode(msg, false); + response.setBody(encode); + } + PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); + responseHeader.setNextBeginOffset(0L); + responseHeader.setMaxOffset(0L); + responseHeader.setMinOffset(0L); + responseHeader.setOffsetDelta(0L); + responseHeader.setTopicSysFlag(0); + responseHeader.setGroupSysFlag(0); + responseHeader.setSuggestWhichBrokerId(0L); + responseHeader.setForbiddenType(0); + response.makeCustomHeaderToNet(); + return response; + } + } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java index d7bd753d77..7ea06665c3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java @@ -17,10 +17,14 @@ package org.apache.rocketmq.broker.failover; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.topic.TopicRouteInfoManager; @@ -28,7 +32,10 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.PutMessageResult; @@ -38,6 +45,7 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.logfile.MappedFile; import org.assertj.core.api.Assertions; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,7 +57,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -73,6 +80,12 @@ public class EscapeBridgeTest { @Mock private DefaultMQProducer defaultMQProducer; + @Mock + private TopicRouteInfoManager topicRouteInfoManager; + + @Mock + private BrokerOuterAPI brokerOuterAPI; + private static final String BROKER_NAME = "broker_a"; private static final String TEST_TOPIC = "TEST_TOPIC"; @@ -92,14 +105,10 @@ public class EscapeBridgeTest { when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult)); - TopicRouteInfoManager topicRouteInfoManager = mock(TopicRouteInfoManager.class); when(brokerController.getTopicRouteInfoManager()).thenReturn(topicRouteInfoManager); when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(""); - BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class); when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI); - when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(new PullResult(PullStatus.FOUND, -1, -1, -1, new ArrayList<>()))); brokerConfig.setEnableSlaveActingMaster(true); brokerConfig.setEnableRemoteEscape(true); @@ -179,6 +188,52 @@ public class EscapeBridgeTest { Assertions.assertThatCode(() -> escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false)).doesNotThrowAnyException(); } + @Test + public void getMessageAsyncTest_localStore_getMessageAsync_null() { + when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore); + when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("getMessageResult is null", rst.getMiddle()); + Assert.assertFalse(rst.getRight()); // no retry + } + + @Test + public void getMessageAsyncTest_localStore_decodeNothing() throws Exception { + when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore); + when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, TEST_TOPIC, null))); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("Can not get msg", rst.getMiddle()); + Assert.assertFalse(rst.getRight()); // no retry + } + + @Test + public void getMessageAsyncTest_localStore_message_found() throws Exception { + when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore); + when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(2, TEST_TOPIC, "HW".getBytes()))); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNotNull(rst.getLeft()); + Assert.assertEquals(0, rst.getLeft().getQueueOffset()); + Assert.assertTrue(Arrays.equals("HW".getBytes(), rst.getLeft().getBody())); + Assert.assertFalse(rst.getRight()); + } + + @Test + public void getMessageAsyncTest_remoteStore_addressNotFound() throws Exception { + when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(null); + + // just test address not found, since we have complete tests of getMessageFromRemoteAsync() + when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(null); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("brokerAddress not found", rst.getMiddle()); + Assert.assertTrue(rst.getRight()); // need retry + } + @Test public void getMessageFromRemoteTest() { Assertions.assertThatCode(() -> escapeBridge.getMessageFromRemote(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME)).doesNotThrowAnyException(); @@ -189,6 +244,54 @@ public class EscapeBridgeTest { Assertions.assertThatCode(() -> escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME)).doesNotThrowAnyException(); } + @Test + public void getMessageFromRemoteAsyncTest_exception_caught() throws Exception { + when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), anyLong())) + .thenThrow(new RemotingException("mock remoting exception")); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("Get message from remote failed", rst.getMiddle()); + Assert.assertTrue(rst.getRight()); // need retry + } + + @Test + public void getMessageFromRemoteAsyncTest_brokerAddressNotFound() throws Exception { + when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(null); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("brokerAddress not found", rst.getMiddle()); + Assert.assertTrue(rst.getRight()); // need retry + } + + @Test + public void getMessageFromRemoteAsyncTest_message_found() throws Exception { + PullResult pullResult = new PullResult(PullStatus.FOUND, 1, 1, 1, Arrays.asList(new MessageExt())); + when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "", false))); // right value is ignored + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME).join(); + Assert.assertNotNull(rst.getLeft()); + Assert.assertTrue(StringUtils.isEmpty(rst.getMiddle())); + Assert.assertFalse(rst.getRight()); // no retry + } + + @Test + public void getMessageFromRemoteAsyncTest_message_notFound() throws Exception { + PullResult pullResult = new PullResult(PullStatus.NO_MATCHED_MSG, 1, 1, 1, null); + when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "no msg", false))); + Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("no msg", rst.getMiddle()); + Assert.assertFalse(rst.getRight()); // no retry + + when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(null, "other resp code", true))); + rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, BROKER_NAME).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("other resp code", rst.getMiddle()); + Assert.assertTrue(rst.getRight()); // need retry + } + @Test public void decodeMsgListTest() { ByteBuffer byteBuffer = ByteBuffer.allocate(10); @@ -199,4 +302,39 @@ public class EscapeBridgeTest { Assertions.assertThatCode(() -> escapeBridge.decodeMsgList(getMessageResult, false)).doesNotThrowAnyException(); } + @Test + public void decodeMsgListTest_messageNotNull() throws Exception { + MessageExt msg = new MessageExt(); + msg.setBody("HW".getBytes()); + msg.setTopic("topic"); + msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000)); + msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000)); + ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg, false)); + SelectMappedBufferResult result = new SelectMappedBufferResult(0, byteBuffer, 10, new DefaultMappedFile()); + + + getMessageResult.addMessage(result); + getMessageResult.getMessageQueueOffset().add(0L); + List<MessageExt> list = escapeBridge.decodeMsgList(getMessageResult, false); // skip deCompressBody test + Assert.assertEquals(1, list.size()); + Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody())); + } + + private GetMessageResult mockGetMessageResult(int count, String topic, byte[] body) throws Exception { + GetMessageResult result = new GetMessageResult(); + for (int i = 0; i < count; i++) { + MessageExt msg = new MessageExt(); + msg.setBody(body); + msg.setTopic(topic); + msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000)); + msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000)); + ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg, false)); + SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, byteBuffer, body.length, new DefaultMappedFile()); + + result.addMessage(bufferResult); + result.getMessageQueueOffset().add(i + 0L); + } + return result; + } + } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index 78b76264fe..d7ea97c550 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -20,12 +20,17 @@ import com.alibaba.fastjson.JSON; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageConst; @@ -36,9 +41,14 @@ import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; import org.apache.rocketmq.store.timer.TimerMessageStore; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,19 +60,25 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; @RunWith(MockitoJUnitRunner.Silent.class) public class PopReviveServiceTest { - private static final String REVIVE_TOPIC = PopAckConstants.REVIVE_TOPIC + "test"; + private static final String CLUSTER_NAME = "test"; + private static final String REVIVE_TOPIC = PopAckConstants.buildClusterReviveTopic(CLUSTER_NAME); private static final int REVIVE_QUEUE_ID = 0; private static final String GROUP = "group"; private static final String TOPIC = "topic"; private static final SocketAddress STORE_HOST = NetworkUtil.string2SocketAddress("127.0.0.1:8080"); + private static final Long INVISIBLE_TIME = 1000L; @Mock private MessageStore messageStore; @@ -76,6 +92,9 @@ public class PopReviveServiceTest { private SubscriptionGroupManager subscriptionGroupManager; @Mock private BrokerController brokerController; + @Mock + private EscapeBridge escapeBridge; + private PopMessageProcessor popMessageProcessor; private BrokerConfig brokerConfig; private PopReviveService popReviveService; @@ -83,12 +102,14 @@ public class PopReviveServiceTest { @Before public void before() { brokerConfig = new BrokerConfig(); - + brokerConfig.setBrokerClusterName(CLUSTER_NAME); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); + when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore); when(timerMessageStore.getDequeueBehind()).thenReturn(0L); when(timerMessageStore.getEnqueueBehind()).thenReturn(0L); @@ -96,6 +117,9 @@ public class PopReviveServiceTest { when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new TopicConfig()); when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(new SubscriptionGroupConfig()); + popMessageProcessor = new PopMessageProcessor(brokerController); // a real one, not mock + when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); + popReviveService = spy(new PopReviveService(brokerController, REVIVE_TOPIC, REVIVE_QUEUE_ID)); popReviveService.setShouldRunPopRevive(true); } @@ -204,6 +228,141 @@ public class PopReviveServiceTest { assertEquals(maxReviveOffset, commitOffsetCaptor.getValue().longValue()); } + @Test + public void testReviveMsgFromCk_messageFound_writeRetryOK() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + StringBuilder actualRetryTopic = new StringBuilder(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false))); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualRetryTopic.append(msg.getTopic()); + return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)); + }); + + popReviveService.mergeAndRevive(reviveObj); + Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString()); + verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + + @Test + public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + StringBuilder actualRetryTopic = new StringBuilder(); + StringBuilder actualReviveTopic = new StringBuilder(); + AtomicLong actualInvisibleTime = new AtomicLong(0L); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false))); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualRetryTopic.append(msg.getTopic()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED)); + }); + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualReviveTopic.append(msg.getTopic()); + PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), PopCheckPoint.class); + actualInvisibleTime.set(rewriteCK.getReviveTime()); + return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)); + }); + + popReviveService.mergeAndRevive(reviveObj); + Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString()); + Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString()); + Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, actualInvisibleTime.get()); // first interval is 10s + verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + + @Test + public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + ck.setRePutTimes("17"); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + StringBuilder actualRetryTopic = new StringBuilder(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false))); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualRetryTopic.append(msg.getTopic()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED)); + }); + + popReviveService.mergeAndRevive(reviveObj); + Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString()); + verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + + @Test + public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(null, "", false))); + + popReviveService.mergeAndRevive(reviveObj); + verify(escapeBridge, times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + + @Test + public void testReviveMsgFromCk_messageNotFound_needRetry() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + StringBuilder actualReviveTopic = new StringBuilder(); + AtomicLong actualInvisibleTime = new AtomicLong(0L); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(null, "", true))); + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualReviveTopic.append(msg.getTopic()); + PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), PopCheckPoint.class); + actualInvisibleTime.set(rewriteCK.getReviveTime()); + return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)); + }); + + popReviveService.mergeAndRevive(reviveObj); + Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString()); + Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, actualInvisibleTime.get()); // first interval is 10s + verify(escapeBridge, times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + + @Test + public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws Throwable { + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + ck.setRePutTimes("17"); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(null, "", true))); + + popReviveService.mergeAndRevive(reviveObj); + verify(escapeBridge, times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) { PopCheckPoint ck = new PopCheckPoint(); ck.setStartOffset(startOffset); @@ -214,7 +373,8 @@ public class PopReviveServiceTest { ck.setNum((byte) 1); ck.setBitMap(0); ck.setReviveOffset(reviveOffset); - ck.setInvisibleTime(1000); + ck.setInvisibleTime(INVISIBLE_TIME); + ck.setBrokerName("broker-a"); return ck; } diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java index e041b66d9c..38e0a20752 100644 --- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -43,6 +43,8 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> { private List<Integer> queueOffsetDiff; @JSONField(name = "bn") String brokerName; + @JSONField(name = "rp") + String rePutTimes; // ck rePut times public long getReviveOffset() { return reviveOffset; @@ -136,6 +138,14 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> { this.brokerName = brokerName; } + public String getRePutTimes() { + return rePutTimes; + } + + public void setRePutTimes(String rePutTimes) { + this.rePutTimes = rePutTimes; + } + public void addDiff(int diff) { if (this.queueOffsetDiff == null) { this.queueOffsetDiff = new ArrayList<>(8); @@ -171,10 +181,21 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> { return startOffset + queueOffsetDiff.get(index); } + public int parseRePutTimes() { + if (null == rePutTimes) { + return 0; + } + try { + return Integer.parseInt(rePutTimes); + } catch (Exception e) { + } + return Byte.MAX_VALUE; + } + @Override public String toString() { return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" + queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + num + ", reviveTime=" + getReviveTime() - + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + ", brokerName=" + brokerName + "]"; + + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + ", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + "]"; } @Override