lizhanhui commented on code in PR #8842:
URL: https://github.com/apache/rocketmq/pull/8842#discussion_r1810410921


##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java:
##########
@@ -111,69 +117,147 @@ public class RocksDBConsumeQueueOffsetTable {
     /**
      * Although we have already put max(min) consumeQueueOffset and 
physicalOffset in rocksdb, we still hope to get them
      * from heap to avoid accessing rocksdb.
+     *
      * @see ConsumeQueue#getMaxPhysicOffset(), maxPhysicOffset  --> 
topicQueueMaxCqOffset
      * @see ConsumeQueue#getMinLogicOffset(),   minLogicOffset  --> 
topicQueueMinOffset
      */
-    private final Map<String/* topic-queueId */, PhyAndCQOffset> 
topicQueueMinOffset;
-    private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset;
+    private final ConcurrentMap<String/* topic-queueId */, PhyAndCQOffset> 
topicQueueMinOffset;
+    private final ConcurrentMap<String/* topic-queueId */, Long> 
topicQueueMaxCqOffset;
 
     public RocksDBConsumeQueueOffsetTable(RocksDBConsumeQueueTable 
rocksDBConsumeQueueTable,
         ConsumeQueueRocksDBStorage rocksDBStorage, DefaultMessageStore 
messageStore) {
         this.rocksDBConsumeQueueTable = rocksDBConsumeQueueTable;
         this.rocksDBStorage = rocksDBStorage;
         this.messageStore = messageStore;
-        this.topicQueueMinOffset = new ConcurrentHashMap(1024);
-        this.topicQueueMaxCqOffset = new ConcurrentHashMap(1024);
+        this.topicQueueMinOffset = new ConcurrentHashMap<>(1024);
+        this.topicQueueMaxCqOffset = new ConcurrentHashMap<>(1024);
 
         this.maxPhyOffsetBB = ByteBuffer.allocateDirect(8);
     }
 
     public void load() {
         this.offsetCFH = this.rocksDBStorage.getOffsetCFHandle();
+        loadMaxConsumeQueueOffsets();
     }
 
-    public void updateTempTopicQueueMaxOffset(final Pair<ByteBuffer, 
ByteBuffer> offsetBBPair,
-        final byte[] topicBytes, final DispatchRequest request,
-        final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> 
tempTopicQueueMaxOffsetMap) {
-        buildOffsetKeyAndValueByteBuffer(offsetBBPair, topicBytes, request);
-        ByteBuffer topicQueueId = offsetBBPair.getObject1();
-        ByteBuffer maxOffsetBB = offsetBBPair.getObject2();
-        Pair<ByteBuffer, DispatchRequest> old = 
tempTopicQueueMaxOffsetMap.get(topicQueueId);
-        if (old == null) {
-            tempTopicQueueMaxOffsetMap.put(topicQueueId, new Pair(maxOffsetBB, 
request));
-        } else {
-            long oldMaxOffset = old.getObject1().getLong(OFFSET_CQ_OFFSET);
-            long maxOffset = maxOffsetBB.getLong(OFFSET_CQ_OFFSET);
-            if (maxOffset >= oldMaxOffset) {
-                ERROR_LOG.error("cqOffset invalid1. old: {}, now: {}", 
oldMaxOffset, maxOffset);
+    private void loadMaxConsumeQueueOffsets() {
+        Function<OffsetEntry, Boolean> predicate = entry -> entry.type == 
OffsetEntryType.MAXIMUM;
+        Consumer<OffsetEntry> fn = entry -> {
+            topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + 
entry.queueId, entry.offset);
+            ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, 
entry.queueId, entry.offset, entry.commitLogOffset);
+        };
+        try {
+            forEach(predicate, fn);
+        } catch (RocksDBException e) {
+            log.error("Failed to maximum consume queue offset", e);
+        }
+    }
+
+    public void forEach(Function<OffsetEntry, Boolean> predicate, 
Consumer<OffsetEntry> fn) throws RocksDBException {
+        try (RocksIterator iterator = this.rocksDBStorage.seekOffsetCF()) {
+            if (null == iterator) {
+                return;
+            }
+
+            int keyBufferCapacity = 256;
+            iterator.seekToFirst();
+            ByteBuffer keyBuffer = 
ByteBuffer.allocateDirect(keyBufferCapacity);
+            ByteBuffer valueBuffer = ByteBuffer.allocateDirect(16);
+            while (iterator.isValid()) {
+                // parse key buffer according to key layout
+                keyBuffer.clear(); // clear position and limit before reuse
+                int total = iterator.key(keyBuffer);
+                if (total > keyBufferCapacity) {

Review Comment:
   @fuyou001 This method does not impose further restriction on topic length. 
   ```
   int total = iterator.key(keyBuffer);
   if (total > keyBufferCapacity) {
       keyBufferCapacity = total;
       PlatformDependent.freeDirectBuffer(keyBuffer);
       keyBuffer = ByteBuffer.allocateDirect(keyBufferCapacity);
       continue;
   }
   ```
   This code blocks roughly means, try to read key into the pre-allocated 
direct buffer, if the key length is greater than the pre-allocated key buffer 
capacity, free the pre-allocated direct buffer(by default, 256 bytes) and 
re-allocate a larger one and then retry again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to