junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673633951
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1152,14 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ if (batch.isSent()) {
Review Comment:
This is not a real issue, but it would be safer for the caller
`Sender.maybeAbortBatches()` to clear `inFlightBatches` after aborting all
batches.
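A minimal sketch of the suggested ordering, using hypothetical `SimpleSender` and `SimpleBatch` classes rather than the real Kafka types (the real `Sender` tracks more state and delegates aborting to the accumulator). The idea is to abort every in-flight batch first and then clear the map, so nothing keeps a reference to an aborted batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ProducerBatch: only tracks whether it was aborted.
class SimpleBatch {
    private RuntimeException abortReason;

    void abort(RuntimeException reason) {
        this.abortReason = reason;
    }

    boolean isAborted() {
        return abortReason != null;
    }
}

// Hypothetical, heavily simplified model of Sender's in-flight bookkeeping.
class SimpleSender {
    // Keyed by partition name; mirrors the idea of Sender's inFlightBatches map.
    final Map<String, List<SimpleBatch>> inFlightBatches = new HashMap<>();

    void addInFlight(String partition, SimpleBatch batch) {
        inFlightBatches.computeIfAbsent(partition, p -> new ArrayList<>()).add(batch);
    }

    // Sketch of the suggestion: abort every in-flight batch, then clear the map
    // so no stale references to aborted batches remain afterwards.
    void maybeAbortBatches(RuntimeException reason) {
        for (List<SimpleBatch> batches : inFlightBatches.values()) {
            for (SimpleBatch batch : batches) {
                batch.abort(reason);
            }
        }
        inFlightBatches.clear();
    }
}
```

Clearing only after the loop keeps the iteration over `inFlightBatches.values()` safe from concurrent modification.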
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -581,6 +583,22 @@ public boolean sequenceHasBeenReset() {
return reopened;
}
+ public boolean isBufferDeallocated() {
+ return bufferDeallocated;
+ }
+
+ public void markBufferDeallocated() {
+ bufferDeallocated = true;
+ }
+
+ public boolean isSent() {
+ return sent;
+ }
+
+ public void markSent() {
Review Comment:
It would be useful to add a method to mark the batch as unsent when its
request completes with a response. This matters because a batch could be
re-enqueued for retry. In the re-enqueue test, we could also verify that the
batch is marked as unsent after re-enqueuing.
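A minimal sketch of the suggested lifecycle, assuming hypothetical `RetryableBatch` and `RetryQueue` classes (not the real `ProducerBatch` or `RecordAccumulator`): draining marks the batch as in flight, and a retriable response clears the flag before the batch goes back to the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplified batch that tracks whether it is in flight.
class RetryableBatch {
    private boolean inflight = false;
    private int attempts = 0;

    void markInflight() {
        inflight = true;
        attempts++;
    }

    // The kind of method the review asks for: clear the flag once a response arrives.
    void markNotInflight() {
        inflight = false;
    }

    boolean isInflight() { return inflight; }

    int attempts() { return attempts; }
}

// Hypothetical queue holding batches that are waiting to be (re)sent.
class RetryQueue {
    final Deque<RetryableBatch> deque = new ArrayDeque<>();

    // Take the next batch and hand it to the (imaginary) network layer.
    RetryableBatch drain() {
        RetryableBatch batch = deque.pollFirst();
        if (batch != null) {
            batch.markInflight();
        }
        return batch;
    }

    // Called when a response arrives with a retriable error.
    void onRetriableResponse(RetryableBatch batch) {
        batch.markNotInflight(); // response received: no longer owned by the network layer
        deque.addFirst(batch);   // re-enqueue at the head so it is retried first
    }
}
```

A re-enqueue test in this style would assert `isInflight()` is true after draining and false after `onRetriableResponse`.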
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2195,6 +2199,8 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
Future<RecordMetadata> future2 = appendToAccumulator(tp1);
sender.runOnce();
+ assertFalse(pool.allMatch());
Review Comment:
Could we do the same check just before the 2nd produce response completes?
When I added that, it exposed the bug in RecordAccumulator.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -72,6 +72,8 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+ private boolean bufferDeallocated = false;
+ private boolean sent = false;
Review Comment:
Could we add a comment explaining that this means the batch has been sent to
the NetworkClient? Also, `inflight` is probably a better name since it's more
consistent with `Sender.inflights`.
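One way the requested comment and rename might look, shown on a hypothetical `BatchFlags` excerpt rather than the real `ProducerBatch`; the wording of the Javadoc is only a suggestion.

```java
// Hypothetical excerpt showing the suggested rename and comments; not the actual ProducerBatch.
class BatchFlags {
    /**
     * True while the batch has been handed to the NetworkClient and the
     * corresponding produce request has not completed yet. Named to match
     * the in-flight terminology used by Sender.
     */
    private boolean inflight = false;

    /** True once the batch's buffer has been returned to the buffer pool. */
    private boolean bufferDeallocated = false;

    void markInflight() { inflight = true; }

    boolean isInflight() { return inflight; }

    void markBufferDeallocated() { bufferDeallocated = true; }

    boolean isBufferDeallocated() { return bufferDeallocated; }
}
```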
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ completeBatchAndDeallocate(batch);
Review Comment:
> The problem is that the newly accumulated batches (that will get aborted
> on runOnce()) can modify the inflight batch? And then the problem is that we may
> send the inflight batch to the wrong place?
When the TxnManager enters the fatal state, there could still be an inflight
batch in the NetworkClient. maybeAbortBatches() will return the buffer
associated with that inflight batch to the buffer pool. RecordAccumulator.append()
is still allowed while the TxnManager is in the fatal state, so it could put new
records into the just-returned batch buffer. Sender continues to call
client.poll(), which triggers sending the inflight request, and that request
will pick up the wrong data (the newly appended records) from the batch buffer.
The changes in SenderTest.testCancelInFlightRequestAfterFatalError() simulate
the issue.
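The sequence above can be reproduced in isolation with a hypothetical `TinyBufferPool` (not Kafka's `BufferPool`) that hands freed buffers straight back out. The sketch frees the in-flight buffer early, lets a new append reuse it, and then reads what the in-flight request would send.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, minimal buffer pool: freed buffers are reused as-is.
class TinyBufferPool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final int size;

    TinyBufferPool(int size) { this.size = size; }

    ByteBuffer allocate() {
        ByteBuffer buf = free.pollFirst();
        return buf != null ? buf : ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buf) {
        buf.clear(); // reset position/limit; the old bytes are still there
        free.addFirst(buf);
    }
}

class BufferReuseDemo {
    static ByteBuffer write(ByteBuffer buf, String payload) {
        buf.put(payload.getBytes(StandardCharsets.UTF_8));
        return buf;
    }

    // What the network layer reads from the buffer when it finally sends.
    static String readForSend(ByteBuffer buf, int length) {
        byte[] out = new byte[length];
        ByteBuffer view = buf.duplicate(); // shares content, independent position
        view.position(0);
        view.get(out, 0, length);
        return new String(out, StandardCharsets.UTF_8);
    }

    // Returns the payload the "in-flight" request actually sends.
    static String simulate() {
        TinyBufferPool pool = new TinyBufferPool(16);

        // 1. An in-flight batch owns a buffer containing its records.
        ByteBuffer inflight = write(pool.allocate(), "batch-A");

        // 2. Fatal error: the abort path returns the buffer to the pool too early.
        pool.deallocate(inflight);

        // 3. append() is still allowed and reuses the just-freed buffer.
        write(pool.allocate(), "batch-B");

        // 4. client.poll() finally sends the in-flight request from the same buffer.
        return readForSend(inflight, "batch-A".length());
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints "batch-B": the wrong data
    }
}
```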
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1152,14 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ if (batch.isSent()) {
+ completeAndDeallocateBatch(batch);
Review Comment:
This is a bug. If the batch has been sent, we shouldn't deallocate it here,
since the inflight request may still read from its buffer.
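A minimal sketch of the intended rule, using hypothetical `GuardedBatch` and `GuardedAccumulator` classes (not the real `RecordAccumulator`): the abort path frees the buffer only for batches that are not in flight, and an in-flight batch's buffer is freed when its request completes, with a `bufferDeallocated`-style flag preventing a double free.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch carrying the two flags discussed in this PR.
class GuardedBatch {
    boolean inflight;
    boolean bufferDeallocated;
    boolean aborted;
}

// Hypothetical accumulator that records which batches had their buffer freed.
class GuardedAccumulator {
    final List<GuardedBatch> freed = new ArrayList<>();

    // Frees the buffer at most once, using bufferDeallocated as a guard.
    void deallocate(GuardedBatch batch) {
        if (batch.bufferDeallocated) {
            return;
        }
        batch.bufferDeallocated = true;
        freed.add(batch);
    }

    // Abort path: an in-flight batch's buffer is still referenced by the
    // pending request, so leave it alone and free it on completion instead.
    void abortBatch(GuardedBatch batch) {
        batch.aborted = true;
        if (!batch.inflight) {
            deallocate(batch);
        }
    }

    // Called when the request for this batch completes; only now is the
    // buffer safe to hand back to the pool.
    void onRequestCompleted(GuardedBatch batch) {
        batch.inflight = false;
        deallocate(batch);
    }
}
```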