junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673349295


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            completeBatchAndDeallocate(batch);

Review Comment:
   > Are these solutions mostly to indicate correctly what we should do with 
the buffers and not reuse them?
   
   Yes. If a batch is still in flight in the NetworkClient when it is aborted
   early, the deallocation of the buffer associated with that batch needs to
   wait until the in-flight request completes.
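   
   A minimal sketch of that rule, for illustration only. `SketchPool`,
   `SketchBatch` and `DeferredDeallocationSketch` are hypothetical stand-ins,
   not Kafka's BufferPool, ProducerBatch or RecordAccumulator, and the flag
   names are assumptions rather than what this PR implements:
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical stand-in for a buffer pool; not Kafka's BufferPool.
   final class SketchPool {
       private final Deque<ByteBuffer> free = new ArrayDeque<>();

       void release(ByteBuffer buffer) {
           buffer.clear();
           free.add(buffer);
       }

       int freeCount() {
           return free.size();
       }
   }

   // Hypothetical stand-in for a producer batch; not Kafka's ProducerBatch.
   final class SketchBatch {
       final ByteBuffer buffer;
       boolean inflight;        // true while a network request may still read the buffer
       boolean releasePending;  // true if the batch was aborted while in flight

       SketchBatch(ByteBuffer buffer) {
           this.buffer = buffer;
       }
   }

   final class DeferredDeallocationSketch {
       private final SketchPool pool;

       DeferredDeallocationSketch(SketchPool pool) {
           this.pool = pool;
       }

       // Abort path: return the buffer now only if no request still references it.
       void abort(SketchBatch batch) {
           if (batch.inflight) {
               batch.releasePending = true;
           } else {
               pool.release(batch.buffer);
           }
       }

       // Called once the in-flight request has completed; performs any release
       // that an earlier abort deferred. Normal completion is out of scope here.
       void onRequestComplete(SketchBatch batch) {
           batch.inflight = false;
           if (batch.releasePending) {
               batch.releasePending = false;
               pool.release(batch.buffer);
           }
       }
   }
   ```
   
   In this sketch, aborting an in-flight batch only marks it, so the pool
   cannot hand the same buffer to a new batch while the network layer may
   still be reading from it.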
   
   > did you mean fatal errors or abortable here? If it is fatal, we also can't 
transactionally produce anymore.
   
   My understanding is that if the TxnManager is in the fatal state, the
   producer could still accumulate record batches in the accumulator. The
   Sender will just abort those batches in runOnce(), which could still expose
   the buffer reuse problem.
   
   ```
                   // do not continue sending if the transaction manager is in a failed state
                   if (transactionManager.hasFatalError()) {
                       if (lastError != null)
                           maybeAbortBatches(lastError);
   ```
   
   Also, there is a separate path calling maybeAbortBatches() when the 
TxnManager is in the abortable state. The producer is expected to continue 
afterwards.
   
   ```
       private boolean shouldHandleAuthorizationError(RuntimeException exception) {
           if (exception instanceof TransactionalIdAuthorizationException ||
                           exception instanceof ClusterAuthorizationException) {
               transactionManager.failPendingRequests(new AuthenticationException(exception));
               maybeAbortBatches(exception);
               transactionManager.transitionToUninitialized(exception);
               return true;
           }
           return false;
       }
   ```


