fuyou001 commented on code in PR #8842: URL: https://github.com/apache/rocketmq/pull/8842#discussion_r1809752406
########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java: ########## @@ -111,69 +117,147 @@ public class RocksDBConsumeQueueOffsetTable { /** * Although we have already put max(min) consumeQueueOffset and physicalOffset in rocksdb, we still hope to get them * from heap to avoid accessing rocksdb. + * * @see ConsumeQueue#getMaxPhysicOffset(), maxPhysicOffset --> topicQueueMaxCqOffset * @see ConsumeQueue#getMinLogicOffset(), minLogicOffset --> topicQueueMinOffset */ - private final Map<String/* topic-queueId */, PhyAndCQOffset> topicQueueMinOffset; - private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset; + private final ConcurrentMap<String/* topic-queueId */, PhyAndCQOffset> topicQueueMinOffset; + private final ConcurrentMap<String/* topic-queueId */, Long> topicQueueMaxCqOffset; public RocksDBConsumeQueueOffsetTable(RocksDBConsumeQueueTable rocksDBConsumeQueueTable, ConsumeQueueRocksDBStorage rocksDBStorage, DefaultMessageStore messageStore) { this.rocksDBConsumeQueueTable = rocksDBConsumeQueueTable; this.rocksDBStorage = rocksDBStorage; this.messageStore = messageStore; - this.topicQueueMinOffset = new ConcurrentHashMap(1024); - this.topicQueueMaxCqOffset = new ConcurrentHashMap(1024); + this.topicQueueMinOffset = new ConcurrentHashMap<>(1024); + this.topicQueueMaxCqOffset = new ConcurrentHashMap<>(1024); this.maxPhyOffsetBB = ByteBuffer.allocateDirect(8); } public void load() { this.offsetCFH = this.rocksDBStorage.getOffsetCFHandle(); + loadMaxConsumeQueueOffsets(); } - public void updateTempTopicQueueMaxOffset(final Pair<ByteBuffer, ByteBuffer> offsetBBPair, - final byte[] topicBytes, final DispatchRequest request, - final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap) { - buildOffsetKeyAndValueByteBuffer(offsetBBPair, topicBytes, request); - ByteBuffer topicQueueId = offsetBBPair.getObject1(); - ByteBuffer maxOffsetBB = offsetBBPair.getObject2(); - Pair<ByteBuffer, 
DispatchRequest> old = tempTopicQueueMaxOffsetMap.get(topicQueueId); - if (old == null) { - tempTopicQueueMaxOffsetMap.put(topicQueueId, new Pair(maxOffsetBB, request)); - } else { - long oldMaxOffset = old.getObject1().getLong(OFFSET_CQ_OFFSET); - long maxOffset = maxOffsetBB.getLong(OFFSET_CQ_OFFSET); - if (maxOffset >= oldMaxOffset) { - ERROR_LOG.error("cqOffset invalid1. old: {}, now: {}", oldMaxOffset, maxOffset); + private void loadMaxConsumeQueueOffsets() { + Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM; + Consumer<OffsetEntry> fn = entry -> { + topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset); + ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); + }; + try { + forEach(predicate, fn); + } catch (RocksDBException e) { + log.error("Failed to maximum consume queue offset", e); + } + } + + public void forEach(Function<OffsetEntry, Boolean> predicate, Consumer<OffsetEntry> fn) throws RocksDBException { + try (RocksIterator iterator = this.rocksDBStorage.seekOffsetCF()) { + if (null == iterator) { + return; + } + + int keyBufferCapacity = 256; + iterator.seekToFirst(); + ByteBuffer keyBuffer = ByteBuffer.allocateDirect(keyBufferCapacity); + ByteBuffer valueBuffer = ByteBuffer.allocateDirect(16); + while (iterator.isValid()) { + // parse key buffer according to key layout + keyBuffer.clear(); // clear position and limit before reuse + int total = iterator.key(keyBuffer); + if (total > keyBufferCapacity) { Review Comment: The key length of the CQ needs to be reconsidered, or it should have the same restrictions as the file version on both sides. Topic v2: a topic's length may be greater than 300, but the cqKey length is only 300 bytes; see org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable#getCQByteBufferPair -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org