junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r914251606
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                 byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                 log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                // This call may block if we exhausted buffer space.
                 buffer = free.allocate(size, maxTimeToBlock);
+                // Update the current time in case the buffer allocation blocked above.
+                // NOTE: getting time may be expensive, so calling it under a lock
+                // should be avoided.
+                nowMs = time.milliseconds();

Review Comment:
   @divijvaidya : For 2, before [KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888), nowMs was also computed before the synchronized block, so this PR has the same behavior.

   Looking at the code, I am not sure nowMs is strictly needed. nowMs is used to populate ProducerBatch.lastAppendTime. However, since KAFKA-5886, expiration is based on createTime and not on lastAppendTime. lastAppendTime is only used to upper bound lastAttemptMs. This may not be needed.

   @hachikuji : Could we just get rid of ProducerBatch.lastAppendTime?
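For readers following the thread, the pattern in the diff (refresh the timestamp after a call that may block, but before taking the lock) can be sketched roughly as below. This is only an illustration under assumptions, not the RecordAccumulator code: `AppendTimeSketch`, `Batch`, `lastAppendMs`, the `LongSupplier` clock and the `Runnable` allocator are all hypothetical stand-ins for `time.milliseconds()`, `free.allocate(...)` and `ProducerBatch.lastAppendTime`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class AppendTimeSketch {
    /** Stand-in for a batch that only remembers its last append timestamp. */
    public static final class Batch {
        public long lastAppendMs;
    }

    private final Deque<Batch> queue = new ArrayDeque<>();
    private final LongSupplier clock;  // stand-in for time.milliseconds()
    private final Runnable allocate;   // stand-in for an allocation that may block

    public AppendTimeSketch(LongSupplier clock, Runnable allocate) {
        this.clock = clock;
        this.allocate = allocate;
    }

    public Batch append() {
        // May block for a while if buffer space is exhausted, so any
        // timestamp read before this point could be stale afterwards.
        allocate.run();

        // Read the clock after the potentially blocking call, and outside
        // the lock, so a possibly expensive clock read never happens while
        // other threads are waiting on the queue.
        long nowMs = clock.getAsLong();

        synchronized (queue) {
            Batch batch = new Batch();
            batch.lastAppendMs = nowMs;  // only the cheap assignment is under the lock
            queue.addLast(batch);
            return batch;
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {1000L};
        // The fake allocator "blocks" by advancing the fake clock by 250 ms.
        AppendTimeSketch sketch =
            new AppendTimeSketch(() -> fakeNow[0], () -> fakeNow[0] += 250);
        System.out.println(sketch.append().lastAppendMs);  // prints 1250
    }
}
```

If lastAppendTime were removed as suggested above, the clock read in this sketch would disappear entirely, which is the simplification the comment is asking about.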