This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c96f27a4dde [fix][broker] Fix ManagedCursor state management race
conditions and lifecycle issues (#24569)
c96f27a4dde is described below
commit c96f27a4dde9e4211a5b83e91f5e384d7ac8d904
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Jul 29 14:07:54 2025 +0300
[fix][broker] Fix ManagedCursor state management race conditions and
lifecycle issues (#24569)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 434 ++++++++++++---------
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 37 +-
.../mledger/impl/NonDurableCursorImpl.java | 2 +
.../bookkeeper/mledger/impl/OpReadEntry.java | 34 +-
.../mledger/impl/ReadOnlyCursorImpl.java | 2 +
.../service/persistent/PersistentSubscription.java | 10 +
.../pulsar/broker/service/ReplicatorTest.java | 34 +-
7 files changed, 343 insertions(+), 210 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3d522f9b2dc..9cba4d863e2 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -49,7 +49,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -158,17 +157,10 @@ public class ManagedCursorImpl implements ManagedCursor {
MarkDeleteEntry.class, "lastMarkDeleteEntry");
protected volatile MarkDeleteEntry lastMarkDeleteEntry;
- /** Protects the method "asyncReadEntriesWithSkipOrWait" and
"cancelPendingReadRequest" runs concurrently. **/
- private final Object pendingReadOpMutex = new Object();
- /**
- * 'ManagedLedger.notifyCursors' relies on this CAS to avoid using
"pendingReadOpMutex" to guarantee thread-safety,
- * which improved the performance of publishing messages.
- */
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl,
OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
OpReadEntry.class, "waitingReadOp");
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOp = null;
- private DelayCheckForNewEntriesTask delayCheckForNewEntriesTask;
public static final int FALSE = 0;
public static final int TRUE = 1;
@@ -235,6 +227,11 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the
state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;
+ // This is a lock used to update the registration state of the cursor in
the managed ledger.
+ private final Object registerToWaitingCursorsLock = new Object();
+ // This is used to track if the cursor is registered in the managed
ledger's waitingCursors queue
+ boolean registeredToWaitingCursors = false;
+
class MarkDeleteEntry {
final Position newPosition;
final MarkDeleteCallback callback;
@@ -292,17 +289,38 @@ public class ManagedCursorImpl implements ManagedCursor {
private volatile long lastActive;
public enum State {
- Uninitialized, // Cursor is being initialized
- NoLedger, // There is no metadata ledger open for writing
- Open, // Metadata ledger is ready
- SwitchingLedger, // The metadata ledger is being switched
- Closing, // The managed cursor is closing
- Closed // The managed cursor has been closed
+ Uninitialized(false), // Cursor is being initialized
+ NoLedger(false), // There is no metadata ledger open for writing
+ Open(false), // Metadata ledger is ready
+ SwitchingLedger(false), // The metadata ledger is being switched
+ Closing(true), // The managed cursor is closing
+ Closed(true), // The managed cursor has been closed
+ Deleting(true), // The managed cursor is being deleted
+ Deleted(true), // The managed cursor has been deleted
+ DeletingFailed(true); // The managed cursor deletion failed, state
allows retrying deletion.
+
+ // Indicate if the cursor is in a state that is considered closed
+ private final boolean closedState;
+
+ State(boolean closedState) {
+ this.closedState = closedState;
+ }
+
+ /**
+ * Returns true if the state is considered closed.
+ */
+ public boolean isClosed() {
+ return closedState;
+ }
+
+ public boolean isDeletingOrDeleted() {
+ return this == Deleting || this == Deleted;
+ }
}
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl,
State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
State.class, "state");
- protected volatile State state = null;
+ protected volatile State state = State.Uninitialized;
protected final ManagedCursorMXBean mbean;
@@ -331,7 +349,6 @@ public class ManagedCursorImpl implements ManagedCursor {
this.batchDeletedIndexes = null;
}
this.digestType =
BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType());
- STATE_UPDATER.set(this, State.Uninitialized);
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
PENDING_READ_OPS_UPDATER.set(this, 0);
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
@@ -791,7 +808,23 @@ public class ManagedCursorImpl implements ManagedCursor {
// assign cursor-ledger so, it can be deleted when new ledger will be
switched
this.cursorLedger = recoveredFromCursorLedger;
this.isCursorLedgerReadOnly = true;
- STATE_UPDATER.set(this, State.NoLedger);
+ changeStateIfNotClosed(State.NoLedger);
+ }
+
+ /**
+ * Change the state of the cursor if it is not already considered closed.
+ * This is to prevent invalid state transitions when the cursor is already
closed.
+ *
+ * @param newState The new state to set
+ * @return The previous state of the cursor
+ */
+ private State changeStateIfNotClosed(State newState) {
+ return STATE_UPDATER.getAndUpdate(this, current -> {
+ if (current.isClosed()) {
+ return current;
+ }
+ return newState;
+ });
}
void initialize(Position position, Map<String, Long> properties,
Map<String, String> cursorProperties,
@@ -805,7 +838,7 @@ public class ManagedCursorImpl implements ManagedCursor {
new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
- STATE_UPDATER.set(ManagedCursorImpl.this,
State.NoLedger);
+ changeStateIfNotClosed(State.NoLedger);
callback.operationComplete();
}
@Override
@@ -1040,87 +1073,36 @@ public class ManagedCursorImpl implements ManagedCursor
{
skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);
-
- synchronized (pendingReadOpMutex) {
- if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
- op.recycle();
- callback.readEntriesFailed(new
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
- return;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Deferring retry of read at position
{}", ledger.getName(), name,
- op.readPosition);
- }
-
- // Check again for new entries after the configured time, then
if still no entries are available
- // register to be notified.
- if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
- delayCheckForNewEntriesTask = new
DelayCheckForNewEntriesTask(op, callback, ctx);
- } else {
- // If there's no delay, check directly from the same thread
- checkForNewEntries(op, callback, ctx);
- }
+ int opReadId = op.id;
+ if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+ op.recycle();
+ callback.readEntriesFailed(new
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
+ return;
}
- }
- }
-
- private enum DelayCheckForNewEntriesTaskState {
- INIT, RUNNING, CANCELLED, DONE
- }
-
- private class DelayCheckForNewEntriesTask implements Runnable {
-
- private final OpReadEntry op;
- private final ReadEntriesCallback callback;
- private final Object ctx;
- private final ScheduledFuture<?> scheduledFuture;
- private DelayCheckForNewEntriesTaskState state =
DelayCheckForNewEntriesTaskState.INIT;
- public DelayCheckForNewEntriesTask(OpReadEntry op, ReadEntriesCallback
callback, Object ctx) {
- this.op = op;
- this.callback = callback;
- this.ctx = ctx;
- scheduledFuture = ledger.getScheduledExecutor().schedule(this,
- getConfig().getNewEntriesCheckDelayInMillis(),
TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void run() {
- synchronized (pendingReadOpMutex) {
- if (state != DelayCheckForNewEntriesTaskState.INIT) {
- return;
- }
- state = DelayCheckForNewEntriesTaskState.RUNNING;
- checkForNewEntries(op, callback, ctx);
- state = DelayCheckForNewEntriesTaskState.DONE;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Deferring retry of read at position {}",
ledger.getName(), name, op.readPosition);
}
- }
-
- public boolean isDone() {
- return state == DelayCheckForNewEntriesTaskState.DONE;
- }
- public boolean cancel() {
- synchronized (pendingReadOpMutex) {
- // Not all implementations of Executor guarantee that the
Runnable will be no long be executed after a
- // successful cancel, such as Guava MoreExecutors, see also
https://github.com/google/guava/blob
- //
/v32.1.2/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L709.
- // The current task guarantees.
- if (state != DelayCheckForNewEntriesTaskState.INIT) {
- return false;
- }
- state = DelayCheckForNewEntriesTaskState.CANCELLED;
- scheduledFuture.cancel(false);
- return true;
+ // Check again for new entries after the configured time, then if
still no entries are available register
+ // to be notified
+ if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
+ ledger.getScheduledExecutor().schedule(() ->
checkForNewEntries(opReadId, op, callback, ctx),
+ getConfig().getNewEntriesCheckDelayInMillis(),
TimeUnit.MILLISECONDS);
+ } else {
+ // If there's no delay, check directly from the same thread
+ checkForNewEntries(opReadId, op, callback, ctx);
}
}
}
- private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback
callback, Object ctx) {
+ // Please notice that OpReadEntry might be recycled due to sharing via
waitingReadOp field logic
+ // That's why the fields cannot be accessed before the reference is
removed from waitingReadOp atomically
+ // and the id matches the removed reference.
+ private void checkForNewEntries(int opReadId, OpReadEntry op,
ReadEntriesCallback callback, Object ctx) {
try {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Re-trying the read at position {}",
ledger.getName(), name, op.readPosition);
+ log.debug("[{}] [{}] Re-trying the read for op id {}",
ledger.getName(), name, opReadId);
}
if (isClosed()) {
@@ -1149,17 +1131,33 @@ public class ManagedCursorImpl implements ManagedCursor
{
log.debug("[{}] [{}] Found more entries",
ledger.getName(), name);
}
// Try to cancel the notification request
- if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
- ledger.removeWaitingCursor(this);
+ // Clear the waiting read op only if it matches the current
instance and the id matches
+ // the opReadId parameter. This avoids recycled OpReadEntry
instances from matching since their
+ // ids would be different after recycling.
+ OpReadEntry waitingReadOpItem =
WAITING_READ_OP_UPDATER.getAndUpdate(this,
+ current -> {
+ if (current == op && current.id == opReadId) {
+ // update the value to null to cancel the
waiting read op
+ return null;
+ } else {
+ // keep the current waiting read op value
+ return current;
+ }
+ });
+ // If the waiting read op was the same as the one we are
trying to cancel, it means that it was now
+ // cleared from the waitingReadOp field and therefore
"cancelled"
+ if (waitingReadOpItem == op && waitingReadOpItem.id ==
opReadId) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancelled notification and
scheduled read at {}", ledger.getName(),
- name, op.readPosition);
+ name, op.readPosition);
}
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
ledger.asyncReadEntries(op);
} else {
- log.info("[{}] [{}] notification that new entries added
was already be called, skipped the current"
- + " new entry checking", ledger.getName(), name);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] notification was already
cancelled for op id {}", ledger.getName(), name,
+ opReadId);
+ }
}
} else if (ledger.isTerminated()) {
// At this point we registered for notification and still
there were no more available
@@ -1174,7 +1172,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public boolean isClosed() {
- return state == State.Closed || state == State.Closing;
+ return state.isClosed();
}
@Override
@@ -1182,53 +1180,21 @@ public class ManagedCursorImpl implements ManagedCursor
{
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancel pending read request",
ledger.getName(), name);
}
- synchronized (pendingReadOpMutex) {
- final OpReadEntry op = WAITING_READ_OP_UPDATER.get(this);
- // Case 1: the pending read has executed, or there is no pending
read.
- if (op == null) {
- return false;
- }
- Function<Boolean, Boolean> clearWaitingReadOp =
removeWaitingCursor -> {
- if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
- if (removeWaitingCursor) {
- ledger.removeWaitingCursor(this);
- }
- op.recycle();
- return true;
- } else {
- // Managed ledger has noticed that new entries was added.
- if (WAITING_READ_OP_UPDATER.get(this) == null) {
- return false;
- }
- // It will never occur, it means the lock
"pendingReadOpMutex" does not work as expected, which
- // allowed other thread to modify the "waitingReadOp"
concurrently.
- // The waitingReadOp has been modified to other instance,
which will never occur.
- log.warn("[{}] [{}] Cancel pending request encountered an
unexpected error, the lock"
- + " \"pendingReadOpMutex\" does not work as expected,
which allowed other"
- + " thread to modify the \"waitingReadOp\"
concurrently..", ledger.getName(), name);
- return cancelPendingReadRequest();
- }
- };
- // There is a pending read,
- // Case 2: delayCheckForNewEntriesTask can be cancelled, no need
to remove cursor from "ml.waitingCursors",
- // because it has not added successfully yet.
- if (delayCheckForNewEntriesTask != null &&
delayCheckForNewEntriesTask.cancel()) {
- return clearWaitingReadOp.apply(false);
- }
- // Case 3: managedLedgerNewEntriesCheckDelayInMillis is "0".
- // Case 4: delayCheckForNewEntriesTask has done, which has added
cursor into "ml.waitingCursors".
- if (delayCheckForNewEntriesTask == null ||
delayCheckForNewEntriesTask.isDone()) {
- return clearWaitingReadOp.apply(true);
+ final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndUpdate(this,
current -> {
+ if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+ return current;
}
- // Case 5: delayCheckForNewEntriesTask is running, but not done,
which only occurs at a corner case. It
- // only happens when the task is starting. Calling
"cancelPendingReadRequest" here will release the lock
- // "pendingReadOpMutex" and let the task go ahead.
- return cancelPendingReadRequest();
+ return null;
+ });
+ if (op != null) {
+ op.recycle();
}
+ return op != null && op !=
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR;
}
public boolean hasPendingReadRequest() {
- return WAITING_READ_OP_UPDATER.get(this) != null;
+ OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.get(this);
+ return opReadEntry != null && opReadEntry !=
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR;
}
@Override
@@ -2246,7 +2212,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// We cannot write to the ledger during the switch, need to wait until
the new metadata ledger is available
synchronized (pendingMarkDeleteOps) {
// The state might have changed while we were waiting on the queue
mutex
- switch (STATE_UPDATER.get(this)) {
+ switch (state) {
case Closed:
callback.markDeleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already
closed"), ctx);
@@ -2377,7 +2343,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
};
- if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
+ if (state == State.NoLedger) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
log.error("[{}][{}] Metadata ledger creation failed, try to
persist the position in the metadata"
+ " store.", ledger.getName(), name);
@@ -2386,7 +2352,7 @@ public class ManagedCursorImpl implements ManagedCursor {
cb.operationFailed(new ManagedLedgerException("Switch new
cursor ledger failed"));
}
} else {
- persistPositionToLedger(cursorLedger, mdEntry, cb);
+ persistPositionToLedger(cursorLedger, mdEntry, cb, false);
}
}
@@ -2831,7 +2797,7 @@ public class ManagedCursorImpl implements ManagedCursor {
ledger.getName(), name,
cursorLedger.getId(), e.getMessage());
callback.closeFailed(e, ctx);
}
- });
+ }, true);
} else {
persistPositionMetaStore(-1, position, properties, new
MetaStoreCallback<Void>() {
@Override
@@ -2958,12 +2924,18 @@ public class ManagedCursorImpl implements ManagedCursor
{
callback.closeComplete(ctx);
return;
}
+ closeWaitingCursor();
+ setInactive();
persistPositionWhenClosing(lastMarkDeleteEntry.newPosition,
lastMarkDeleteEntry.properties,
new AsyncCallbacks.CloseCallback(){
@Override
public void closeComplete(Object ctx) {
- STATE_UPDATER.set(ManagedCursorImpl.this,
State.Closed);
+ if
(!STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Closing,
State.Closed)) {
+ log.warn("[{}] [{}] State was modified from
closing to {} while closing", ledger.getName(),
+ name, state);
+ state = State.Closed;
+ }
callback.closeComplete(ctx);
}
@@ -2976,6 +2948,19 @@ public class ManagedCursorImpl implements ManagedCursor {
}, ctx);
}
+ protected void closeWaitingCursor() {
+ synchronized (registerToWaitingCursorsLock) {
+ if (registeredToWaitingCursors) {
+ ledger.removeWaitingCursor(this);
+ }
+ }
+ OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this,
+ OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR);
+ if (opReadEntry != null && opReadEntry !=
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+ opReadEntry.readEntriesFailed(new
CursorAlreadyClosedException("Cursor is closing"), opReadEntry.ctx);
+ }
+ }
+
/**
* Internal version of seek that doesn't do the validation check.
*
@@ -3074,8 +3059,8 @@ public class ManagedCursorImpl implements ManagedCursor {
void startCreatingNewMetadataLedger() {
// Change the state so that new mark-delete ops will be queued and not
immediately submitted
- State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger);
- if (oldState == State.SwitchingLedger) {
+ State oldState = changeStateIfNotClosed(State.SwitchingLedger);
+ if (oldState == State.SwitchingLedger || oldState.isClosed()) {
// Ignore double request
return;
}
@@ -3095,7 +3080,7 @@ public class ManagedCursorImpl implements ManagedCursor {
flushPendingMarkDeletes();
// Resume normal mark-delete operations
- STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
+ changeStateIfNotClosed(State.Open);
}
}
@@ -3104,7 +3089,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.error("[{}][{}] Metadata ledger creation failed {}",
ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
- STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+ changeStateIfNotClosed(State.NoLedger);
// There are two case may cause switch ledger fails.
// 1. No enough BKs; BKs are in read-only mode...
// 2. Write ZK fails.
@@ -3128,21 +3113,8 @@ public class ManagedCursorImpl implements ManagedCursor {
* @return false if the {@link #state} already is {@link State#Closing} or
{@link State#Closed}.
*/
private boolean trySetStateToClosing() {
- final AtomicBoolean notClosing = new AtomicBoolean(false);
- STATE_UPDATER.updateAndGet(this, state -> {
- switch (state){
- case Closing:
- case Closed: {
- notClosing.set(false);
- return state;
- }
- default: {
- notClosing.set(true);
- return State.Closing;
- }
- }
- });
- return notClosing.get();
+ State previousState = changeStateIfNotClosed(State.Closing);
+ return !previousState.isClosed();
}
private void flushPendingMarkDeletes() {
@@ -3185,7 +3157,7 @@ public class ManagedCursorImpl implements ManagedCursor {
deleteLedgerAsync(newLedgerHandle);
callback.operationFailed(exception);
}
- });
+ }, false);
}).whenComplete((result, e) -> {
ledger.mbean.endCursorLedgerCreateOp();
if (e != null) {
@@ -3344,7 +3316,8 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
- void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry
mdEntry, final VoidCallback callback) {
+ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry
mdEntry, final VoidCallback callback,
+ boolean ignoreClosedStateAfterFailure) {
Position position = mdEntry.newPosition;
Builder piBuilder =
PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
@@ -3398,7 +3371,7 @@ public class ManagedCursorImpl implements ManagedCursor {
mbean.addWriteCursorLedgerSize(data.length);
callback.operationComplete();
} else {
- if (state == State.Closed) {
+ if (!ignoreClosedStateAfterFailure && state.isClosed()) {
// After closed the cursor, the in-progress persistence
task will get a
// BKException.Code.LedgerClosedException.
callback.operationFailed(new
CursorAlreadyClosedException(String.format("%s %s skipped this"
@@ -3419,8 +3392,7 @@ public class ManagedCursorImpl implements ManagedCursor {
public boolean periodicRollover() {
LedgerHandle lh = cursorLedger;
- if (State.Open.equals(STATE_UPDATER.get(this))
- && lh != null && lh.getLength() > 0) {
+ if (state == State.Open && lh != null && lh.getLength() > 0) {
boolean triggered = rolloverLedgerIfNeeded(lh);
if (triggered) {
log.info("[{}] Periodic rollover triggered for cursor {}
(length={} bytes)",
@@ -3478,7 +3450,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >=
getConfig().getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now -
getConfig().getLedgerRolloverTimeout() * 1000))
- && (STATE_UPDATER.get(this) != State.Closed &&
STATE_UPDATER.get(this) != State.Closing)) {
+ && !state.isClosed()) {
// It's safe to modify the timestamp since this method will be
only called from a callback, implying that
// calls will be serialized on one single thread
lastLedgerSwitchTimestamp = now;
@@ -3526,7 +3498,21 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received ml notification", ledger.getName(),
name);
}
- OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this,
null);
+
+ OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndUpdate(this,
current -> {
+ // if the waitingReadOp is WAITING_READ_OP_FOR_CLOSED_CURSOR, keep
it as is
+ if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+ return current;
+ } else {
+ // Otherwise, clear the waiting read operation
+ return null;
+ }
+ });
+
+ // ignore the notification if the cursor is already closed
+ if (opReadEntry == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+ return;
+ }
if (opReadEntry != null) {
if (log.isDebugEnabled()) {
@@ -3535,7 +3521,15 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] Consumer {} cursor notification: other
counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter,
markDeletePosition, readPosition);
}
-
+ if (isClosed()) {
+ // If the cursor is closed, we should not read any more entries
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Cursor is already closed, ignoring
notification", ledger.getName(), name);
+ }
+ opReadEntry.readEntriesFailed(new
ManagedLedgerException.CursorAlreadyClosedException(
+ "Cursor was already closed"), opReadEntry.ctx);
+ return;
+ }
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
opReadEntry.readPosition = getReadPosition();
ledger.asyncReadEntries(opReadEntry);
@@ -3570,7 +3564,6 @@ public class ManagedCursorImpl implements ManagedCursor {
void decrementPendingMarkDeleteCount() {
if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.decrementAndGet(this)
== 0) {
- final State state = STATE_UPDATER.get(this);
if (state == State.SwitchingLedger) {
// A metadata ledger switch was pending and now we can do it
since we don't have any more
// outstanding mark-delete requests
@@ -3582,7 +3575,7 @@ public class ManagedCursorImpl implements ManagedCursor {
void readOperationCompleted() {
if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) {
synchronized (pendingMarkDeleteOps) {
- if (STATE_UPDATER.get(this) == State.Open) {
+ if (state == State.Open) {
// Flush the pending writes only if the state is open.
flushPendingMarkDeletes();
} else if
(PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) {
@@ -3629,13 +3622,25 @@ public class ManagedCursorImpl implements ManagedCursor
{
}
private void asyncDeleteCursorLedger(int retry) {
- STATE_UPDATER.set(this, State.Closed);
+ State beforeChangingState = changeStateToDeletingIfNotDeleted();
+ if (beforeChangingState == State.Deleted) {
+ log.warn("[{}-{}] Cursor ledger is already deleted. state={}",
ledger.getName(), name,
+ beforeChangingState);
+ return;
+ }
- if (cursorLedger == null || retry <= 0) {
- if (cursorLedger != null) {
- log.warn("[{}-{}] Failed to delete ledger after retries {}",
ledger.getName(), name,
- cursorLedger.getId());
- }
+ closeWaitingCursor();
+
+ if (cursorLedger == null) {
+ log.warn("[{}-{}] There's no cursor ledger available for
deletion.", ledger.getName(), name);
+ state = State.DeletingFailed;
+ return;
+ }
+
+ if (retry <= 0) {
+ log.warn("[{}-{}] Failed to delete ledger after retries {}",
ledger.getName(), name,
+ cursorLedger.getId());
+ state = State.DeletingFailed;
return;
}
@@ -3643,18 +3648,36 @@ public class ManagedCursorImpl implements ManagedCursor
{
bookkeeper.asyncDeleteLedger(cursorLedger.getId(), (rc, ctx) -> {
ledger.mbean.endCursorLedgerDeleteOp();
if (rc == BKException.Code.OK) {
+ state = State.Deleted;
log.info("[{}][{}] Deleted cursor ledger {}",
ledger.getName(), name, cursorLedger.getId());
} else {
log.warn("[{}][{}] Failed to delete ledger {}: {}",
ledger.getName(), name, cursorLedger.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
+ state = State.DeletingFailed;
ledger.getScheduledExecutor().schedule(() ->
asyncDeleteCursorLedger(retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC,
TimeUnit.SECONDS);
+ } else {
+ state = State.Deleted;
}
}
}, null);
}
+ /**
+ * Change the state to {@link State#Deleting} if the current state is not
{@link State#Deleted}.
+ * @return The state before changing.
+ */
+ State changeStateToDeletingIfNotDeleted() {
+ return STATE_UPDATER.getAndUpdate(this, current -> {
+ // don't change the state if it's already deleted
+ if (current == State.Deleted) {
+ return current;
+ }
+ return State.Deleting;
+ });
+ }
+
/**
* return BK error codes that are considered not likely to be recoverable.
*/
@@ -3812,7 +3835,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
public String getState() {
- return STATE_UPDATER.get(this).toString();
+ return state.toString();
}
@Override
@@ -3936,8 +3959,8 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@VisibleForTesting
- public void setState(State state) {
- this.state = state;
+ public State getAndSetState(State state) {
+ return STATE_UPDATER.getAndSet(this, state);
}
public void setCacheReadEntry(boolean cacheReadEntry) {
@@ -4012,4 +4035,55 @@ public class ManagedCursorImpl implements ManagedCursor {
cs.properties = getProperties();
return cs;
}
+
+ /**
+ * Called by ManagedLedgerImpl to execute the Runnable inside the lock to
remove the cursor from its
+ * waiting cursors list.
+ * The cursor state is set to unregistered, and it can be registered again
for waiting in ManagedLedgerImpl.
+ */
+ void removeWaitingCursorRequested(Runnable removeWaitingCursorRunnable) {
+ synchronized (registerToWaitingCursorsLock) {
+ if (!registeredToWaitingCursors) {
+ // The cursor hasn't been registered, do not attempt to remove
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping removing cursor {} from waiting
cursors since it's not registered.",
+ ledger.getName(), name);
+ }
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Removing cursor {} from waiting cursors",
ledger.getName(), name);
+ }
+ removeWaitingCursorRunnable.run();
+ registeredToWaitingCursors = false;
+ }
+ }
+
+ /**
+ * Called by ManagedLedgerImpl to notify that the cursor has been dequeued
from the waiting cursors list.
+ */
+ void notifyWaitingCursorDequeued() {
+ synchronized (registerToWaitingCursorsLock) {
+ registeredToWaitingCursors = false;
+ }
+ }
+
+ /**
+ * Called by ManagedLedgerImpl to execute the Runnable inside the lock to
add the cursor to its
+ * waiting cursors list.
+ * This method skips the registration when the cursor is already
registered or closed, which avoids duplicate entries.
+ */
+ void addWaitingCursorRequested(Runnable addWaitingCursorRunnable) {
+ synchronized (registerToWaitingCursorsLock) {
+ if (registeredToWaitingCursors || isClosed()) {
+ // The cursor is already registered or closed, do not register
again.
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Adding cursor {} to waiting cursors",
ledger.getName(), name);
+ }
+ addWaitingCursorRunnable.run();
+ registeredToWaitingCursors = true;
+ }
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 390f3a9fd5a..4b630a6b362 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1063,15 +1063,29 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
callback.deleteCursorFailed(new
ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
+ consumerName), ctx);
return;
- } else if (!cursor.isDurable()) {
- cursor.setState(ManagedCursorImpl.State.Closed);
- cursor.cancelPendingReadRequest();
+ }
+
+ // Non-durable cursors can be closed and removed immediately
+ if (!cursor.isDurable()) {
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to close non-durable cursor {}", name,
consumerName, e);
+ }
cursors.removeCursor(consumerName);
- deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
return;
}
+ // If the cursor is active, we need to deactivate it first
+ cursor.setInactive();
+ // Set the state to deleting (which is a closed state) to avoid any
new writes
+ ManagedCursorImpl.State beforeChangingState =
cursor.changeStateToDeletingIfNotDeleted();
+ if (beforeChangingState.isDeletingOrDeleted()) {
+ log.warn("[{}] [{}] Cursor is already being deleted or has been
deleted.", name, consumerName);
+ return;
+ }
+
// First remove the consumer form the MetaStore. If this operation
succeeds and the next one (removing the
// ledger from BK) don't, we end up having a loose ledger leaked but
the state will be consistent.
store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new
MetaStoreCallback<Void>() {
@@ -1079,7 +1093,6 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
public void operationComplete(Void result, Stat stat) {
cursor.asyncDeleteCursorLedger();
cursors.removeCursor(consumerName);
- deactivateCursorByName(consumerName);
trimConsumedLedgersInBackground();
@@ -1089,7 +1102,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
- handleBadVersion(e);
+ cursor.getAndSetState(ManagedCursorImpl.State.DeletingFailed);
callback.deleteCursorFailed(e, ctx);
}
@@ -2544,7 +2557,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (waitingCursor == null) {
break;
}
-
+ waitingCursor.notifyWaitingCursorDequeued();
executor.execute(waitingCursor::notifyEntriesAvailable);
}
}
@@ -4036,11 +4049,17 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
public void removeWaitingCursor(ManagedCursor cursor) {
- this.waitingCursors.remove(cursor);
+ ((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> {
+ // remove only if the cursor has been registered
+ this.waitingCursors.remove(cursor);
+ });
}
public void addWaitingCursor(ManagedCursorImpl cursor) {
- this.waitingCursors.add(cursor);
+ cursor.addWaitingCursorRequested(() -> {
+ // add only if the cursor has not been registered
+ this.waitingCursors.add(cursor);
+ });
}
public boolean isCursorActive(ManagedCursor cursor) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 326f8216f1e..c2398aaf6ec 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -113,6 +113,8 @@ public class NonDurableCursorImpl extends ManagedCursorImpl
{
@Override
public void asyncClose(CloseCallback callback, Object ctx) {
STATE_UPDATER.set(this, State.Closed);
+ closeWaitingCursor();
+ setInactive();
callback.closeComplete(ctx);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 2dcbe50a62c..4576d7cd67e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -22,6 +22,7 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -34,7 +35,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class OpReadEntry implements ReadEntriesCallback {
-
+ static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new
OpReadEntry();
+ private static final AtomicInteger opReadIdGenerator = new
AtomicInteger(1);
+ /**
+ * id for this read operation. Value can be negative when integer value
overflow happens.
+ * Used for waitingReadOp consistency so that the correct instance is
handled after the instance has already been
+ * recycled.
+ */
+ int id;
ManagedCursorImpl cursor;
Position readPosition;
private int count;
@@ -51,6 +59,7 @@ class OpReadEntry implements ReadEntriesCallback {
public static OpReadEntry create(ManagedCursorImpl cursor, Position
readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, Position maxPosition,
Predicate<Position> skipCondition) {
OpReadEntry op = RECYCLER.get();
+ op.id = opReadIdGenerator.getAndIncrement();
op.readPosition =
cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
op.count = count;
@@ -123,7 +132,7 @@ class OpReadEntry implements ReadEntriesCallback {
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can
return them
complete(ctx);
- } else if (cursor.getConfig().isAutoSkipNonRecoverableData()
+ } else if (!cursor.isClosed() &&
cursor.getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}",
cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
@@ -200,6 +209,22 @@ class OpReadEntry implements ReadEntriesCallback {
this.recyclerHandle = recyclerHandle;
}
+ // no-op constructor for the WAITING_READ_OP_FOR_CLOSED_CURSOR sentinel instance
+ private OpReadEntry() {
+ this.recyclerHandle = null;
+ this.callback = new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ // no-op
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ // no-op
+ }
+ };
+ }
+
private static final Recycler<OpReadEntry> RECYCLER = new Recycler<>() {
@Override
protected OpReadEntry newObject(Recycler.Handle<OpReadEntry>
recyclerHandle) {
@@ -208,6 +233,11 @@ class OpReadEntry implements ReadEntriesCallback {
};
public void recycle() {
+ if (recyclerHandle == null) {
+ // This is the no-op instance, do not recycle
+ return;
+ }
+ id = -1;
count = 0;
cursor = null;
readPosition = null;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 00ed5a0c5b9..d79aa5e6145 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -60,6 +60,8 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl
implements ReadOnlyCur
@Override
public void asyncClose(final AsyncCallbacks.CloseCallback callback, final
Object ctx) {
state = State.Closed;
+ closeWaitingCursor();
+ setInactive();
callback.closeComplete(ctx);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 7f6f6fb4f1e..7568c612e14 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -354,6 +354,16 @@ public class PersistentSubscription extends
AbstractSubscription {
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
+ // Remove the cursor from the waiting cursors list.
+ // For durable cursors, we should *not* cancel the pending read
with cursor.cancelPendingReadRequest.
+ // This is because internally, in the dispatcher implementations,
there is a "havePendingRead" flag
+ // that is not reset. If the pending read is cancelled, the
dispatcher will not continue reading from
+ // the managed ledger when a new consumer is added to the
dispatcher since based on the "havePendingRead"
+ // state, it will continue to expect that a read is pending and
will not submit a new read.
+ // For non-durable cursors, there's no difference since the cursor
is not expected to be used again.
+
+ // remove waiting cursor from the managed ledger, this applies to
both durable and non-durable cursors.
+ topic.getManagedLedger().removeWaitingCursor(cursor);
if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the
subscription as well. No need to check for active
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 907f208fa47..5b5e4328041 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -59,7 +59,6 @@ import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.Position;
@@ -1046,7 +1045,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
*
* @throws Exception
*/
- @Test(timeOut = 15000)
+ @Test(timeOut = 30000)
public void testCloseReplicatorStartProducer() throws Exception {
TopicName dest =
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
// Producer on r1
@@ -1063,33 +1062,30 @@ public class ReplicatorTest extends ReplicatorTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator)
topic.getPersistentReplicator("r2");
+ // check that the replicator producer is not null
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(replicator.getProducer());
+ });
+
// close the cursor
- Field cursorField =
PersistentReplicator.class.getDeclaredField("cursor");
- cursorField.setAccessible(true);
- ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
- cursor.close();
- // try to read entries
+ replicator.getCursor().close();
+
+ // try to produce entries
producer1.produce(10);
+ // attempt to read entries directly from replicator cursor
try {
- cursor.readEntriesOrWait(10);
+ replicator.getCursor().readEntriesOrWait(10);
fail("It should have failed");
} catch (Exception e) {
assertEquals(e.getClass(), CursorAlreadyClosedException.class);
}
- // replicator-readException: cursorAlreadyClosed
- replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor
already closed exception"), null);
-
// wait replicator producer to be closed
- Thread.sleep(100);
-
- // Replicator producer must be closed
- Field producerField =
AbstractReplicator.class.getDeclaredField("producer");
- producerField.setAccessible(true);
- @SuppressWarnings("unchecked")
- ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>)
producerField.get(replicator);
- assertNull(replicatorProducer);
+ // Replicator producer must be null after the producer has been closed
+ Awaitility.await().untilAsserted(() -> {
+ assertNull(replicator.getProducer());
+ });
}
@Test(timeOut = 30000)