fuyou001 commented on code in PR #8842:
URL: https://github.com/apache/rocketmq/pull/8842#discussion_r1809752406


##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java:
##########
@@ -111,69 +117,147 @@ public class RocksDBConsumeQueueOffsetTable {
     /**
      * Although we have already put max(min) consumeQueueOffset and 
physicalOffset in rocksdb, we still hope to get them
      * from heap to avoid accessing rocksdb.
+     *
      * @see ConsumeQueue#getMaxPhysicOffset(), maxPhysicOffset  --> 
topicQueueMaxCqOffset
      * @see ConsumeQueue#getMinLogicOffset(),   minLogicOffset  --> 
topicQueueMinOffset
      */
-    private final Map<String/* topic-queueId */, PhyAndCQOffset> 
topicQueueMinOffset;
-    private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset;
+    private final ConcurrentMap<String/* topic-queueId */, PhyAndCQOffset> 
topicQueueMinOffset;
+    private final ConcurrentMap<String/* topic-queueId */, Long> 
topicQueueMaxCqOffset;
 
     public RocksDBConsumeQueueOffsetTable(RocksDBConsumeQueueTable 
rocksDBConsumeQueueTable,
         ConsumeQueueRocksDBStorage rocksDBStorage, DefaultMessageStore 
messageStore) {
         this.rocksDBConsumeQueueTable = rocksDBConsumeQueueTable;
         this.rocksDBStorage = rocksDBStorage;
         this.messageStore = messageStore;
-        this.topicQueueMinOffset = new ConcurrentHashMap(1024);
-        this.topicQueueMaxCqOffset = new ConcurrentHashMap(1024);
+        this.topicQueueMinOffset = new ConcurrentHashMap<>(1024);
+        this.topicQueueMaxCqOffset = new ConcurrentHashMap<>(1024);
 
         this.maxPhyOffsetBB = ByteBuffer.allocateDirect(8);
     }
 
     public void load() {
         this.offsetCFH = this.rocksDBStorage.getOffsetCFHandle();
+        loadMaxConsumeQueueOffsets();
     }
 
-    public void updateTempTopicQueueMaxOffset(final Pair<ByteBuffer, 
ByteBuffer> offsetBBPair,
-        final byte[] topicBytes, final DispatchRequest request,
-        final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> 
tempTopicQueueMaxOffsetMap) {
-        buildOffsetKeyAndValueByteBuffer(offsetBBPair, topicBytes, request);
-        ByteBuffer topicQueueId = offsetBBPair.getObject1();
-        ByteBuffer maxOffsetBB = offsetBBPair.getObject2();
-        Pair<ByteBuffer, DispatchRequest> old = 
tempTopicQueueMaxOffsetMap.get(topicQueueId);
-        if (old == null) {
-            tempTopicQueueMaxOffsetMap.put(topicQueueId, new Pair(maxOffsetBB, 
request));
-        } else {
-            long oldMaxOffset = old.getObject1().getLong(OFFSET_CQ_OFFSET);
-            long maxOffset = maxOffsetBB.getLong(OFFSET_CQ_OFFSET);
-            if (maxOffset >= oldMaxOffset) {
-                ERROR_LOG.error("cqOffset invalid1. old: {}, now: {}", 
oldMaxOffset, maxOffset);
+    private void loadMaxConsumeQueueOffsets() {
+        Function<OffsetEntry, Boolean> predicate = entry -> entry.type == 
OffsetEntryType.MAXIMUM;
+        Consumer<OffsetEntry> fn = entry -> {
+            topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + 
entry.queueId, entry.offset);
+            ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, 
entry.queueId, entry.offset, entry.commitLogOffset);
+        };
+        try {
+            forEach(predicate, fn);
+        } catch (RocksDBException e) {
+            log.error("Failed to maximum consume queue offset", e);
+        }
+    }
+
+    public void forEach(Function<OffsetEntry, Boolean> predicate, 
Consumer<OffsetEntry> fn) throws RocksDBException {
+        try (RocksIterator iterator = this.rocksDBStorage.seekOffsetCF()) {
+            if (null == iterator) {
+                return;
+            }
+
+            int keyBufferCapacity = 256;
+            iterator.seekToFirst();
+            ByteBuffer keyBuffer = 
ByteBuffer.allocateDirect(keyBufferCapacity);
+            ByteBuffer valueBuffer = ByteBuffer.allocateDirect(16);
+            while (iterator.isValid()) {
+                // parse key buffer according to key layout
+                keyBuffer.clear(); // clear position and limit before reuse
+                int total = iterator.key(keyBuffer);
+                if (total > keyBufferCapacity) {

Review Comment:
   The key length of CQ needs to be reconsidered or should have the same 
restrictions as the file version on both sides
   
   topic v2  topic's lenght  may be greater 300  
   
   
   cqKey  lenght only 300 Byte
   org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable#getCQByteBufferPair
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to