ableegoldman commented on a change in pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#discussion_r617805969



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) {
             // thus, the task is not processable, even if there is available 
data in the record queue
             return false;
         }
-
-        return partitionGroup.readyToProcess(wallClockTime);
+        final boolean readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
+        if (!readyToProcess) {
+            if (!timeCurrentIdlingStarted.isPresent()) {
+                timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            }
+        } else {
+            timeCurrentIdlingStarted = Optional.empty();

Review comment:
       Just want to make sure I understand, previously we only considered a 
task as idling if it was suspended so we're just fixing it up to track the 
actual idling. And while since KIP-429 suspension is just a transient state 
that the task passes through right before being closed, it's still used during 
an upgrade from EAGER. So we're going to keep considering suspension as idling 
until we can finally drop support for EAGER -- does that sound right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to