mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434117796
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() { final List<Task> restoringTasks = new LinkedList<>(); for (final Task task : tasks.values()) { - if (task.state() == CREATED) { - try { - task.initializeIfNeeded(); - } catch (final LockException | TimeoutException e) { - // it is possible that if there are multiple threads within the instance that one thread - // trying to grab the task from the other, while the other has not released the lock since - // it did not participate in the rebalance. In this case we can just retry in the next iteration - log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e); - allRunning = false; - } + try { + task.initializeIfNeeded(); + } catch (final LockException | TimeoutException e) { + // it is possible that if there are multiple threads within the instance that one thread + // trying to grab the task from the other, while the other has not released the lock since + // it did not participate in the rebalance. In this case we can just retry in the next iteration + log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e); + allRunning = false; } - if (task.state() == RESTORING) { + if (task.isActive()) { Review comment: Ack. -- I think this needs further refactoring... Will do in a follow-up PR. Renaming for now. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org