guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r742366571



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1030,8 +1038,11 @@ private static Metrics getMetrics(final StreamsConfig 
config, final Time time, f
                     streamThread.shutdown();
                     threads.remove(streamThread);
                     final long cacheSizePerThread = 
getCacheSizePerThread(getNumLiveStreamThreads());
-                    log.info("Resizing thread cache due to terminating added 
thread, new cache size per thread is {}", cacheSizePerThread);
+                    final long maxBufferSizePerThread = 
getBufferSizePerThread(getNumLiveStreamThreads());
+                    log.info("Resizing thread cache again since new thread can 
not be started, final cache size per thread is {}", cacheSizePerThread);

Review comment:
       nit: ditto here. See above for the consolidated log line. Here we can 
emphasize it is "Terminating newly added threads".

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -84,6 +84,9 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String TOTAL_BYTES = "total-bytes";
+    private static final String TOTAL_BYTES_DESCRIPTION = "The total number of 
bytes accumulated by this task";

Review comment:
       We should be more specific about the description here: the total number 
of bytes accumulated in this task's input buffer.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1288,6 +1326,10 @@ int currentNumIterations() {
         return numIterations;
     }
 
+    long bufferSize() {

Review comment:
       This function seems to be unused.
   
   BTW if we do not maintain the local `bufferSize` then we would not need it 
anyway :)
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -266,6 +270,10 @@ StampedRecord nextRecord(final RecordInfo info, final long 
wallClockTime) {
             if (record != null) {
                 --totalBuffered;
 
+                totalBytesBuffered -= (record.key() != null ? 
record.serializedKeySize() : 0) +

Review comment:
       How about adding a `sizeInBytes` function to the `StampedRecord` class, to 
avoid duplicating the calculation each time?
   
   Also we do not need to check whether `key/value == null`, since in that 
case the `ConsumerRecord#serializedKeySize/ValueSize` would be 0.
   
   Also note that the total bytes taken here is more than just the key/value 
since we have other fields like headers, timestamps etc, but since it's hard 
to capture exactly how many bytes are taken in the JVM to store a 
ConsumerRecord<byte[], byte[]> I think just getting the sum of 
key/value/timestamp/offset/topic/partition/headers is fine.
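   
   A minimal sketch of what such a `sizeInBytes` helper might look like. 
`SketchRecord` and `SketchHeader` are hypothetical stand-ins, not Kafka's 
`StampedRecord` or `Header`, and the choice of fixed-width sizes for 
timestamp/offset/partition is an assumption. The sketch guards against null 
explicitly because, as far as I can tell from the `ConsumerRecord` Javadoc, 
the serialized size of a null key or value is reported as -1 rather than 0.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for one record header (key plus optional value).
final class SketchHeader {
    final String key;
    final byte[] value; // may be null

    SketchHeader(final String key, final byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for a buffered record; this is not Kafka's StampedRecord.
final class SketchRecord {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final byte[] key;   // may be null
    final byte[] value; // may be null
    final List<SketchHeader> headers;

    SketchRecord(final String topic, final int partition, final long offset,
                 final long timestamp, final byte[] key, final byte[] value,
                 final List<SketchHeader> headers) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    // Approximate footprint: payload bytes plus fixed-width metadata fields.
    // This deliberately ignores JVM object overhead.
    long sizeInBytes() {
        long size = Long.BYTES        // timestamp
            + Long.BYTES              // offset
            + Integer.BYTES           // partition
            + topic.getBytes(StandardCharsets.UTF_8).length;
        size += key == null ? 0 : key.length;
        size += value == null ? 0 : value.length;
        for (final SketchHeader header : headers) {
            size += header.key.getBytes(StandardCharsets.UTF_8).length;
            size += header.value == null ? 0 : header.value.length;
        }
        return size;
    }
}
```

   With one helper like this, both the add and the remove path of the buffer 
could call the same function, so the two sides cannot drift apart.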

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1013,12 +1017,16 @@ private static Metrics getMetrics(final StreamsConfig 
config, final Time time, f
                 final int threadIdx = getNextThreadIndex();
                 final int numLiveThreads = getNumLiveStreamThreads();
                 final long cacheSizePerThread = 
getCacheSizePerThread(numLiveThreads + 1);
+                final long maxBufferSizePerThread = 
getBufferSizePerThread(numLiveThreads + 1);
                 log.info("Adding StreamThread-{}, there will now be {} live 
threads and the new cache size per thread is {}",
                          threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
+                log.info("Adding StreamThread-{}, there are now {} threads 
with a buffer size {} and cache size {} per thread.",

Review comment:
       nit: Could we merge these two info lines into a single one? It seems a 
bit redundant to log twice here. Also this new log line seems wrong since it 
has four parameters but only three values provided.
   
   E.g.
   
   ```
   Adding StreamThread-{}, the current total number of thread is {}, each 
thread now has a buffer size {} and cache size {}
   ```
   
   And
   
   ```
   Terminating StreamThread-{}, the current total number of thread is {}, each 
thread now has a buffer size {} and cache size {}
   ```
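   
   To illustrate the placeholder/argument mismatch, here is a rough sketch of 
a single consolidated message built with four values for four placeholders. 
`ThreadResizeLog` and `message` are hypothetical names, and the sketch uses 
`String.format` only to stay self-contained; the real code would presumably 
keep using the existing `log.info` call with `{}` placeholders.

```java
import java.util.Locale;

// Hypothetical helper: one message covering thread count, buffer size and cache size.
final class ThreadResizeLog {
    static String message(final String action, final int threadIdx, final int numThreads,
                          final long bufferSizePerThread, final long cacheSizePerThread) {
        // Four placeholders after the action verb, four arguments supplied.
        return String.format(
            Locale.ROOT,
            "%s StreamThread-%d, the current total number of threads is %d, "
                + "each thread now has a buffer size %d and cache size %d",
            action, threadIdx, numThreads, bufferSizePerThread, cacheSizePerThread);
    }
}
```

   The same helper would serve both the "Adding" and the "Terminating" case 
by passing a different `action`.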

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
         return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getBufferSizePerThread(final int numStreamThreads) {
+        if (numStreamThreads == 0) {
+            return inputBufferMaxBytes;
+        }
+        return inputBufferMaxBytes / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));
+    }
+
     private void resizeThreadCache(final long cacheSizePerThread) {
         processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
         if (globalStreamThread != null) {
             globalStreamThread.resize(cacheSizePerThread);
         }
     }
 
+    private void resizeMaxBufferSize(final long maxBufferSize) {

Review comment:
       +1 here as well. I think we would always resize both buffer and state 
cache at the same time moving forward.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -354,10 +370,22 @@ int numBuffered(final TopicPartition partition) {
         return recordQueue.size();
     }
 
+    Set<TopicPartition> getNonEmptyTopicPartitions() {
+        final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
+        for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
+            nonEmptyTopicPartitions.add(recordQueue.partition());
+        }
+        return nonEmptyTopicPartitions;
+    }
+
     int numBuffered() {
         return totalBuffered;
     }
 
+    long totalBytesBuffered() {

Review comment:
       This function seems to be used only for testing? If yes, please move it to the 
bottom of the class and add a comment that "below are for testing only".

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -601,6 +608,10 @@ boolean runLoop() {
                 if (size != -1L) {
                     cacheResizer.accept(size);
                 }
+                final long bufferBytesSize = 
maxBufferResizeSize.getAndSet(-1L);
+                if (size != -1) {

Review comment:
       This does not look right to me: why do we use the size value read from 
`cacheResizeSize` to assign to `maxBufferSizeBytes`? They should be totally 
orthogonal.
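   
   A small sketch of the two requests being handled independently, each 
guarded by its own value. `ResizeRequests`, `apply` and `bufferResizer` are 
hypothetical names; only `cacheResizeSize`, `maxBufferResizeSize` and the -1 
sentinel are taken from the diff above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical holder for the two pending resize requests.
final class ResizeRequests {
    static final long NO_REQUEST = -1L;

    final AtomicLong cacheResizeSize = new AtomicLong(NO_REQUEST);
    final AtomicLong maxBufferResizeSize = new AtomicLong(NO_REQUEST);

    void apply(final LongConsumer cacheResizer, final LongConsumer bufferResizer) {
        // Read and clear the cache request; act only on its own value.
        final long cacheSize = cacheResizeSize.getAndSet(NO_REQUEST);
        if (cacheSize != NO_REQUEST) {
            cacheResizer.accept(cacheSize);
        }
        // Read and clear the buffer request; the check uses bufferSize, not cacheSize.
        final long bufferSize = maxBufferResizeSize.getAndSet(NO_REQUEST);
        if (bufferSize != NO_REQUEST) {
            bufferResizer.accept(bufferSize);
        }
    }
}
```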

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long 
totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) 
{
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
             int processed = 0;
+            long bytesConsumed = 0L;
+            task.setBytesConsumed(0L);

Review comment:
       Not sure I understand this logic here: it seems we only call 
`setBytesConsumed` once with 0 here and there are no other callers?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {

Review comment:
       See my other comment: I think we can avoid propagating both 
processed-records and processed-bytes from the `process` call.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -128,6 +131,22 @@ public static Sensor activeBufferedRecordsSensor(final 
String threadId,
         return sensor;
     }
 
+    public static Sensor totalBytesSensor(final String threadId,

Review comment:
       While reviewing the PR, I feel the name `total-bytes` under
   
   ```
   type = stream-task-metrics
   thread-id = [thread ID]
   task-id = [task ID]
   ```
   
   is a bit too vague, what about renaming it to `total-input-buffer-bytes`. 
WDYT @vamossagar12 @ableegoldman ? If we agree here we'd need to update the KIP 
as well.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long 
totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) 
{
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;

Review comment:
       I think we can simplify the logic and do not need to keep track of 
"consumed bytes" within a task here, see my other comment.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
##########
@@ -49,6 +49,14 @@ public Headers headers() {
         return value.headers();
     }
 
+    public int serializedKeySize() {

Review comment:
       Please see my other comment above: how about just having a `sizeInBytes` 
function which takes key/value/timestamp/offset/topic/partition/headers into 
account?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1112,6 +1124,9 @@ private static Metrics getMetrics(final StreamsConfig 
config, final Time time, f
                         final long cacheSizePerThread = 
getCacheSizePerThread(getNumLiveStreamThreads());
                         log.info("Resizing thread cache due to thread removal, 
new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
+                        final long maxBufferSizePerThread = 
getBufferSizePerThread(getNumLiveStreamThreads());
+                        log.info("Resizing max buffer size due to thread 
removal, new buffer size per thread is {}", maxBufferSizePerThread);

Review comment:
       Ditto here as well.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long 
totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) 
{
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
             int processed = 0;
+            long bytesConsumed = 0L;
+            task.setBytesConsumed(0L);

Review comment:
       Also, even with the correct logic, I'm wondering if we can just define 
it as a local variable within the `process` here instead of augmenting the 
`Task` interface?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
 
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
+                    if (bufferSize > maxBufferSizeBytes && bufferSize - 
processedData.totalBytesConsumed <= maxBufferSizeBytes) {

Review comment:
       I also feel this logic is a bit awkward, starting from the fact that we 
need to report how many bytes we've consumed from the process :) I think we can 
simply do the following:
   
   At the end of the polling phase, and at the end of the process loop (a.k.a. here), 
we loop over all the active tasks, and get their "input buffer size", which 
would delegate to each task's corresponding `PartitionGroup` and then 
`RecordQueue`. And then based on that we can decide whether to resume / pause 
accordingly. Then
   
   1) we do not need to maintain a local `bufferSize` at the task here, i.e. we 
always re-compute from the task's record queue, which is the source of truth.
   2) we do not need to maintain and propagate up the `consumed bytes` within 
each iteration here.
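   
   Roughly, the idea could be sketched as below. `BufferedTask`, 
`inputBufferSizeInBytes`, `InputBufferGate` and `Action` are hypothetical 
names used for illustration only; treating "at or below the limit" as the 
resume condition is an assumption based on the condition in the diff above.

```java
import java.util.List;

// Hypothetical view of a task: it reports the bytes currently held in its
// input buffer (which would delegate to its PartitionGroup / RecordQueues).
interface BufferedTask {
    long inputBufferSizeInBytes();
}

final class InputBufferGate {
    enum Action { PAUSE, RESUME }

    // Always recompute from the tasks, which are the source of truth.
    static long totalBufferedBytes(final List<? extends BufferedTask> activeTasks) {
        long total = 0L;
        for (final BufferedTask task : activeTasks) {
            total += task.inputBufferSizeInBytes();
        }
        return total;
    }

    // Pause while over the limit, resume once at or below it.
    static Action decide(final List<? extends BufferedTask> activeTasks,
                         final long maxBufferSizeBytes) {
        return totalBufferedBytes(activeTasks) > maxBufferSizeBytes
            ? Action.PAUSE
            : Action.RESUME;
    }
}
```

   Calling `decide` once after polling and once after processing would remove 
the need for a thread-local `bufferSize` and for returning consumed bytes from 
`process`.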





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

