junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r914251606


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   @divijvaidya : For 2, before 
[KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888), nowMs was also 
computed before the synchronized block. So, this PR has the same behavior.
   
   Looking at the code, I am not sure if nowMs is strictly needed. nowMs is 
used to populate ProducerBatch.lastAppendTime. However, since KAFKA-5886, 
expiration is based on createTime and not on lastAppendTime. lastAppendTime is 
only used to upper bound lastAttemptMs. This may not be needed. @hachikuji : 
Could we just get rid of ProducerBatch.lastAppendTime?
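
   For reference, the ordering discussed above (blocking allocation first, 
then a clock read, then the lock) can be sketched roughly as below. This is 
only an illustration, not the RecordAccumulator code: the class AppendSketch, 
the Semaphore standing in for the buffer pool, and the LongSupplier standing 
in for the clock are all hypothetical names chosen for this sketch.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for an accumulator; none of these names are Kafka's.
class AppendSketch {
    private final Semaphore freeBuffers;  // assumption: models a bounded buffer pool
    private final LongSupplier clock;     // assumption: models a millisecond clock
    private final Object lock = new Object();
    private long lastAppendTime = -1L;

    AppendSketch(int buffers, LongSupplier clock) {
        this.freeBuffers = new Semaphore(buffers);
        this.clock = clock;
    }

    long append(long maxTimeToBlockMs) {
        try {
            // This call may block if the pool is exhausted.
            if (!freeBuffers.tryAcquire(maxTimeToBlockMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no buffer available within timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a buffer", e);
        }
        // Read the clock after the potentially blocking call, but before
        // taking the lock, so the clock read is not done while holding it.
        long nowMs = clock.getAsLong();
        synchronized (lock) {
            lastAppendTime = nowMs;
        }
        return nowMs;
    }

    long lastAppendTime() {
        synchronized (lock) {
            return lastAppendTime;
        }
    }
}
```

   If lastAppendTime were removed as suggested, the clock read in this sketch 
would have no remaining consumer and could be dropped as well.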



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
