junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2674334760


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1155,14 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isInflight()) {
+                completeBatch(batch);
+            } else {
+                // KAFKA-19012: if the batch has been sent it might still be 
in use by the network client so we cannot allow it to be reused yet.
+                // We skip deallocating it now. When the request in network 
client completes with a response, either completeBatch() or 
+                // failBatch() will be called with deallocateBatch=true. The 
buffer associated with the batch will be deallocated then.

Review Comment:
   Could we move the comment to the if clause?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1044,6 +1044,9 @@ public void deallocate(ProducerBatch batch) {
             if (batch.isBufferDeallocated()) {
                 log.warn("Skipping deallocating a batch that has already been 
deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
             } else {
+                if (batch.isInflight()) {
+                    throw new IllegalStateException("Attempting to deallocate 
a batch that is not inflight. Batch is " + batch);

Review Comment:
   Since the Sender thread just logs an exception and continues, we could be 
silently leaking memory if this happens. An alternative is to log a warn and 
create a fresh buffer of batch.initialCapacity() size to deallocate. It would 
be useful to include the stacktrace in the log message. 
   
   > that is not inflight
   
   that is inflight



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to