This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ae7179d75e [ISSUE #8765] fix low performance of delay message when enable rocksdb consume queue (#8766)
ae7179d75e is described below

commit ae7179d75e11f469d68be05fbf556fde42c8a795
Author: yuz10 <845238...@qq.com>
AuthorDate: Wed Nov 20 11:10:01 2024 +0800

    [ISSUE #8765] fix low performance of delay message when enable rocksdb consume queue (#8766)
    
    * #7538 fix wrong cachedMsgSize if msg body is changed in consumer callback
    * [ISSUE #8765] fix low performance of delay message when enable rocksdb consume queue
    * remove prefetch
---
 .../rocketmq/store/queue/RocksDBConsumeQueue.java  | 74 ++++++++++++++++++----
 .../store/queue/RocksDBConsumeQueueTest.java       | 73 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 11 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 83ba7bebad..7bd3c2e305 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -271,22 +271,17 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
     private int pullNum(long cqOffset, long maxCqOffset) {
         long diffLong = maxCqOffset - cqOffset;
         if (diffLong < Integer.MAX_VALUE) {
-            int diffInt = (int) diffLong;
-            return diffInt > 16 ? 16 : diffInt;
+            return (int) diffLong;
         }
-        return 16;
+        return Integer.MAX_VALUE;
     }
 
     @Override
     public ReferredIterator<CqUnit> iterateFrom(final long startIndex) {
-        try {
-            long maxCqOffset = getMaxOffsetInQueue();
-            if (startIndex < maxCqOffset) {
-                int num = pullNum(startIndex, maxCqOffset);
-                return iterateFrom0(startIndex, num);
-            }
-        } catch (RocksDBException e) {
-            log.error("[RocksDBConsumeQueue] iterateFrom error!", e);
+        long maxCqOffset = getMaxOffsetInQueue();
+        if (startIndex < maxCqOffset) {
+            int num = pullNum(startIndex, maxCqOffset);
+            return new LargeRocksDBConsumeQueueIterator(startIndex, num);
         }
         return null;
     }
@@ -428,4 +423,61 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
             }
         }
     }
+
+    private class LargeRocksDBConsumeQueueIterator implements 
ReferredIterator<CqUnit> {
+        private final long startIndex;
+        private final int totalCount;
+        private int currentIndex;
+
+        public LargeRocksDBConsumeQueueIterator(final long startIndex, final 
int num) {
+            this.startIndex = startIndex;
+            this.totalCount = num;
+            this.currentIndex = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.currentIndex < this.totalCount;
+        }
+
+
+        @Override
+        public CqUnit next() {
+            if (!hasNext()) {
+                return null;
+            }
+
+            final ByteBuffer byteBuffer;
+            try {
+                byteBuffer = messageStore.getQueueStore().get(topic, queueId, 
startIndex + currentIndex);
+            } catch (RocksDBException e) {
+                ERROR_LOG.error("get cq from rocksdb failed. topic: {}, 
queueId: {}", topic, queueId, e);
+                return null;
+            }
+            if (byteBuffer == null || byteBuffer.remaining() < 
RocksDBConsumeQueueTable.CQ_UNIT_SIZE) {
+                return null;
+            }
+            CqUnit cqUnit = new CqUnit(this.startIndex + currentIndex, 
byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getLong());
+            this.currentIndex++;
+            return cqUnit;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove");
+        }
+
+        @Override
+        public void release() {
+        }
+
+        @Override
+        public CqUnit nextAndRelease() {
+            try {
+                return next();
+            } finally {
+                release();
+            }
+        }
+    }
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java
new file mode 100644
index 0000000000..b907ce5951
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+
+import static 
org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RocksDBConsumeQueueTest extends QueueTestBase {
+
+    @Test
+    public void testIterator() throws Exception {
+        if (MixAll.isMac()) {
+            return;
+        }
+        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+        RocksDBConsumeQueueStore rocksDBConsumeQueueStore = 
mock(RocksDBConsumeQueueStore.class);
+        
when(messageStore.getQueueStore()).thenReturn(rocksDBConsumeQueueStore);
+        when(rocksDBConsumeQueueStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(10000L);
+        when(rocksDBConsumeQueueStore.get(anyString(), anyInt(), 
anyLong())).then(new Answer<ByteBuffer>() {
+            @Override
+            public ByteBuffer answer(InvocationOnMock mock) throws Throwable {
+                long startIndex = mock.getArgument(2);
+                final ByteBuffer byteBuffer = 
ByteBuffer.allocate(CQ_UNIT_SIZE);
+                long phyOffset = startIndex * 10;
+                byteBuffer.putLong(phyOffset);
+                byteBuffer.putInt(1);
+                byteBuffer.putLong(0);
+                byteBuffer.putLong(0);
+                byteBuffer.flip();
+                return byteBuffer;
+            }
+        });
+
+        RocksDBConsumeQueue consumeQueue = new 
RocksDBConsumeQueue(messageStore, "topic", 0);
+        ReferredIterator<CqUnit> it = consumeQueue.iterateFrom(9000);
+        for (int i = 0; i < 1000; i++) {
+            assertTrue(it.hasNext());
+            CqUnit next = it.next();
+            assertEquals(9000 + i, next.getQueueOffset());
+            assertEquals(10 * (9000 + i), next.getPos());
+        }
+        assertFalse(it.hasNext());
+    }
+}
\ No newline at end of file

Reply via email to