mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434117796



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       Ack. I think this needs further refactoring. Will do in a follow-up PR. Renaming for now.
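
For readers without the full file at hand, the retry behavior in the hunk above can be sketched in isolation. This is only an illustration under stated assumptions, not the actual `TaskManager` code: `RetrySketch`, `SketchTask`, `SketchLockException`, and `tryInitializeAll` are hypothetical stand-ins. Only the shape of the loop is taken from the diff: call `initializeIfNeeded()` for every task without a prior state check, and treat a lock failure as "retry on the next iteration". That `initializeIfNeeded()` is a no-op for an already initialized task is an assumption based on the method name.

```java
import java.util.ArrayList;
import java.util.List;

public class RetrySketch {
    /** Hypothetical stand-in for the LockException caught in the diff. */
    static class SketchLockException extends RuntimeException {
        SketchLockException(final String message) {
            super(message);
        }
    }

    /** Hypothetical task that fails to initialize a fixed number of times. */
    static class SketchTask {
        private int failuresLeft;
        private boolean initialized = false;

        SketchTask(final int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        // Assumed idempotent (inferred from the method name): no-op once initialized.
        void initializeIfNeeded() {
            if (initialized) {
                return;
            }
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new SketchLockException("lock still held by another thread");
            }
            initialized = true;
        }

        boolean isInitialized() {
            return initialized;
        }
    }

    /** Returns true only if every task is initialized after this pass. */
    static boolean tryInitializeAll(final List<SketchTask> tasks) {
        boolean allRunning = true;
        for (final SketchTask task : tasks) {
            try {
                task.initializeIfNeeded();
            } catch (final SketchLockException e) {
                // Not fatal: the caller is expected to call this method again later.
                allRunning = false;
            }
        }
        return allRunning;
    }

    public static void main(final String[] args) {
        final List<SketchTask> tasks = new ArrayList<>();
        tasks.add(new SketchTask(0)); // initializes on the first pass
        tasks.add(new SketchTask(2)); // fails twice, then initializes
        int passes = 1;
        while (!tryInitializeAll(tasks)) {
            passes++;
        }
        System.out.println("all tasks initialized after " + passes + " passes");
    }
}
```

Because a failed task simply leaves `allRunning` false, the caller decides when to retry; nothing in the loop blocks or sleeps.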





