Denovo1998 commented on code in PR #22560:
URL: https://github.com/apache/pulsar/pull/22560#discussion_r2901247811


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2443,6 +2556,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry
         }
     }
 
+    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ManagedCursorImpl cursor,
+                                  ReadEntriesCallback callback, Object ctx) {
+        IntSupplier expectedReadCount = cursor::getNumberOfCursorsAtSamePositionOrBefore;
+        if (config.getReadEntryTimeoutSeconds() > 0) {
+            // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
+            long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+            long createdTime = System.nanoTime();
+            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
+                    callback, readOpCount, createdTime, ctx);
+            lastReadCallback = readCallback;

Review Comment:
   This parallelizes a logical read into multiple BK range reads, but timeout 
tracking still relies on a single `lastReadCallback`, which each sub-read 
overwrites here.
   
   That breaks the existing read-timeout semantics. For example, if Range A 
stalls while later ranges B and C overwrite `lastReadCallback`, then 
`checkReadTimeout()` never sees Range A's callback, even though 
`BatchReadEntriesCallback` may still be waiting for it. In that scenario, the 
outer read could hang indefinitely instead of timing out.
   
   Should we track all pending sub-read callbacks for one `OpReadEntry`?
   
   I think we also need a regression test for `asyncReadEntriesWithSkip(...)` 
with timeout enabled and more than one split range.
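   
   To make the suggestion concrete, here is a minimal sketch of tracking every 
in-flight sub-read rather than only the latest one. `PendingReadTracker` and 
all of its methods are hypothetical names for illustration, not existing 
Pulsar APIs; the real fix would still need to integrate with 
`ReadEntryCallbackWrapper` recycling and `checkReadTimeout()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical sketch: remembers all pending sub-reads, keyed by readOpCount. */
final class PendingReadTracker {
    // readOpCount -> creation time (System.nanoTime() units)
    private final Map<Long, Long> pending = new ConcurrentHashMap<>();
    private final long timeoutNanos;

    PendingReadTracker(long timeoutSeconds) {
        this.timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
    }

    /** Called when a sub-read is submitted. */
    void register(long readOpCount, long createdNanos) {
        pending.put(readOpCount, createdNanos);
    }

    /** Called when a sub-read completes or fails on its own. */
    void complete(long readOpCount) {
        pending.remove(readOpCount);
    }

    /** Expires every sub-read older than the timeout; returns how many expired. */
    int checkTimeouts(long nowNanos, LongConsumer onTimeout) {
        int expired = 0;
        for (Map.Entry<Long, Long> e : pending.entrySet()) {
            boolean tooOld = nowNanos - e.getValue() >= timeoutNanos;
            // remove(key, value) ensures only one thread reports a given sub-read
            if (tooOld && pending.remove(e.getKey(), e.getValue())) {
                onTimeout.accept(e.getKey());
                expired++;
            }
        }
        return expired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

   With this shape, a stalled Range A stays visible to the timeout check even 
after B and C have been submitted and completed.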
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2373,43 +2376,153 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
 
         long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
 
-        // Filer out and skip unnecessary read entry
-        if (opReadEntry.skipCondition != null) {
-            long firstValidEntry = -1L;
-            long lastValidEntry = -1L;
-            long entryId = firstEntry;
-            for (; entryId <= lastEntry; entryId++) {
-                if (opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(), entryId))) {
-                    if (firstValidEntry != -1L) {
-                        break;
-                    }
-                } else {
-                    if (firstValidEntry == -1L) {
-                        firstValidEntry = entryId;
-                    }
+        Predicate<Position> skipCondition = opReadEntry.skipCondition;
+        if (skipCondition == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
+                        lastEntry);
+            }
+            asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
+            return;
+        }
 
-                    lastValidEntry = entryId;
-                }
+        // Skip entries that don't match the predicate
+        LongSortedSet entryIds = new LongAVLTreeSet();

Review Comment:
   This still adds avoidable allocations and CPU work on the hot path that this 
PR aims to optimize. Would it be simpler to build contiguous ranges directly 
during the scan and preserve result ordering by range index or submission order 
in `BatchReadEntriesCallback`? That should eliminate the set and at least one 
of the sorts.
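   
   A rough sketch of the single-pass scan I have in mind is below. `RangeScan` 
and `contiguousRanges` are hypothetical names, and a `LongPredicate` over entry 
IDs stands in for the real `Predicate<Position>` to keep the example 
self-contained. Because entry IDs are visited in ascending order, the ranges 
come out already sorted, so the list index can serve as the range index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/** Hypothetical sketch: builds contiguous non-skipped ranges in one pass. */
final class RangeScan {
    private RangeScan() {
    }

    /**
     * Returns inclusive {first, last} pairs covering every entry in
     * [firstEntry, lastEntry] for which skip.test(entryId) is false.
     */
    static List<long[]> contiguousRanges(long firstEntry, long lastEntry, LongPredicate skip) {
        List<long[]> ranges = new ArrayList<>();
        long start = -1L; // -1 means no range is currently open (entry IDs are >= 0)
        for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
            if (skip.test(entryId)) {
                if (start != -1L) {
                    // A skipped entry closes the range that was open
                    ranges.add(new long[] {start, entryId - 1});
                    start = -1L;
                }
            } else if (start == -1L) {
                start = entryId; // first valid entry opens a new range
            }
        }
        if (start != -1L) {
            ranges.add(new long[] {start, lastEntry}); // close the trailing range
        }
        return ranges;
    }
}
```

   Each resulting pair could then be submitted as one sub-read, with no 
intermediate sorted set of entry IDs.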



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.