This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fd2b1093f68395398bd2251cf4549291461d12de
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Aug 20 21:45:57 2025 +0800

    [fix][broker] Add double-check for non-durable cursor creation (#24643)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    (cherry picked from commit e642e1798047c469256a6f93a2ac62167c7436e8)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f3250198dde..955cc02fbe3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1153,6 +1153,17 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return newNonDurableCursor(startPosition, subscriptionName, 
InitialPosition.Latest, false);
     }
 
+    private ManagedCursor getCachedNonDurableCursor(String cursorName) {
+        ManagedCursor cachedCursor = cursors.get(cursorName);
+        if (cachedCursor != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor was already created {}", name, 
cachedCursor);
+            }
+            return cachedCursor;
+        }
+        return null;
+    }
+
     @Override
     public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName,
                                              InitialPosition initialPosition, 
boolean isReadCompacted)
@@ -1161,18 +1172,20 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         checkManagedLedgerIsOpen();
         checkFenced();
 
-        ManagedCursor cachedCursor = cursors.get(cursorName);
-        if (cachedCursor != null) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Cursor was already created {}", name, 
cachedCursor);
-            }
-            return cachedCursor;
+        ManagedCursor cachedNonDurableCursor = 
getCachedNonDurableCursor(cursorName);
+        if (cachedNonDurableCursor != null) {
+            return cachedNonDurableCursor;
         }
 
         // The backlog of a non-durable cursor could be incorrect if the 
cursor is created before `internalTrimLedgers`
         // and added to the managed ledger after `internalTrimLedgers`.
         // For more details, see https://github.com/apache/pulsar/pull/23951.
         synchronized (this) {
+            cachedNonDurableCursor = getCachedNonDurableCursor(cursorName);
+            if (cachedNonDurableCursor != null) {
+                return cachedNonDurableCursor;
+            }
+
             NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
this, cursorName,
                     startCursorPosition, initialPosition, isReadCompacted);
             cursor.setActive();

Reply via email to