junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673633951


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1152,14 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isSent()) {

Review Comment:
   This is not a real issue, but it would be safer for the caller 
`Sender.maybeAbortBatches()` to clear `inFlightBatches` after aborting all 
batches.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -581,6 +583,22 @@ public boolean sequenceHasBeenReset() {
         return reopened;
     }
 
+    public boolean isBufferDeallocated() {
+        return bufferDeallocated;
+    }
+
+    public void markBufferDeallocated() {
+        bufferDeallocated = true;
+    }
+
+    public boolean isSent() {
+        return sent;
+    }
+
+    public void markSent() {

Review Comment:
   It would be useful to add a method to mark the batch as unsent when the 
request completes with a response. This is important since a batch could be 
re-enqueued for retry. It would be useful to verify in the re-enqueue test that 
the batch is marked as unsent after re-enqueuing. 
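
   A minimal sketch of the flag lifecycle described above, using toy classes 
rather than the real `ProducerBatch` and `Sender`. The names `ToyBatch`, 
`ToySender`, `markInflight()`, `markNotInflight()` and `onResponse()` are 
hypothetical and are not taken from the PR; the sketch only shows the kind of 
assertion a re-enqueue test could make.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class InflightFlagSketch {
    /** Toy stand-in for a producer batch; only tracks the in-flight flag. */
    static final class ToyBatch {
        private boolean inflight = false;
        void markInflight() { inflight = true; }
        void markNotInflight() { inflight = false; }
        boolean isInflight() { return inflight; }
    }

    /** Toy stand-in for the sender: a queue of batches waiting to be sent. */
    static final class ToySender {
        final Deque<ToyBatch> queue = new ArrayDeque<>();

        // Hand the batch to the (imaginary) network layer.
        void send(ToyBatch batch) {
            queue.remove(batch);
            batch.markInflight();
        }

        // A response arrived: the batch is no longer in flight, whether or
        // not it is retried. On a retriable error it goes back on the queue.
        void onResponse(ToyBatch batch, boolean retriableError) {
            batch.markNotInflight();
            if (retriableError) {
                queue.addFirst(batch);
            }
        }
    }

    /** Returns true if the flag is set while sent and cleared after re-enqueue. */
    static boolean flagClearedAfterReenqueue() {
        ToySender sender = new ToySender();
        ToyBatch batch = new ToyBatch();
        sender.queue.add(batch);

        sender.send(batch);
        boolean wasInflight = batch.isInflight();

        sender.onResponse(batch, true); // retriable error, batch is re-enqueued
        return wasInflight && !batch.isInflight() && sender.queue.peekFirst() == batch;
    }

    public static void main(String[] args) {
        System.out.println(flagClearedAfterReenqueue());
    }
}
```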



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2195,6 +2199,8 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         Future<RecordMetadata> future2 = appendToAccumulator(tp1);
         sender.runOnce();
 
+        assertFalse(pool.allMatch());

Review Comment:
   Could we do the same check just before the 2nd produce response completes? 
When I added that, it exposed the bug in RecordAccumulator.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -72,6 +72,8 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private boolean bufferDeallocated = false;
+    private boolean sent = false;

Review Comment:
   Could we add a comment that this means that the batch is sent to the 
NetworkClient? Also, probably inflight is a better name since it's more 
consistent with Sender.inflights.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            completeBatchAndDeallocate(batch);

Review Comment:
   > The problem is that the newly accumulated batches (that will get aborted 
on runOnce()) can modify the inflight batch? And then the problem is that we may 
send the inflight batch to the wrong place?
   
   When the TxnManager gets into the fatal state, there could be an inflight 
batch in the NetworkClient. maybeAbortBatches() will return the buffer 
associated with the inflight batch to the buffer pool. 
RecordAccumulator.append() is still allowed when the TxnManager is in the 
fatal state, so it could put new records into the just-returned batch buffer. 
Sender continues to call client.poll(), which triggers the sending of the 
inflight request, and that request will pick up the wrong data from the batch 
buffer. The changes in SenderTest.testCancelInFlightRequestAfterFatalError() 
simulate the issue.
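
   The sequence above can be reproduced in isolation with plain `ByteBuffer`s. 
This is only a sketch under stated assumptions: `ToyPool` is a hypothetical 
stand-in for the producer's buffer pool, and the "network layer" is just a 
duplicate view of the batch buffer. It is not the real `RecordAccumulator` or 
`NetworkClient` code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class BufferReuseSketch {
    /** Toy pool (hypothetical): hands back previously released buffers. */
    static final class ToyPool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        ByteBuffer allocate(int size) {
            ByteBuffer buffer = free.pollFirst();
            if (buffer == null) {
                return ByteBuffer.allocate(size);
            }
            buffer.clear(); // reset position/limit; the old bytes remain until overwritten
            return buffer;
        }

        void deallocate(ByteBuffer buffer) {
            free.addFirst(buffer);
        }
    }

    /** Simulates the race and returns what the pending send would put on the wire. */
    static String demo() {
        ToyPool pool = new ToyPool();

        // 1. A batch is filled and handed to the network layer (in flight).
        ByteBuffer batchBuffer = pool.allocate(16);
        batchBuffer.put("old-records".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer inFlightView = batchBuffer.duplicate(); // shares the same bytes
        inFlightView.flip();

        // 2. Abort returns the buffer to the pool while the send is still pending.
        pool.deallocate(batchBuffer);

        // 3. A new append gets the same buffer back and writes into it.
        ByteBuffer reused = pool.allocate(16);
        reused.put("new-records".getBytes(StandardCharsets.US_ASCII));

        // 4. The pending send finally reads its view of the buffer.
        byte[] wire = new byte[inFlightView.remaining()];
        inFlightView.get(wire);
        return new String(wire, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

   In this toy setup the pending send observes the newly appended bytes 
instead of the ones it was meant to send, which is the corruption the test 
change is intended to surface.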



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1152,14 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isSent()) {
+                completeAndDeallocateBatch(batch);

Review Comment:
   This is a bug. If the batch is sent, we shouldn't deallocate the batch.
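
   One possible reading of this, sketched with toy types: on abort, only 
batches that are not in flight release their buffer immediately, and an 
in-flight batch releases it once its request completes. `ToyBatch`, 
`abortBatches()` and `onRequestCompleted()` below are hypothetical and 
simplified, and the actual fix in the PR may be structured differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AbortDeallocSketch {
    /** Toy batch (hypothetical) carrying the two flags discussed in the review. */
    static final class ToyBatch {
        final String name;
        boolean inflight;
        boolean bufferDeallocated = false;

        ToyBatch(String name, boolean inflight) {
            this.name = name;
            this.inflight = inflight;
        }
    }

    // Release the buffer at most once; 'released' records the order of releases.
    static void deallocate(ToyBatch batch, List<String> released) {
        if (batch.bufferDeallocated) {
            return;
        }
        batch.bufferDeallocated = true;
        released.add(batch.name);
    }

    // On abort, skip deallocation for batches the network layer may still read.
    static void abortBatches(List<ToyBatch> batches, List<String> released) {
        for (ToyBatch batch : batches) {
            if (!batch.inflight) {
                deallocate(batch, released);
            }
        }
    }

    // When the request completes (response or disconnect), the buffer is safe to release.
    static void onRequestCompleted(ToyBatch batch, List<String> released) {
        batch.inflight = false;
        deallocate(batch, released);
    }

    /** Returns the released batch names after abort and after the response. */
    static String demo() {
        List<String> released = new ArrayList<>();
        ToyBatch queued = new ToyBatch("queued", false);
        ToyBatch sent = new ToyBatch("sent", true);

        abortBatches(Arrays.asList(queued, sent), released);
        String afterAbort = released.toString();

        onRequestCompleted(sent, released);
        return "afterAbort=" + afterAbort + " afterResponse=" + released;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```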



