coderzc commented on code in PR #25280:
URL: https://github.com/apache/pulsar/pull/25280#discussion_r2903415240
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java:
##########
@@ -49,6 +61,79 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
        + lastConfirmedEntry + " when reading entry " + lastEntry));
}
+
+ int numberOfEntries = (int) (lastEntry - firstEntry + 1);
+
+ // Use batch read for multiple entries when enabled
+ if (batchReadEnabled && numberOfEntries > 1) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}",
+ handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize);
+ }
+ return batchReadWithAutoRefill(handle, firstEntry, lastEntry, numberOfEntries, batchReadMaxSize);
+ }
+
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}
+
+ private static CompletableFuture<LedgerEntries> batchReadWithAutoRefill(
+ ReadHandle handle, long firstEntry, long lastEntry,
+ int maxCount, long maxSize) {
+
+ return handle.batchReadAsync(firstEntry, maxCount, maxSize)
+ .exceptionallyCompose(ex -> {
+ // Fallback to readUnconfirmedAsync if batch read fails
+ log.warn("Batch read failed for ledger {} entries {}-{}, falling back to regular read: {}",
+ handle.getId(), firstEntry, lastEntry, ex.getMessage());
+ return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+ })
+ .thenCompose(entries -> {
+ // Collect entries and find the last received entry id in a single pass
+ List<LedgerEntry> receivedList = new ArrayList<>();
+ long lastReceivedEntryId = -1;
+ for (LedgerEntry e : entries) {
+ receivedList.add(e);
+ lastReceivedEntryId = e.getEntryId();
+ }
+ int receivedCount = receivedList.size();
+
+ // All entries received, return as-is
+ if (receivedCount >= maxCount) {
+ return CompletableFuture.completedFuture(entries);
+ }
+
+ // Partial result: need to read remaining entries
+ if (receivedCount == 0) {
+ // Edge case: no entries returned, use regular read
+ entries.close();
+ log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to "
+ + "regular read", handle.getId(), firstEntry, lastEntry);
+ return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+ }
+
+ // Close the original entries since we've collected them into receivedList
+ entries.close();
Review Comment:
`receivedList` stores the original LedgerEntry references, but
`entries.close()` is called before the merged result is returned. Since closing
`LedgerEntriesImpl` also closes the contained entries, the merged result may
end up holding entries whose buffers have already been released. Please
preserve independent ownership of the retained entries (for example by
retaining/duplicating them), or defer closing the original container until the
merged result is consumed.
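
To illustrate the concern, here is a minimal, self-contained sketch. The `Entry` and `Entries` classes are hypothetical stand-ins for a reference-counted entry and its container, not the BookKeeper types, and `collectUnsafe` / `collectSafe` are illustrative names. It assumes only the behavior described above: closing the container also closes every entry it holds.

```java
import java.util.ArrayList;
import java.util.List;

class EntryOwnershipSketch {

    /** Hypothetical stand-in for a reference-counted entry handle. */
    static final class Entry implements AutoCloseable {
        final long id;
        // Shared by all duplicates of one entry; models the underlying buffer's refcount.
        private final int[] bufferRefs;
        private boolean closed;

        Entry(long id) {
            this(id, new int[] {1});
        }

        private Entry(long id, int[] bufferRefs) {
            this.id = id;
            this.bufferRefs = bufferRefs;
        }

        /** Returns a new handle on the same buffer and takes one extra reference. */
        Entry duplicate() {
            bufferRefs[0]++;
            return new Entry(id, bufferRefs);
        }

        boolean isReleased() {
            return bufferRefs[0] == 0;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                bufferRefs[0]--;
            }
        }
    }

    /** Hypothetical container whose close() also closes the entries it holds. */
    static final class Entries implements AutoCloseable {
        final List<Entry> items;

        Entries(List<Entry> items) {
            this.items = items;
        }

        @Override
        public void close() {
            items.forEach(Entry::close);
        }
    }

    /** Mirrors the pattern under review: copy the references, then close the container. */
    static List<Entry> collectUnsafe(Entries batch) {
        List<Entry> received = new ArrayList<>(batch.items);
        batch.close(); // releases the very entries that 'received' still points to
        return received;
    }

    /** One possible fix: take an independent reference before closing the container. */
    static List<Entry> collectSafe(Entries batch) {
        List<Entry> received = new ArrayList<>();
        for (Entry e : batch.items) {
            received.add(e.duplicate());
        }
        batch.close(); // drops only the container's references
        return received;
    }

    public static void main(String[] args) {
        List<Entry> unsafe = collectUnsafe(new Entries(List.of(new Entry(0), new Entry(1))));
        List<Entry> safe = collectSafe(new Entries(List.of(new Entry(0), new Entry(1))));
        System.out.println("unsafe entry released: " + unsafe.get(0).isReleased()); // true
        System.out.println("safe entry released: " + safe.get(0).isReleased());     // false
    }
}
```

With the safe variant, whoever consumes the merged result becomes responsible for closing the duplicated entries, so the merged container would need to release them in its own `close()`.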