junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673349295
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ completeBatchAndDeallocate(batch);
Review Comment:
> Are these solutions mostly to indicate correctly what we should do with the buffers and not reuse them?
Yes. If a batch is aborted early while it is still in flight in the
NetworkClient, the deallocation of the buffer associated with that batch must
wait until the in-flight request completes.
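As a rough illustration of that ordering, here is a minimal single-threaded sketch. It is not the code in this PR: `Pool`, `Batch`, `abort`, and `onRequestComplete` are hypothetical stand-ins, and the real accumulator has to deal with concurrency that is ignored here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Single-threaded sketch; every name below is hypothetical, not Kafka's API.
final class DeferredDeallocationSketch {

    /** Stand-in for a buffer pool that recycles freed buffers. */
    static final class Pool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        ByteBuffer allocate(int size) {
            ByteBuffer b = free.pollFirst();
            return b != null ? b : ByteBuffer.allocate(size);
        }

        void deallocate(ByteBuffer b) {
            b.clear();
            free.addFirst(b);
        }

        int freeCount() {
            return free.size();
        }
    }

    /** Stand-in for a producer batch that owns one pooled buffer. */
    static final class Batch {
        final ByteBuffer buffer;
        boolean inflight;  // true while a network request still references the buffer
        boolean released;  // guards against returning the buffer twice

        Batch(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    }

    private final Pool pool;

    DeferredDeallocationSketch(Pool pool) {
        this.pool = pool;
    }

    /** Abort path: release now only if no in-flight request uses the buffer. */
    void abort(Batch batch) {
        if (!batch.inflight) {
            release(batch);
        }
        // Otherwise the release is deferred to onRequestComplete().
    }

    /** Invoked once the in-flight request has finished (response or disconnect). */
    void onRequestComplete(Batch batch) {
        batch.inflight = false;
        release(batch);
    }

    private void release(Batch batch) {
        if (batch.released) {
            return;
        }
        batch.released = true;
        pool.deallocate(batch.buffer);
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        DeferredDeallocationSketch accumulator = new DeferredDeallocationSketch(pool);
        Batch batch = new Batch(pool.allocate(16));
        batch.inflight = true;

        accumulator.abort(batch);
        System.out.println("free buffers after abort: " + pool.freeCount());

        accumulator.onRequestComplete(batch);
        System.out.println("free buffers after completion: " + pool.freeCount());
    }
}
```

The `released` flag makes the release idempotent, so it does not matter whether the abort or the request completion happens last.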
> did you mean fatal errors or abortable here? If it is fatal, we also can't transactionally produce anymore.
My understanding is that even if the TxnManager is in the fatal state, the
producer could still accumulate record batches in the accumulator. The Sender
will just abort those batches in runOnce(), which could still expose the buffer
reuse problem.
```
// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
    if (lastError != null)
        maybeAbortBatches(lastError);
```
Also, there is a separate path that calls maybeAbortBatches() when the
TxnManager is in the abortable state. The producer is expected to continue
afterwards.
```
private boolean shouldHandleAuthorizationError(RuntimeException exception) {
    if (exception instanceof TransactionalIdAuthorizationException ||
            exception instanceof ClusterAuthorizationException) {
        transactionManager.failPendingRequests(new AuthenticationException(exception));
        maybeAbortBatches(exception);
        transactionManager.transitionToUninitialized(exception);
        return true;
    }
    return false;
}
```
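To make the buffer reuse problem concrete, here is a small self-contained sketch of what can go wrong when a buffer goes back to the pool while a pending write still references it. The `Pool` class and the "in-flight view" are hypothetical simplifications, not the producer's actual BufferPool or NetworkClient.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; none of these names come from Kafka.
final class BufferReuseHazardSketch {

    /** Stand-in pool that hands back the most recently freed buffer. */
    static final class Pool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        ByteBuffer allocate(int size) {
            ByteBuffer b = free.pollFirst();
            return b != null ? b : ByteBuffer.allocate(size);
        }

        void deallocate(ByteBuffer b) {
            b.clear();
            free.addFirst(b);
        }
    }

    /** Returns the bytes a pending network write would send right now. */
    static String pendingBytes(ByteBuffer view) {
        ByteBuffer copy = view.duplicate();
        byte[] out = new byte[copy.remaining()];
        copy.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    static String demo() {
        Pool pool = new Pool();

        // First batch writes its records into a pooled buffer.
        ByteBuffer first = pool.allocate(16);
        first.put("batch-A".getBytes(StandardCharsets.US_ASCII));

        // The "network layer" keeps a view over the same memory for a pending write.
        ByteBuffer inflightView = first.duplicate();
        inflightView.flip();

        // The batch is aborted and its buffer is returned while the write is pending.
        pool.deallocate(first);

        // A new batch receives the same buffer and overwrites the shared memory.
        ByteBuffer second = pool.allocate(16);
        second.put("batch-B".getBytes(StandardCharsets.US_ASCII));

        // The pending write now sees the second batch's bytes.
        return pendingBytes(inflightView);
    }

    public static void main(String[] args) {
        System.out.println("pending write would send: " + demo());
    }
}
```

In this sketch the pending write ends up carrying `batch-B` instead of `batch-A`, because `duplicate()` shares the underlying memory with the recycled buffer.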