chia7712 commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673822421


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            completeBatchAndDeallocate(batch);

Review Comment:
   > However, the record will first be added to the batch buffer in RecordAccumulator and then aborted by Sender. Adding to the RecordAccumulator can cause the buffer reuse issue.
   
   1. Trigger Request 1: Send a record to p0. This initiates an AddPartitionsToTxnRequest (for p0) and stages a ProduceRequest.
   2. Trigger Request 2: Send a record to p1. This initiates an AddPartitionsToTxnRequest (for p1).
   3. In-flight State: AddPartitionsToTxnRequest(p0) succeeds. The corresponding ProduceRequest(p0) is sent and becomes in-flight.
   4. Fatal Error Trigger: AddPartitionsToTxnRequest(p1) fails with an authorization error. The TransactionManager transitions to a fatal state and triggers abortBatches.
   5. Unsafe Deallocation: batch(p0) is **prematurely deallocated** by abortBatches while its request is still in-flight.
   6. Memory Pollution: The application sends a record to px. The new batch acquires the dangling buffer from batch(p0) and overwrites the memory.
   7. Corrupted Send: The NetworkClient, still holding a reference to the original buffer, sends the now-corrupted data to p0 (see the sketch below).
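   
   To make steps 5-7 concrete, here is a minimal, self-contained sketch of the buffer-reuse hazard. `DanglingBufferDemo`, `SimplePool`, and the `inFlightPayload` variable are hypothetical stand-ins for the producer's buffer pool and the in-flight ProduceRequest payload, not Kafka's actual classes; the only point is that returning a buffer to a pool while another reference still holds it lets the next allocation overwrite data that is still queued for the wire.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Hypothetical stand-ins for the producer's buffer pool and the in-flight
   // ProduceRequest payload; NOT Kafka's actual BufferPool/Sender classes.
   public class DanglingBufferDemo {
   
       // Minimal pool: deallocate() makes the buffer immediately reusable.
       static final class SimplePool {
           private final Deque<ByteBuffer> free = new ArrayDeque<>();
   
           ByteBuffer allocate(int size) {
               ByteBuffer b = free.poll();
               return (b != null && b.capacity() >= size) ? b : ByteBuffer.allocate(size);
           }
   
           void deallocate(ByteBuffer buffer) {
               buffer.clear();
               free.push(buffer); // next allocate() hands this same buffer out again
           }
       }
   
       public static void main(String[] args) {
           SimplePool pool = new SimplePool();
   
           // Step 3: batch(p0) is built and its buffer reference is handed to the
           // send path (a local variable stands in for the in-flight request payload).
           ByteBuffer batchP0 = pool.allocate(32);
           batchP0.put("record-for-p0".getBytes(StandardCharsets.UTF_8));
           ByteBuffer inFlightPayload = batchP0;
   
           // Step 5: abortBatches deallocates batch(p0) although the request
           // referencing its buffer is still in flight.
           pool.deallocate(batchP0);
   
           // Step 6: a record for px allocates a new batch, which acquires the
           // very same buffer from the pool and overwrites it.
           ByteBuffer batchPx = pool.allocate(32);
           batchPx.put("record-for-px".getBytes(StandardCharsets.UTF_8));
   
           // Step 7: the bytes "sent" for p0 are now px's data.
           inFlightPayload.flip();
           byte[] wire = new byte[inFlightPayload.remaining()];
           inFlightPayload.get(wire);
           System.out.println("bytes sent for p0: " + new String(wire, StandardCharsets.UTF_8));
           // prints "record-for-px", i.e. the payload destined for p0 was corrupted
       }
   }
   ```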
   
   Please let me know if I missed anything in this analysis.


