codelipenghui commented on code in PR #24522:
URL: https://github.com/apache/pulsar/pull/24522#discussion_r2223869907
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -143,10 +151,14 @@ public void initiate() {
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
.processPayloadBeforeLedgerWrite(this.getCtx(),
duplicateBuffer);
} catch (Exception e) {
+ ManagedLedgerException mle = new ManagedLedgerException.ManagedLedgerInterceptException(e);
+ ml.interceptorException = mle;
Review Comment:
If we go with the `State`-based approach suggested in my other comment on
`initiate()`, it seems we can just add a method that fences the ManagedLedger,
e.g. `fenceForInterceptorException()`.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -132,8 +132,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN,
State.INITIATED)) {
- ByteBuf duplicateBuffer = data.retainedDuplicate();
+ // Fail the add operation if the managed ledger is in a state that prevents adding entries.
+ ManagedLedgerException exbw = ml.interceptorException;
+ if (exbw != null) {
+ ml.pendingAddEntries.remove(this);
+ this.failed(exbw);
+ // Don't recycle the object here, see: https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
+ return;
+ }
Review Comment:
The managed ledger already has a `State` that includes `Fenced` and
`FencedForDeletion`. Would it be better to add a new one called
`FencedForInterceptorException`? Then we could add an API to ManagedLedger
called `unfenceForInterceptorException()`, which would be clearer than the
newly added `resetInterceptorException()`.
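
A rough sketch of the idea, to make the proposal concrete. It assumes a new `FencedForInterceptorException` state and a compare-and-set on the ledger state; `FencedLedgerSketch` is a hypothetical, heavily simplified stand-in for `ManagedLedgerImpl`, and the enum lists only the states relevant here:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ManagedLedgerImpl; only the state handling is modeled.
class FencedLedgerSketch {
    // Abbreviated state list. FencedForInterceptorException is the proposed addition.
    enum State { LedgerOpened, Fenced, FencedForDeletion, FencedForInterceptorException }

    private final AtomicReference<State> state = new AtomicReference<>(State.LedgerOpened);

    // Would be called from OpAddEntry when the interceptor throws.
    // Returns false if the ledger was not in LedgerOpened (e.g. already fenced).
    boolean fenceForInterceptorException() {
        return state.compareAndSet(State.LedgerOpened, State.FencedForInterceptorException);
    }

    // Would take the place of resetInterceptorException().
    // Only undoes this specific kind of fencing, never Fenced or FencedForDeletion.
    boolean unfenceForInterceptorException() {
        return state.compareAndSet(State.FencedForInterceptorException, State.LedgerOpened);
    }

    State getState() {
        return state.get();
    }
}
```

The compare-and-set keeps `unfenceForInterceptorException()` from accidentally reopening a ledger that was fenced for a different reason.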
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -132,8 +132,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN,
State.INITIATED)) {
- ByteBuf duplicateBuffer = data.retainedDuplicate();
+ // Fail the add operation if the managed ledger is in a state that prevents adding entries.
+ ManagedLedgerException exbw = ml.interceptorException;
+ if (exbw != null) {
+ ml.pendingAddEntries.remove(this);
+ this.failed(exbw);
+ // Don't recycle the object here, see: https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
+ return;
+ }
Review Comment:
Also, the method `ManagedLedgerImpl#internalAsyncAddEntry` should already
check the `state`. Once we change the state to
`FencedForInterceptorException`, new add operations would never be added to
`pendingAddEntries` in the first place, right?
With this change, however, the managed ledger first adds the `OpAddEntry`
to `pendingAddEntries` and then removes it again here.
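
To illustrate the ordering I mean, here is a minimal sketch in which the state check happens before anything is queued. `AddPathSketch` and its `String` operations are hypothetical simplifications, not the real `ManagedLedgerImpl` code, and the `FencedForInterceptorException` state is the proposed one:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the add path; the real method takes an OpAddEntry.
class AddPathSketch {
    enum State { LedgerOpened, FencedForInterceptorException }

    volatile State state = State.LedgerOpened;
    final Queue<String> pendingAddEntries = new ArrayDeque<>();

    // Returns true if the operation was queued, false if it was rejected up front.
    boolean internalAsyncAddEntry(String op) {
        if (state == State.FencedForInterceptorException) {
            // The real code would fail the operation's callback here.
            // The operation never reaches pendingAddEntries, so nothing has to be removed later.
            return false;
        }
        pendingAddEntries.add(op);
        return true;
    }
}
```

With the check placed here, `OpAddEntry#initiate()` would not need its own add-then-remove handling for this case.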