chia7712 commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2557281553


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -109,6 +110,10 @@ public class GroupCoordinatorConfig {
     public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
     public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits.";
 
+    public static final String APPEND_MAX_BUFFER_SIZE_CONFIG = "group.coordinator.append.max.buffer.size";
+    public static final int APPEND_MAX_BUFFER_SIZE_DEFAULT = 1024 * 1024 + Records.LOG_OVERHEAD;
+    public static final String APPEND_MAX_BUFFER_SIZE_DOC = "The largest buffer size allowed by GroupCoordinator (It is recommended not to exceed the maximum allowed message size).";

Review Comment:
   ditto



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java:
##########
@@ -82,6 +83,10 @@ public class ShareCoordinatorConfig {
     public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
     public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between force snapshotting share partitions which are not being updated.";
 
+    public static final String APPEND_MAX_BUFFER_SIZE_CONFIG = "share.coordinator.append.max.buffer.size";
+    public static final int APPEND_MAX_BUFFER_SIZE_DEFAULT = 1024 * 1024 + Records.LOG_OVERHEAD;
+    public static final String APPEND_MAX_BUFFER_SIZE_DOC = "The largest buffer size allowed by ShareCoordinator (It is recommended not to exceed the maximum allowed message size).";

Review Comment:
   `share.coordinator.append.max.buffer.size` cannot be larger than the max message size, right? If so, we should highlight that limit in the doc.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2127,6 +2146,10 @@ private CoordinatorRuntime(
         this.compression = compression;
         this.appendLingerMs = appendLingerMs;
         this.executorService = executorService;
+        this.appendMaxBufferSizeSupplier = appendMaxBufferSizeSupplier;
+        this.runtimeMetrics.registerAppendBufferSizeGauge(
+            () -> coordinators.values().stream().mapToLong(c -> c.bufferSupplier.size()).sum()

Review Comment:
   This appears to create an implicit call chain between `CoordinatorRuntime` and `CoordinatorRuntimeMetricsImpl`. Perhaps `CoordinatorRuntimeMetricsImpl` could maintain an `AtomicLong` variable, and we could update its value via `freeCurrentBatch`. For example:
   ```java
               if (currentBatch.builder.buffer().capacity() <= maxBufferSize) {
                   var before = bufferSupplier.size();
                   bufferSupplier.release(currentBatch.builder.buffer());
                   runtimeMetrics.recordAppendBufferSize(bufferSupplier.size() - before);
               } else if (currentBatch.buffer.capacity() <= maxBufferSize) {
                   var before = bufferSupplier.size();
                   bufferSupplier.release(currentBatch.buffer);
                   runtimeMetrics.recordAppendBufferSize(bufferSupplier.size() - before);
               } else {
                   runtimeMetrics.recordAppendBufferDiscarded();
               }
   ```
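
   The metrics-side counterpart could then be very small. A sketch (the `AtomicLong` field and the `recordAppendBufferSize`/`recordAppendBufferDiscarded` names are just the ones proposed above, not existing API):
   ```java
       // Sketch of the CoordinatorRuntimeMetricsImpl side: the runtime pushes deltas,
       // and the registered gauge simply reads the accumulated value.
       // (AtomicLong is java.util.concurrent.atomic.AtomicLong.)
       private final AtomicLong appendBufferSizeBytes = new AtomicLong(0L);

       public void recordAppendBufferSize(long delta) {
           appendBufferSizeBytes.addAndGet(delta);
       }

       // The gauge registered at construction time would just return
       // appendBufferSizeBytes.get(), so no call back into CoordinatorRuntime is needed.
   ```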



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -772,13 +782,15 @@ private void freeCurrentBatch() {
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer only if it is not larger than the maxBatchSize.
-            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+            // Release the buffer only if it is not larger than the max buffer size.
+            int maxBufferSize = appendMaxBufferSizeSupplier.get();

Review Comment:
   Should `maybeAllocateNewBatch` also adopt `appendMaxBufferSizeSupplier` instead of the max message size?
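
   i.e. something along these lines inside `maybeAllocateNewBatch` (a sketch; `initialBufferSize` stands in for whatever initial allocation size the runtime uses today, the other names are the ones from this diff):
   ```java
               // Sketch: cap the pooled buffer with the new config instead of
               // partitionWriter.config(tp).maxMessageSize(), so the allocation and
               // release paths share a single limit.
               int maxBufferSize = appendMaxBufferSizeSupplier.get();
               ByteBuffer buffer = bufferSupplier.get(Math.min(initialBufferSize, maxBufferSize));
   ```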


