junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2670500662
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -833,7 +849,17 @@ private void failBatch(
log.debug("Encountered error when transaction manager was handling a failed batch", e);
}
}
- maybeRemoveAndDeallocateBatch(batch);
+ if (deallocateBatch) {
+ maybeRemoveAndDeallocateBatch(batch);
+ } else {
+ // Fix for KAFKA-19012
+ // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+ // cannot allow it to be reused yet. We skip deallocating it now, instead doing that in completeBatch
+ // or in another run of failBatch where we hit the else clause and call deallocateAlreadyRemovedIncomplete
+ maybeRemoveAndDeallocateBatchLater(batch);
Review Comment:
> We skip deallocating it now, instead doing that in completeBatch
> or in another run of failBatch where we hit the else clause and call
> deallocateAlreadyRemovedIncomplete
How about "We skip deallocating it now. When the request in network client
completes with a response, either completeBatch() or failBatch() will be called
with deallocateBatch=true. The buffer associated with the batch will be
deallocated then."?
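The deferral described in the suggested comment can be sketched as follows. This is a minimal illustration, not the code from the PR: SketchPool, SketchBatch and SketchSender are hypothetical stand-ins, and the only behavior assumed is that a call with deallocateBatch=false leaves the buffer alone while a later completeBatch() or failBatch() with deallocateBatch=true returns it exactly once.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for illustration; these are not Kafka classes.
final class SketchPool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    void release(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
    }

    int freeCount() {
        return free.size();
    }
}

final class SketchBatch {
    final ByteBuffer buffer = ByteBuffer.allocate(16);
    boolean failed = false;       // the batch has been failed towards the caller
    boolean deallocated = false;  // the buffer has been returned to the pool
}

final class SketchSender {
    private final SketchPool pool;

    SketchSender(SketchPool pool) {
        this.pool = pool;
    }

    // deallocateBatch=false models the case where the network client
    // may still be reading from the batch's buffer.
    void failBatch(SketchBatch batch, boolean deallocateBatch) {
        batch.failed = true;
        if (deallocateBatch) {
            deallocateOnce(batch);
        }
        // Otherwise skip deallocation now; a later call with
        // deallocateBatch=true (or completeBatch) returns the buffer.
    }

    void completeBatch(SketchBatch batch) {
        deallocateOnce(batch);
    }

    // Guard against returning the same buffer to the pool twice.
    private void deallocateOnce(SketchBatch batch) {
        if (!batch.deallocated) {
            batch.deallocated = true;
            pool.release(batch.buffer);
        }
    }
}
```

The guard in deallocateOnce is an assumption of the sketch; it is there because the same batch can reach both the early failure path and the later response path.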
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -72,6 +72,7 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+ private boolean bufferDeallocated;
Review Comment:
Could we explicitly initialize it to false?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -581,6 +582,14 @@ public boolean sequenceHasBeenReset() {
return reopened;
}
+ public boolean isBufferDeallocated() {
+ return bufferDeallocated;
+ }
+
+ public void setBufferDeallocated(boolean bufferDeallocated) {
Review Comment:
Could we name this method markBufferDeallocated and remove the input param?
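Taken together with the request to initialize the field explicitly, the suggested shape might look like the sketch below. BatchFlagSketch is a hypothetical excerpt for illustration only; the real ProducerBatch has many more members, and whether the flag needs any synchronization is not addressed here.

```java
// Hypothetical excerpt for illustration; not the actual ProducerBatch.
final class BatchFlagSketch {
    // Explicitly initialized, even though false is already the Java default.
    private boolean bufferDeallocated = false;

    boolean isBufferDeallocated() {
        return bufferDeallocated;
    }

    // No parameter: the flag can only move from false to true.
    void markBufferDeallocated() {
        bufferDeallocated = true;
    }
}
```

Dropping the parameter means a caller cannot reset the flag to false by accident.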
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1029,12 +1029,36 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
/**
* Deallocate the record batch
*/
- public void deallocate(ProducerBatch batch) {
+ public void completeBatchAndDeallocate(ProducerBatch batch) {
Review Comment:
completeBatchAndDeallocate => completeAndDeallocateBatch to match what's in RecordAccumulator?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ completeBatchAndDeallocate(batch);
Review Comment:
Thanks for doing the test. abortBatches() is called in the following two cases.
1. When the producer is closed (related to the failure in
   SenderTest.testForceCloseWithProducerIdReset). This does not cause a real
   issue since the networkClient won't send new requests after closing.
2. When the producer's txnManager is in a fatal state (related to the failure
   in SenderTest.testCancelInFlightRequestAfterFatalError) or when handling
   an authorization error. This could be a real issue.

I am thinking that we can fix this in the following way. In abortBatches(),
if a batch is in-flight, we call completeBatch(); otherwise, we call
completeBatchAndDeallocate(). We can add a boolean in ProducerBatch to indicate
whether it's in-flight or not. The in-flight batches are already tracked in
Sender, but adding a flag in ProducerBatch is probably more convenient.
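
A rough sketch of that proposal, under stated assumptions: AbortSketchBatch and AbortSketchAccumulator are hypothetical stand-ins, the inFlight field is the proposed flag rather than an existing one, and completeBatch() is assumed to drop the batch from the incomplete set without returning its buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Kafka classes.
final class AbortSketchBatch {
    boolean inFlight = false;        // proposed flag: the batch was handed to the network client
    boolean aborted = false;
    boolean bufferReturned = false;  // stands in for returning the buffer to the pool
}

final class AbortSketchAccumulator {
    final List<AbortSketchBatch> incomplete = new ArrayList<>();

    void abortBatches() {
        // Iterate over a copy because the calls below remove entries from the list.
        for (AbortSketchBatch batch : new ArrayList<>(incomplete)) {
            batch.aborted = true;
            if (batch.inFlight) {
                // The buffer may still be read by the network client: do not free it yet.
                completeBatch(batch);
            } else {
                completeBatchAndDeallocate(batch);
            }
        }
    }

    // Assumed behavior: stop tracking the batch but keep its buffer out of the pool.
    void completeBatch(AbortSketchBatch batch) {
        incomplete.remove(batch);
    }

    void completeBatchAndDeallocate(AbortSketchBatch batch) {
        completeBatch(batch);
        batch.bufferReturned = true;
    }
}
```

In this sketch the in-flight batch's buffer would still have to be returned later, when the request completes or fails; that step is left out.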