junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2665725049


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3591,6 +3596,44 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
         }
     }
 
+    @Test
+    public void testNoBufferReuseWhenBatchHasTimeoutErrors() throws Exception {

Review Comment:
   HasTimeoutErrors => Expires



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2499,12 +2500,16 @@ public void testNoDoubleDeallocation() throws Exception {
         sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
+        assertFalse(sender.inFlightBatches(tp0).get(0).isBufferDeallocated(), "Buffer not deallocated yet");
+        ProducerBatch inflightBatch = sender.inFlightBatches(tp0).get(0);
+
 
         time.sleep(REQUEST_TIMEOUT);
         assertFalse(pool.allMatch());
 
         sender.runOnce();  // expire the batch
         assertTrue(request1.isDone());
+        assertTrue(inflightBatch.isBufferDeallocated(), "Buffer shold be deallocated after request timeout");

Review Comment:
   typo: shold => should
   
   Also, a couple of existing issues:
   1. It seems that we unnecessarily call pool.allMatch() twice below.
   2. A couple of lines above, "expire the batch" => "times out the request"
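   
   For reference, a sketch of what the lines quoted above might look like once these points are addressed (same assertions as in the hunk, with the typo and comment fixed):
   
   ```
   sender.runOnce();  // times out the request
   assertTrue(request1.isDone());
   assertTrue(inflightBatch.isBufferDeallocated(), "Buffer should be deallocated after request timeout");
   ```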



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -72,6 +73,7 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private final AtomicBoolean bufferDeallocated = new AtomicBoolean(false);

Review Comment:
   Since this is only set and checked by the Sender thread, we could just use a regular boolean.
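   
   A minimal sketch of that change in ProducerBatch (same field name as in the hunk above, just a plain field since only the Sender thread reads and writes it):
   
   ```
   private boolean bufferDeallocated = false;
   ```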



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3591,6 +3596,44 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
         }
     }
 
+    @Test
+    public void testNoBufferReuseWhenBatchHasTimeoutErrors() throws Exception {
+        long totalSize = 1024 * 1024;
+        try (Metrics m = new Metrics()) {
+            BufferPool pool = new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics");
+
+            // Allocate and store a poolable buffer, then return it to the pool so the Sender can pick it up
+            ByteBuffer buffer = pool.allocate(batchSize, 0);
+            pool.deallocate(buffer);
+
+            setupWithTransactionState(null, false, pool);
+            StringBuilder sb = new StringBuilder();
+            // Make a value the right size so that RecordAccumulator calls BufferPool.allocate with the batchSize so it's pooled
+            for (int i = 0; i < 16290; i++) {
+                sb.append("a");
+            }

Review Comment:
   The above could just be 
   
   `String largeValue = "a".repeat(16290);`
   
   Also, why is it necessary to create a large value? RecordAccumulator picks batchSize for allocation if the data size is smaller.
   
   ```
                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
                            RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
                    // This call may block if we exhausted buffer space.
                    buffer = free.allocate(size, maxTimeToBlock);
   ```
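   
   So a small record should be enough to get a pooled, batchSize-sized buffer; for example (a sketch, assuming SenderTest's existing appendToAccumulator helper):
   
   ```
   // The estimated size of this record is well under batchSize, so the
   // Math.max above still makes the accumulator allocate batchSize bytes
   // and the buffer stays poolable.
   appendToAccumulator(tp0, time.milliseconds(), "key", "value");
   ```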



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1031,12 +1031,30 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
      */
     public void deallocate(ProducerBatch batch) {
         incomplete.remove(batch);
+        deallocateIfNecessary(batch);
+    }
+
+    private void deallocateIfNecessary(ProducerBatch batch) {
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
+        if (!batch.isSplitBatch() && !batch.isBufferDeallocated() && batch.setBufferDeallocated(true))

Review Comment:
   Could we log a warn if this method is called but `batch.isBufferDeallocated()` is true?
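   
   For example, something along these lines (just a sketch of one way to surface it; assumes RecordAccumulator's existing log field and ProducerBatch's public topicPartition field):
   
   ```
   private void deallocateIfNecessary(ProducerBatch batch) {
       // Split batches are allocated outside the buffer pool, so there is nothing to return.
       if (batch.isSplitBatch())
           return;
       if (batch.isBufferDeallocated()) {
           // Deallocating the same buffer twice would indicate a bug elsewhere in the Sender.
           log.warn("Buffer for batch of partition {} was already deallocated", batch.topicPartition);
           return;
       }
       batch.setBufferDeallocated(true);
       free.deallocate(batch.buffer(), batch.initialCapacity());
   }
   ```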



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1031,12 +1031,30 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
      */
     public void deallocate(ProducerBatch batch) {
         incomplete.remove(batch);
+        deallocateIfNecessary(batch);
+    }
+
+    private void deallocateIfNecessary(ProducerBatch batch) {
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
+        if (!batch.isSplitBatch() && !batch.isBufferDeallocated() && batch.setBufferDeallocated(true))
             free.deallocate(batch.buffer(), batch.initialCapacity());
     }
 
+    /**
+     * Remove from the incomplete list but do not free memory yet
+     */
+    public void deallocateLater(ProducerBatch batch) {
+        incomplete.remove(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from incomplete set)
+     */
+    public void deallocateAlreadyRemovedIncomplete(ProducerBatch batch) {

Review Comment:
   Perhaps it's clearer if we expose the following 3 methods.
   ```
   completeBatch()
   deallocate()
   completeBatchAndDeallocate()
   ```
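   
   A rough sketch of that shape in RecordAccumulator (a hypothetical split, reusing the existing incomplete set, the free pool, and the buffer-deallocated flag introduced in this PR):
   
   ```
   /** Remove the batch from the incomplete set without returning its buffer yet. */
   public void completeBatch(ProducerBatch batch) {
       incomplete.remove(batch);
   }
   
   /** Return the batch's buffer to the pool; split batches are not pooled, so they are skipped. */
   public void deallocate(ProducerBatch batch) {
       if (!batch.isSplitBatch() && !batch.isBufferDeallocated() && batch.setBufferDeallocated(true))
           free.deallocate(batch.buffer(), batch.initialCapacity());
   }
   
   /** Common path: complete the batch and free its buffer in one call. */
   public void completeBatchAndDeallocate(ProducerBatch batch) {
       completeBatch(batch);
       deallocate(batch);
   }
   ```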



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -833,7 +849,17 @@ private void failBatch(
                     log.debug("Encountered error when transaction manager was 
handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            // Fix for KAFKA-19012

Review Comment:
   Could we move this comment to the else clause?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
