DL1231 commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2567035878


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -901,10 +908,9 @@ private void maybeAllocateNewBatch(
             long currentTimeMs
         ) {
             if (currentBatch == null) {
-                LogConfig logConfig = partitionWriter.config(tp);
-                int maxBatchSize = logConfig.maxMessageSize();
+                int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
                 long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
+                ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, appendMaxBufferSizeSupplier.get()));

Review Comment:
   Thinking about this a bit more, I've reverted to the original implementation.
   
   The key reason is that maxMessageSize determines the actual maximum size of a message that can be written. If the initially allocated buffer is larger than maxMessageSize, the excess capacity is simply wasted for any message that stays within that limit.
   
   Since appendMaxBufferSize has a minimum value larger than `INITIAL_BUFFER_SIZE`, the new implementation would always allocate a 512 KB buffer. It loses the ability to scale the allocation down when maxMessageSize is set to a smaller value, which is a valuable feature of the original code.
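   
   For illustration, a minimal standalone sketch of the two sizing strategies discussed above. Only the 512 KB `INITIAL_BUFFER_SIZE` comes from this comment; the other values (a 64 KB `max.message.bytes` and a 1 MB append-buffer floor) are hypothetical, not the coordinator's actual configuration.
   
   ```java
   // Hypothetical sketch; only INITIAL_BUFFER_SIZE (512 KB) mirrors the value
   // referenced in this comment, everything else is made up for illustration.
   public class BufferSizingSketch {
       private static final int INITIAL_BUFFER_SIZE = 512 * 1024;
   
       // Original (reverted-to) behavior: cap by the topic's max.message.bytes,
       // so a small maxMessageSize scales the initial allocation down.
       static int originalSize(int maxMessageSize) {
           return Math.min(INITIAL_BUFFER_SIZE, maxMessageSize);
       }
   
       // Proposed behavior: cap by appendMaxBufferSize, whose minimum is above
       // INITIAL_BUFFER_SIZE, so the result is always INITIAL_BUFFER_SIZE.
       static int proposedSize(int appendMaxBufferSize) {
           return Math.min(INITIAL_BUFFER_SIZE, appendMaxBufferSize);
       }
   
       public static void main(String[] args) {
           int smallMaxMessageSize = 64 * 1024;   // hypothetical 64 KB max.message.bytes
           int appendMaxBufferSize = 1024 * 1024; // hypothetical 1 MB minimum
   
           System.out.println(originalSize(smallMaxMessageSize)); // 65536  -> scales down
           System.out.println(proposedSize(appendMaxBufferSize)); // 524288 -> always 512 KB
       }
   }
   ```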


