This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fd2b1093f68395398bd2251cf4549291461d12de Author: Zixuan Liu <[email protected]> AuthorDate: Wed Aug 20 21:45:57 2025 +0800 [fix][broker] Add double-check for non-durable cursor creation (#24643) Signed-off-by: Zixuan Liu <[email protected]> (cherry picked from commit e642e1798047c469256a6f93a2ac62167c7436e8) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f3250198dde..955cc02fbe3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1153,6 +1153,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false); } + private ManagedCursor getCachedNonDurableCursor(String cursorName) { + ManagedCursor cachedCursor = cursors.get(cursorName); + if (cachedCursor != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor was already created {}", name, cachedCursor); + } + return cachedCursor; + } + return null; + } + @Override public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition, boolean isReadCompacted) @@ -1161,18 +1172,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { checkManagedLedgerIsOpen(); checkFenced(); - ManagedCursor cachedCursor = cursors.get(cursorName); - if (cachedCursor != null) { - if (log.isDebugEnabled()) { - log.debug("[{}] Cursor was already created {}", name, cachedCursor); - } - return cachedCursor; + ManagedCursor cachedNonDurableCursor = getCachedNonDurableCursor(cursorName); + if (cachedNonDurableCursor != null) { + return cachedNonDurableCursor; } // The backlog of a non-durable cursor could be incorrect if the cursor is created before `internalTrimLedgers` // and added to the managed ledger after `internalTrimLedgers`. // For more details, see https://github.com/apache/pulsar/pull/23951. synchronized (this) { + cachedNonDurableCursor = getCachedNonDurableCursor(cursorName); + if (cachedNonDurableCursor != null) { + return cachedNonDurableCursor; + } + NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, this, cursorName, startCursorPosition, initialPosition, isReadCompacted); cursor.setActive();
