lizhanhui commented on code in PR #8842: URL: https://github.com/apache/rocketmq/pull/8842#discussion_r1810410921
########## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java: ########## @@ -111,69 +117,147 @@ public class RocksDBConsumeQueueOffsetTable { /** * Although we have already put max(min) consumeQueueOffset and physicalOffset in rocksdb, we still hope to get them * from heap to avoid accessing rocksdb. + * * @see ConsumeQueue#getMaxPhysicOffset(), maxPhysicOffset --> topicQueueMaxCqOffset * @see ConsumeQueue#getMinLogicOffset(), minLogicOffset --> topicQueueMinOffset */ - private final Map<String/* topic-queueId */, PhyAndCQOffset> topicQueueMinOffset; - private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset; + private final ConcurrentMap<String/* topic-queueId */, PhyAndCQOffset> topicQueueMinOffset; + private final ConcurrentMap<String/* topic-queueId */, Long> topicQueueMaxCqOffset; public RocksDBConsumeQueueOffsetTable(RocksDBConsumeQueueTable rocksDBConsumeQueueTable, ConsumeQueueRocksDBStorage rocksDBStorage, DefaultMessageStore messageStore) { this.rocksDBConsumeQueueTable = rocksDBConsumeQueueTable; this.rocksDBStorage = rocksDBStorage; this.messageStore = messageStore; - this.topicQueueMinOffset = new ConcurrentHashMap(1024); - this.topicQueueMaxCqOffset = new ConcurrentHashMap(1024); + this.topicQueueMinOffset = new ConcurrentHashMap<>(1024); + this.topicQueueMaxCqOffset = new ConcurrentHashMap<>(1024); this.maxPhyOffsetBB = ByteBuffer.allocateDirect(8); } public void load() { this.offsetCFH = this.rocksDBStorage.getOffsetCFHandle(); + loadMaxConsumeQueueOffsets(); } - public void updateTempTopicQueueMaxOffset(final Pair<ByteBuffer, ByteBuffer> offsetBBPair, - final byte[] topicBytes, final DispatchRequest request, - final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap) { - buildOffsetKeyAndValueByteBuffer(offsetBBPair, topicBytes, request); - ByteBuffer topicQueueId = offsetBBPair.getObject1(); - ByteBuffer maxOffsetBB = offsetBBPair.getObject2(); - Pair<ByteBuffer, DispatchRequest> old = tempTopicQueueMaxOffsetMap.get(topicQueueId); - if (old == null) { - tempTopicQueueMaxOffsetMap.put(topicQueueId, new Pair(maxOffsetBB, request)); - } else { - long oldMaxOffset = old.getObject1().getLong(OFFSET_CQ_OFFSET); - long maxOffset = maxOffsetBB.getLong(OFFSET_CQ_OFFSET); - if (maxOffset >= oldMaxOffset) { - ERROR_LOG.error("cqOffset invalid1. old: {}, now: {}", oldMaxOffset, maxOffset); + private void loadMaxConsumeQueueOffsets() { + Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM; + Consumer<OffsetEntry> fn = entry -> { + topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset); + ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); + }; + try { + forEach(predicate, fn); + } catch (RocksDBException e) { + log.error("Failed to maximum consume queue offset", e); + } + } + + public void forEach(Function<OffsetEntry, Boolean> predicate, Consumer<OffsetEntry> fn) throws RocksDBException { + try (RocksIterator iterator = this.rocksDBStorage.seekOffsetCF()) { + if (null == iterator) { + return; + } + + int keyBufferCapacity = 256; + iterator.seekToFirst(); + ByteBuffer keyBuffer = ByteBuffer.allocateDirect(keyBufferCapacity); + ByteBuffer valueBuffer = ByteBuffer.allocateDirect(16); + while (iterator.isValid()) { + // parse key buffer according to key layout + keyBuffer.clear(); // clear position and limit before reuse + int total = iterator.key(keyBuffer); + if (total > keyBufferCapacity) { Review Comment: @fuyou001 This method does not impose further restriction on topic length. ``` int total = iterator.key(keyBuffer); if (total > keyBufferCapacity) { keyBufferCapacity = total; PlatformDependent.freeDirectBuffer(keyBuffer); keyBuffer = ByteBuffer.allocateDirect(keyBufferCapacity); continue; } ``` This code blocks roughly means, try to read key into the pre-allocated direct buffer, if the key length is greater than the pre-allocated key buffer capacity, free the pre-allocated direct buffer(by default, 256 bytes) and re-allocate a larger one and then retry again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org