junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2674334760
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1155,14 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ if (batch.isInflight()) {
+ completeBatch(batch);
+ } else {
+ // KAFKA-19012: if the batch has been sent it might still be
in use by the network client so we cannot allow it to be reused yet.
+ // We skip deallocating it now. When the request in network
client completes with a response, either completeBatch() or
+ // failBatch() will be called with deallocateBatch=true. The
buffer associated with the batch will be deallocated then.
Review Comment:
Could we move the comment to the if clause?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1044,6 +1044,9 @@ public void deallocate(ProducerBatch batch) {
if (batch.isBufferDeallocated()) {
log.warn("Skipping deallocating a batch that has already been
deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
} else {
+ if (batch.isInflight()) {
+ throw new IllegalStateException("Attempting to deallocate
a batch that is not inflight. Batch is " + batch);
Review Comment:
Since the Sender thread just logs an exception and continues, we could be
silently leaking memory if this happens. An alternative is to log a warn and
create a fresh buffer of batch.initialCapacity() size to deallocate. It would
be useful to include the stacktrace in the log message.
> that is not inflight
that is inflight
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]