junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2674049829


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1027,14 +1027,34 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
     }
 
     /**
-     * Deallocate the record batch
+     * Complete and deallocate the record batch
+     */
+    public void completeAndDeallocateBatch(ProducerBatch batch) {
+        completeBatch(batch);
+        deallocate(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from the incomplete set)
      */
     public void deallocate(ProducerBatch batch) {
-        incomplete.remove(batch);
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
-            free.deallocate(batch.buffer(), batch.initialCapacity());
+        if (!batch.isSplitBatch()) {
+            if (batch.isBufferDeallocated()) {
+                log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+            } else {
+                batch.markBufferDeallocated();

Review Comment:
   Could we also verify that batch.sent is false and, if it is not, throw an IllegalStateException?
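
   For illustration, here is a minimal sketch of what such a check might look like. It uses a hypothetical stand-in `Batch` class rather than the real `ProducerBatch`. The `isSent()` and `markSent()` accessors, the boolean return value, the exception message, and the placement of the check just before `markBufferDeallocated()` are all assumptions, not code from the PR.

```java
public class DeallocateGuardSketch {

    /** Hypothetical stand-in for ProducerBatch; it models only the two flags discussed here. */
    public static final class Batch {
        private boolean sent;
        private boolean bufferDeallocated;

        public void markSent() { sent = true; }
        public boolean isSent() { return sent; }
        public boolean isBufferDeallocated() { return bufferDeallocated; }
        public void markBufferDeallocated() { bufferDeallocated = true; }
    }

    /**
     * Returns true if this call released the buffer, false if it had already been released.
     * Throws IllegalStateException if the batch is still marked as sent (assumed semantics).
     */
    public static boolean deallocate(Batch batch) {
        if (batch.isBufferDeallocated()) {
            // Mirrors the "skip and warn" branch in the diff; logging is omitted in this sketch.
            return false;
        }
        if (batch.isSent()) {
            // The suggested guard: refuse to release a buffer that may still be in use.
            throw new IllegalStateException("Cannot deallocate a batch that is marked as sent");
        }
        batch.markBufferDeallocated();
        // The real code would return the buffer to the pool at this point.
        return true;
    }

    public static void main(String[] args) {
        Batch fresh = new Batch();
        System.out.println(deallocate(fresh)); // first call releases the buffer
        System.out.println(deallocate(fresh)); // second call is skipped

        Batch inFlight = new Batch();
        inFlight.markSent();
        try {
            deallocate(inFlight);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Throwing here, rather than logging, would surface a premature deallocation as a bug instead of letting the buffer be reused while a send might still reference it.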


