codelipenghui commented on code in PR #24522:
URL: https://github.com/apache/pulsar/pull/24522#discussion_r2223869907


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -143,10 +151,14 @@ public void initiate() {
                     payloadProcessorHandle = ml.getManagedLedgerInterceptor()
                             .processPayloadBeforeLedgerWrite(this.getCtx(), 
duplicateBuffer);
                 } catch (Exception e) {
+                    ManagedLedgerException mle = new 
ManagedLedgerException.ManagedLedgerInterceptException(e);
+                    ml.interceptorException = mle;

Review Comment:
   If we go with the `State` management solution mentioned above, it seems like 
we can just have a method to fence to the ManagedLedger, e.g. 
`fenceForInterceptorException()`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -132,8 +132,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
 
     public void initiate() {
         if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, 
State.INITIATED)) {
-            ByteBuf duplicateBuffer = data.retainedDuplicate();
+            // Fail the add operation if the managed ledger is in a state that 
prevents adding entries.
+            ManagedLedgerException exbw = ml.interceptorException;
+            if (exbw != null) {
+                ml.pendingAddEntries.remove(this);
+                this.failed(exbw);
+                // Don't recycle the object here, see: 
https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
+                return;
+            }

Review Comment:
   We have `State` for the managed ledger which already have Fenced and 
`FencedForDeletion`. Is it better to  have a new one called 
`FencedForInterceptorException`? So that we can add a API for ManageLedger 
could called `unfenceForInterceptorException()`, is will be more clear than the 
newly added `resetInterceptorException()`?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##########
@@ -132,8 +132,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
 
     public void initiate() {
         if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, 
State.INITIATED)) {
-            ByteBuf duplicateBuffer = data.retainedDuplicate();
+            // Fail the add operation if the managed ledger is in a state that 
prevents adding entries.
+            ManagedLedgerException exbw = ml.interceptorException;
+            if (exbw != null) {
+                ml.pendingAddEntries.remove(this);
+                this.failed(exbw);
+                // Don't recycle the object here, see: 
https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
+                return;
+            }

Review Comment:
   And the method `ManagedLedgerImpl#internalAsyncAddEntry` should already have 
the check for the for the `state`. Once we changed the state to 
`FencedForInterceptorException`, the new add operation will not be added to the 
pendingAddEntries?
   
   But from this change, it allows the managed ledger to add the `OpAddEntry` 
to the pendingAddEntries first and then remove it from here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to