guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r773490666



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1074,9 +1083,22 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
     * The removed stream thread is gracefully shut down. This method does not specify which stream
     * thread is shut down.
      * <p>
+<<<<<<< HEAD

Review comment:
       Rebase leftovers here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1224,17 +1248,20 @@ private int getNextThreadIndex() {
         }
     }
 
-    private long getCacheSizePerThread(final int numStreamThreads) {
+    private long getMemorySizePerThread(final int numStreamThreads, final boolean resizeCache) {

Review comment:
       I think the original suggestion goes a bit further than having a single consolidated function: it also consolidates the caller. Since we always call `getMemorySizePerThread` and `resizeThreadCacheOrBufferMemory` for both the input buffer and the cache size at the same place, we can make a single call rather than two. For example, we can just have:
   
   1) a single `resizeThreadCacheAndBufferMemory(final long numThreads)`, in which we compute the cache size and input-buffer size per thread from the two totals and the passed-in number of threads, and then let each thread set the corresponding values directly.
   
   2) the `getMemorySizePerThread` function is then only needed for logging purposes, since it is otherwise encapsulated inside `resizeThreadCacheAndBufferMemory`. We can have just a single `getThreadCacheAndBufferMemoryString()` which returns the values in the format "value1/value2" and is used in the logging message alone; there we can just pick any thread and read its values.
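   The consolidated shape could look roughly like the following self-contained sketch (the `ThreadMemory` class, its field names, and the plain division are illustrative assumptions for this comment, not the actual `KafkaStreams` internals):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream thread's resizable memory settings.
class ThreadMemory {
    long cacheBytes;
    long bufferBytes;
}

class MemoryResizer {
    final long totalCacheBytes;
    final long totalBufferBytes;
    final List<ThreadMemory> threads = new ArrayList<>();

    MemoryResizer(final long totalCacheBytes, final long totalBufferBytes) {
        this.totalCacheBytes = totalCacheBytes;
        this.totalBufferBytes = totalBufferBytes;
    }

    // One consolidated call: divide both totals by the thread count and
    // push the per-thread values to every thread directly.
    void resizeThreadCacheAndBufferMemory(final long numThreads) {
        final long cachePerThread = totalCacheBytes / numThreads;
        final long bufferPerThread = totalBufferBytes / numThreads;
        for (final ThreadMemory t : threads) {
            t.cacheBytes = cachePerThread;
            t.bufferBytes = bufferPerThread;
        }
    }

    // Only needed for logging: report "cache/buffer" from any one thread,
    // since all threads hold the same values after a resize.
    String getThreadCacheAndBufferMemoryString() {
        final ThreadMemory t = threads.get(0);
        return t.cacheBytes + "/" + t.bufferBytes;
    }
}
```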

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1116,16 +1138,18 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                             }
                         } else {
                             log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
-                                + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
+                                    + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                         }
 
-                        final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
-                        log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
-                        resizeThreadCache(cacheSizePerThread);
+                        final long cacheSizePerThread = getMemorySizePerThread(getNumLiveStreamThreads(), true);
+                        resizeThreadCacheOrBufferMemory(cacheSizePerThread, true);
+                        final long maxBufferSizePerThread = getMemorySizePerThread(getNumLiveStreamThreads(), false);
+                        resizeThreadCacheOrBufferMemory(maxBufferSizePerThread, false);
+                        log.info("Resizing thread cache/max buffer size due to thread removal, new cache size/max buffer size per thread is {}/{}", cacheSizePerThread, maxBufferSizePerThread);

Review comment:
       Please see my other comment earlier: I'd suggest we also include the terminating thread's name in the log message here for better troubleshooting; this is valuable when multiple threads are being created / terminated at the same time.
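   For illustration only, the message could be shaped like this (the helper class and exact wording are assumptions for this comment, not the actual patch):

```java
// Hypothetical sketch of the suggested log message: carry the terminating
// thread's name alongside the new per-thread sizes, so overlapping
// add/remove operations can be told apart in the logs.
class RemovalLog {
    static String message(final String threadName, final long cacheSize, final long bufferSize) {
        return String.format(
            "Resizing thread cache/max buffer size due to removal of thread %s, "
                + "new cache size/max buffer size per thread is %d/%d",
            threadName, cacheSize, bufferSize);
    }
}
```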

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
##########
@@ -49,6 +49,14 @@ public Headers headers() {
         return value.headers();
     }
 
+    public int serializedKeySize() {

Review comment:
       These functions don't seem to be used any more?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -967,6 +980,17 @@ private long pollPhase() {
         if (!records.isEmpty()) {
             pollRecordsSensor.record(numRecords, now);
             taskManager.addRecordsToTasks(records);
+            // Check buffer size after adding records to tasks
+            final long bufferSize = taskManager.getInputBufferSizeInBytes();
+            // Pausing partitions as the buffer size now exceeds max buffer size
+            if (bufferSize > maxBufferSizeBytes.get()) {
+                log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
+                // Only non-empty partitions are paused here. The reason is that, if a task has multiple partitions with
+                // some of them empty, then pausing even the empty partitions would sacrifice ordered processing
+                // and could even lead to a temporary deadlock. More explanation can be found here:
+                // https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647

Review comment:
       nit: we do not need the `?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647` suffix in the javadoc :)
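   As an aside, the pause-only-non-empty-partitions idea can be sketched with plain collections (partition names as strings; the `paused` set is a stand-in for `Consumer#pause`, and the byte accounting is simplified — all names here are hypothetical, not the patch's actual code):

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class BufferPauser {
    // Per-partition record queues, holding the byte size of each buffered record.
    final Map<String, Deque<Integer>> queues = new HashMap<>();
    final Set<String> paused = new HashSet<>();

    long totalBytesBuffered() {
        return queues.values().stream()
            .flatMap(Deque::stream)
            .mapToLong(Integer::longValue)
            .sum();
    }

    // Pause only partitions that still have buffered records; empty
    // partitions keep fetching, so ordered processing is not starved
    // and the temporary-deadlock scenario from KAFKA-13152 is avoided.
    void maybePause(final long maxBufferBytes) {
        if (totalBytesBuffered() > maxBufferBytes) {
            for (final Map.Entry<String, Deque<Integer>> e : queues.entrySet()) {
                if (!e.getValue().isEmpty()) {
                    paused.add(e.getKey());
                }
            }
        }
    }
}
```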

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -354,10 +370,22 @@ int numBuffered(final TopicPartition partition) {
         return recordQueue.size();
     }
 
+    Set<TopicPartition> getNonEmptyTopicPartitions() {
+        final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
+        for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
+            nonEmptyTopicPartitions.add(recordQueue.partition());
+        }
+        return nonEmptyTopicPartitions;
+    }
+
     int numBuffered() {
         return totalBuffered;
     }
 
+    long totalBytesBuffered() {

Review comment:
       This function seems not to be addressed? Or am I missing something?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -728,8 +733,11 @@ private void subscribeConsumer() {
         }
     }
 
-    public void resizeCache(final long size) {
-        cacheResizeSize.set(size);
+    public void resizeCacheOrBufferMemory(final long size, final boolean cacheResize) {

Review comment:
       See the other comment: if we always resize the two at the same time, we can just make one function call instead of two calls with an extra boolean flag.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -84,6 +84,9 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String TOTAL_BYTES = "total-bytes";
+    private static final String TOTAL_BYTES_DESCRIPTION = "The total number of bytes accumulated by this task";

Review comment:
       This comment seems not addressed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -738,7 +739,7 @@ public boolean process(final long wallClockTime) {
 
             // after processing this record, if its partition queue's buffered size has been
             // decreased to the threshold, we can then resume the consumption on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
+            if (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) {

Review comment:
       nit: could we add a TODO here noting that this logic should be removed once we remove the deprecated old config as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
