This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 340707410faad47979a97bc57661cd315744335e
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Sep 19 02:16:51 2025 +0800

    [fix][broker] First entry will be skipped if opening NonDurableCursor while 
trimmed ledger is adding first entry. (#24738)
    
    Co-authored-by: Yunze Xu <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 7a120f310c286eed78701e10c7e6985bc3b6d001)
---
 .../mledger/impl/NonDurableCursorImpl.java         |  5 +-
 .../mledger/impl/NonDurableCursorTest.java         | 63 ++++++++++++++++++++--
 2 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 7dbf5400da1..333ec00f52a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -43,11 +43,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl 
{
         // Compare with "latest" position marker by using only the ledger id. 
Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. 
In this case there's no point to require
         // both ledgerId and entryId to be Long.max()
-        if (startCursorPosition == null || 
startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
+        Pair<PositionImpl, Long> lastPositionCounter = 
ledger.getLastPositionAndCounter();
+        if (startCursorPosition == null || 
startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    
initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(lastPositionCounter);
                     break;
                 case Earliest:
                     
initializeCursorPosition(ledger.getFirstPositionAndCounter());
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 82141bfd0ee..d2cc796a7d1 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -19,15 +19,15 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Iterables;
-
+import io.netty.buffer.ByteBuf;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -37,8 +37,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-
-import io.netty.buffer.ByteBuf;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -52,6 +50,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -105,6 +108,58 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test(timeOut = 20000)
+    void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed() 
throws Exception {
+        ManagedLedgerConfig config = new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1)
+                .setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledgerSpy =
+                Mockito.spy((ManagedLedgerImpl) 
factory.open("non_durable_cursor_while_ledger_trimmed", config));
+
+        ledgerSpy.addEntry("message1".getBytes());
+
+        ledgerSpy.rollCurrentLedgerIfFull();
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
+                ledgerSpy.getLedgersInfoAsList().size() > 1
+        );
+        CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+        ledgerSpy.trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.join();
+
+        // Use (currentLedgerId, -1) as startCursorPosition after ledger was 
trimmed
+        PositionImpl startCursorPosition = 
PositionImpl.get(ledgerSpy.getCurrentLedger().getId(), -1);
+        assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry) 
> 0);
+
+        CountDownLatch getLastPositionLatch = new CountDownLatch(1);
+        CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1);
+        
Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position, 
Long>>) invocation -> {
+            newNonDurableCursorLatch.countDown();
+            getLastPositionLatch.await();
+            return Pair.of(ledgerSpy.lastConfirmedEntry, 
ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy));
+        });
+
+        CompletableFuture<ManagedCursor> cursorFuture = new 
CompletableFuture<ManagedCursor>()
+                .completeAsync(() ->
+                        new NonDurableCursorImpl(bkc, ledgerSpy, 
"my_test_cursor",
+                                startCursorPosition, 
CommandSubscribe.InitialPosition.Latest, false)
+                );
+        PositionImpl oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry;
+
+        // Wait until NonDurableCursorImpl constructor invokes 
ManagedLedgerImpl.getLastPositionAndCounter
+        newNonDurableCursorLatch.await();
+        // Add first entry after ledger was trimmed
+        ledgerSpy.addEntry("message2".getBytes());
+        
assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0);
+
+        // Unblock NonDurableCursorImpl constructor
+        getLastPositionLatch.countDown();
+
+        // cursor should read from lastConfirmedEntry
+        ManagedCursor cursor = cursorFuture.join();
+        assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry);
+    }
+
     @Test(timeOut = 20000)
     void testZNodeBypassed() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");

Reply via email to