junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2666548111
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1029,12 +1029,36 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
/**
* Deallocate the record batch
*/
- public void deallocate(ProducerBatch batch) {
+ public void completeBatchAndDeallocate(ProducerBatch batch) {
incomplete.remove(batch);
+ deallocateIfNecessary(batch);
+ }
+
+ private void deallocateIfNecessary(ProducerBatch batch) {
// Only deallocate the batch if it is not a split batch because split batch are allocated outside the
// buffer pool.
- if (!batch.isSplitBatch())
- free.deallocate(batch.buffer(), batch.initialCapacity());
+ if (!batch.isSplitBatch()) {
+ if (batch.isBufferDeallocated()) {
+ log.warn("Skipping deallocating a batch that has already been deallocated. This is expected to happen rarely: for expired in-flight batches. Batch is {}, created time is {}", batch, batch.createdMs);
Review Comment:
> This is expected to happen rarely: for expired in-flight batches

Calling deallocate() on the same batch more than once is not expected. For
inflight batches, deallocate() should only be called when the ProduceRequest
completes.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1029,12 +1029,36 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
/**
* Deallocate the record batch
*/
- public void deallocate(ProducerBatch batch) {
+ public void completeBatchAndDeallocate(ProducerBatch batch) {
Review Comment:
Could we adjust the javadoc above to "Complete and deallocate the record
batch"?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -749,12 +759,16 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
if (batch.complete(response.baseOffset, response.logAppendTime)) {
maybeRemoveAndDeallocateBatch(batch);
+ } else {
+ // Always safe to call dellocate because the batch keeps track of whether or not it was deallocated yet
Review Comment:
Typo: `dellocate` should be `deallocate`.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ completeBatchAndDeallocate(batch);
Review Comment:
Hmm, this is an existing issue, and I am not sure whether it is a real problem.
This method is called when the TxnManager is in certain error states. The
problem is that it completes and deallocates both unsent and sent batches. If
we deallocate sent batches and the producer is able to send new records
afterward, the deallocated buffer could be reused while the in-flight send has
not yet completed, leading to the same issue described in the Jira.
@jolshan and @artemlivshits : When this method is called, is it possible for
`incomplete` to contain sent batches? After this method is called, could the
producer continue to send new records?
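To make the concern concrete, here is a minimal sketch of the hazard. It assumes a hypothetical `TinyPool` that simply recycles buffers; it is not Kafka's `BufferPool`, and the class and method names are invented for illustration. It only shows that a view still held by an in-flight send would observe the next batch's bytes once the buffer is returned to the pool early.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

public class BufferReuseSketch {
    /** Hypothetical minimal pool that recycles buffers; NOT Kafka's BufferPool. */
    static final class TinyPool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();
        private final int size;

        TinyPool(int size) {
            this.size = size;
        }

        ByteBuffer allocate() {
            ByteBuffer recycled = free.pollFirst();
            return recycled != null ? recycled : ByteBuffer.allocate(size);
        }

        void deallocate(ByteBuffer buffer) {
            buffer.clear();
            free.addFirst(buffer);
        }
    }

    /** Returns what an "in-flight" reader sees after the buffer is recycled too early. */
    static String inFlightContentsAfterEarlyDeallocate() {
        TinyPool pool = new TinyPool(8);

        ByteBuffer sent = pool.allocate();
        sent.put("batch-A!".getBytes(StandardCharsets.US_ASCII));

        // Stand-in for the network layer, which still holds a view of the same memory.
        ByteBuffer inFlightView = sent.duplicate();
        inFlightView.flip();

        pool.deallocate(sent);              // released before the send completed
        ByteBuffer next = pool.allocate();  // the same backing memory is handed out again
        next.put("batch-B!".getBytes(StandardCharsets.US_ASCII));

        byte[] seen = new byte[inFlightView.remaining()];
        inFlightView.get(seen);
        return new String(seen, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(inFlightContentsAfterEarlyDeallocate());
    }
}
```

In this toy model the in-flight view ends up reading the second batch's contents. Whether the real code path can reach this state depends on the answers to the two questions above.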
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1029,12 +1029,36 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext
logContext, String topic,
/**
* Deallocate the record batch
*/
- public void deallocate(ProducerBatch batch) {
+ public void completeBatchAndDeallocate(ProducerBatch batch) {
incomplete.remove(batch);
+ deallocateIfNecessary(batch);
+ }
+
+ private void deallocateIfNecessary(ProducerBatch batch) {
Review Comment:
Could we just fold this into `deallocate()` and remove
`deallocateIfNecessary()`?
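A rough sketch of one way the folded version could look. `Batch` and `Accumulator` below are hypothetical stand-ins, not the real `ProducerBatch` and `RecordAccumulator`, and the counters exist only so the behaviour can be checked; the actual `deallocate()` in the PR may be shaped differently.

```java
import java.util.HashSet;
import java.util.Set;

public class FoldedDeallocateSketch {
    /** Hypothetical stand-in for ProducerBatch with only the state this sketch needs. */
    static final class Batch {
        final boolean splitBatch;
        boolean bufferDeallocated;

        Batch(boolean splitBatch) {
            this.splitBatch = splitBatch;
        }
    }

    /** Hypothetical stand-in for the accumulator; counters replace the pool and the logger. */
    static final class Accumulator {
        final Set<Batch> incomplete = new HashSet<>();
        int poolReturns = 0;        // times a buffer was actually returned to the pool
        int skippedDuplicates = 0;  // times a repeated deallocation was ignored

        void completeBatchAndDeallocate(Batch batch) {
            incomplete.remove(batch);
            deallocate(batch);
        }

        /** Single entry point: the split-batch check and the already-deallocated guard live here. */
        void deallocate(Batch batch) {
            if (batch.splitBatch) {
                return; // split batches are allocated outside the buffer pool
            }
            if (batch.bufferDeallocated) {
                skippedDuplicates++; // the real code would log a warning here
                return;
            }
            batch.bufferDeallocated = true;
            poolReturns++; // the real code would return the buffer to the pool here
        }
    }

    public static void main(String[] args) {
        Accumulator accumulator = new Accumulator();
        Batch batch = new Batch(false);
        accumulator.incomplete.add(batch);
        accumulator.completeBatchAndDeallocate(batch);
        accumulator.deallocate(batch); // second call is ignored
        System.out.println(accumulator.poolReturns + " " + accumulator.skippedDuplicates);
    }
}
```

With the guard inside `deallocate()`, every caller gets the same protection and there is no separate helper to keep in sync.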