This is an automated email from the ASF dual-hosted git repository.

caigy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new af43a3e71f Fix exception when pop messages with multiple LMQ indexes (#7863)
af43a3e71f is described below

commit af43a3e71f2bdb4765294f7d6314b1428737849d
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Tue Apr 30 12:46:47 2024 +0800

    Fix exception when pop messages with multiple LMQ indexes (#7863)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 35 ++++++----
 .../rocketmq/client/impl/MQClientAPIImplTest.java  | 81 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 15 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 12d305b612..0c58affa34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.Validators;
@@ -1155,15 +1156,18 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
                         final Long msgQueueOffset;
                         if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
                             messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
-                            // process LMQ, LMQ topic has only 1 queue, which queue id is 0
+                            // process LMQ
+                            String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
+                                .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+                            String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
+                                .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+                            long offset = Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]);
+                            // LMQ topic has only 1 queue, which queue id is 0
                             queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
-                            queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
-                                messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
-                            index = sortMap.get(queueIdKey).indexOf(
-                                Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+                            queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, offset);
+                            index = sortMap.get(queueIdKey).indexOf(offset);
                             msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
-                            if (msgQueueOffset != Long.parseLong(
-                                messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
+                            if (msgQueueOffset != offset) {
                                 log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s",
                                     msgQueueOffset, messageExt);
                             }
@@ -1217,14 +1221,15 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
             final String key;
             if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
                 && StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
-                // process LMQ, LMQ topic has only 1 queue, which queue id is 0
-                key = ExtraInfoUtil.getStartOffsetInfoMapKey(
-                    messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
-                if (!sortMap.containsKey(key)) {
-                    sortMap.put(key, new ArrayList<>(4));
-                }
-                sortMap.get(key).add(
-                    Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+                // process LMQ
+                String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
+                    .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+                String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
+                    .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+                // LMQ topic has only 1 queue, which queue id is 0
+                key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
+                sortMap.putIfAbsent(key, new ArrayList<>(4));
+                sortMap.get(key).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]));
                 continue;
             }
             // Value of POP_CK is used to determine whether it is a pop retry,
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 08e7fbe09a..dc892a3548 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -570,6 +571,86 @@ public class MQClientAPIImplTest {
         done.await();
     }
 
+    @Test
+    public void testPopMultiLmqMessage_async() throws Exception {
+        final long popTime = System.currentTimeMillis();
+        final int invisibleTime = 10 * 1000;
+        final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
+        final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2";
+        final String multiDispatch = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2);
+        final String multiOffset = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0");
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+                RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+
+                PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+                responseHeader.setInvisibleTime(invisibleTime);
+                responseHeader.setPopTime(popTime);
+                responseHeader.setReviveQid(0);
+                responseHeader.setRestNum(1);
+                StringBuilder startOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
+                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+                StringBuilder msgOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L));
+                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+                response.setRemark("FOUND");
+                response.makeCustomHeaderToNet();
+
+                MessageExt message = new MessageExt();
+                message.setQueueId(0);
+                message.setFlag(0);
+                message.setQueueOffset(10L);
+                message.setCommitLogOffset(10000L);
+                message.setSysFlag(0);
+                message.setBornTimestamp(System.currentTimeMillis());
+                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+                message.setStoreTimestamp(System.currentTimeMillis());
+                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+                message.setBody("body".getBytes());
+                message.setTopic(topic);
+                MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
+                MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
+                response.setBody(MessageDecoder.encode(message, false));
+                responseFuture.setResponseCommand(response);
+                callback.operationSucceed(responseFuture.getResponseCommand());
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        final CountDownLatch done = new CountDownLatch(1);
+        final PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+        requestHeader.setTopic(lmqTopic);
+        mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new PopCallback() {
+            @Override
+            public void onSuccess(PopResult popResult) {
+                assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+                assertThat(popResult.getRestNum()).isEqualTo(1);
+                assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+                assertThat(popResult.getPopTime()).isEqualTo(popTime);
+                assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+                assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
+                assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
+                    .isEqualTo(multiDispatch);
+                assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))
+                    .isEqualTo(multiOffset);
+                done.countDown();
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                Assertions.fail("want no exception but got one", e);
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+
     @Test
     public void testAckMessageAsync_Success() throws Exception {
         doAnswer(new Answer<Void>() {

Reply via email to