guozhangwang commented on code in PR #12312: URL: https://github.com/apache/kafka/pull/12312#discussion_r903296637
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +655,178 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldGetTasksFromInputQueue() { + stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); + stateUpdater = new DefaultStateUpdater( + changelogReader, + offsetResetter, + Time.SYSTEM, + true + ); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + stateUpdater.add(activeTask1); + stateUpdater.add(standbyTask1); + stateUpdater.add(standbyTask2); + stateUpdater.remove(TASK_0_0); + stateUpdater.add(activeTask2); + stateUpdater.add(standbyTask3); + + final Set<Task> tasks = stateUpdater.getTasks(); + + assertEquals(5, tasks.size()); + assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3))); + + final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks(); + + assertEquals(2, activeTasks.size()); + assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2))); + + final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks(); + + assertEquals(3, standbyTasks.size()); + assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, standbyTask3))); + } + + 
@Test + public void shouldGetTasksFromUpdatingTasks() throws Exception { + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + stateUpdater.add(activeTask1); + stateUpdater.add(standbyTask1); + stateUpdater.add(standbyTask2); + stateUpdater.add(activeTask2); + stateUpdater.add(standbyTask3); + verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3); + + final Set<Task> tasks = stateUpdater.getTasks(); + + assertEquals(5, tasks.size()); + assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3))); + + final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks(); + + assertEquals(2, activeTasks.size()); + assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2))); + + final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks(); + + assertEquals(3, standbyTasks.size()); + assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, standbyTask3))); + } + + @Test + public void shouldGetTasksFromRestoredActiveTasks() throws Exception { Review Comment: In these three tests, could we also add coverage verifying that the results of `getTasks()` change after the `drainXXTasks` call?
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) { private final Time time; private final ChangelogReader changelogReader; private final Consumer<Set<TopicPartition>> offsetResetter; + private final boolean manualStart; Review Comment: Do we need this boolean flag, or could we just not call `start()` in unit tests? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -603,6 +655,178 @@ public void shouldDrainFailedTasksAndExceptions() throws Exception { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldGetTasksFromInputQueue() { + stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); Review Comment: I had another thought/comment about this: what if we just rely on `start()` of the DefaultStateUpdater (I know it means in the production code we'd need to cast to DefaultStateUpdater and call `start()` upon initialization), and then we would not need the extra boolean, and also do not need to lazy-create the thread upon `add`. 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) { private final Time time; private final ChangelogReader changelogReader; private final Consumer<Set<TopicPartition>> offsetResetter; + private final boolean manualStart; private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>(); private final Lock tasksAndActionsLock = new ReentrantLock(); private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition(); private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>(); private final Lock restoredActiveTasksLock = new ReentrantLock(); private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); - private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>(); - private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); - private CountDownLatch shutdownGate; + private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedList<>(); + private final Lock exceptionsAndFailedTasksLock = new ReentrantLock(); + private final Queue<Task> removedTasks = new LinkedList<>(); + private final Lock removedTasksLock = new ReentrantLock(); private StateUpdaterThread stateUpdaterThread = null; + private CountDownLatch shutdownGate; public DefaultStateUpdater(final ChangelogReader changelogReader, final Consumer<Set<TopicPartition>> offsetResetter, final Time time) { + this(changelogReader, offsetResetter, time, false); + } + + public DefaultStateUpdater(final ChangelogReader changelogReader, + final Consumer<Set<TopicPartition>> offsetResetter, + final Time time, + final boolean manualStart) { this.changelogReader = changelogReader; this.offsetResetter = offsetResetter; this.time = time; + this.manualStart = manualStart; + } + + public void start() { Review Comment: This function seems not 
used. BTW, if we are adding this function, could we just call it in the production code when initializing the state updater, and remove the logic that starts the thread in the `add` function? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) { private final Time time; private final ChangelogReader changelogReader; private final Consumer<Set<TopicPartition>> offsetResetter; + private final boolean manualStart; private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>(); private final Lock tasksAndActionsLock = new ReentrantLock(); private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition(); private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>(); private final Lock restoredActiveTasksLock = new ReentrantLock(); private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); - private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>(); - private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); - private CountDownLatch shutdownGate; + private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedList<>(); Review Comment: Why switch `removedTasks` and `exceptionsAndFailedTasks` from blocking queues to locks + `LinkedList`? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -652,7 +876,10 @@ private void verifyUpdatingTasks(final Task... tasks) throws Exception { "Did not get all updating task within the given timeout!"
); assertEquals(expectedUpdatingTasks.size(), updatingTasks.size()); - assertTrue(updatingTasks.stream().allMatch(task -> task.state() == State.RESTORING)); + assertTrue(updatingTasks.stream() + .allMatch(task -> task.isActive() && task.state() == State.RESTORING + || + !task.isActive() && task.state() == State.RUNNING)); Review Comment: Good call. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -161,41 +159,62 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks(); final Set<Task> corruptedTasks = new HashSet<>(); for (final TaskId taskId : corruptedTaskIds) { - final Task corruptedTask = updatingTasks.remove(taskId); + final Task corruptedTask = updatingTasks.get(taskId); if (corruptedTask == null) { throw new IllegalStateException("Task " + taskId + " is corrupted but is not updating. " + BUG_ERROR_MESSAGE); } corruptedTasks.add(corruptedTask); } - exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException)); + addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(corruptedTasks, taskCorruptedException)); } private void handleStreamsException(final StreamsException streamsException) { log.info("Encountered streams exception: ", streamsException); - final ExceptionAndTasks exceptionAndTasks; if (streamsException.taskId().isPresent()) { - exceptionAndTasks = handleStreamsExceptionWithTask(streamsException); + handleStreamsExceptionWithTask(streamsException); } else { - exceptionAndTasks = handleStreamsExceptionWithoutTask(streamsException); + handleStreamsExceptionWithoutTask(streamsException); } - exceptionsAndFailedTasks.add(exceptionAndTasks); } - private ExceptionAndTasks handleStreamsExceptionWithTask(final StreamsException streamsException) { + private void handleStreamsExceptionWithTask(final StreamsException streamsException) { 
final TaskId failedTaskId = streamsException.taskId().get(); if (!updatingTasks.containsKey(failedTaskId)) { throw new IllegalStateException("Task " + failedTaskId + " failed but is not updating. " + BUG_ERROR_MESSAGE); } final Set<Task> failedTask = new HashSet<>(); failedTask.add(updatingTasks.get(failedTaskId)); - updatingTasks.remove(failedTaskId); - return new ExceptionAndTasks(failedTask, streamsException); + addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(failedTask, streamsException)); + } + + private void handleStreamsExceptionWithoutTask(final StreamsException streamsException) { + addToExceptionsAndFailedTasksThenClearUpdatingTasks( + new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException)); } - private ExceptionAndTasks handleStreamsExceptionWithoutTask(final StreamsException streamsException) { - final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException); + // It is important to remove the corrupted tasks from the updating tasks after they were added to the + // failed tasks. + // This ensures that all tasks are found in DefaultStateUpdater#getTasks(). + private void addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) { + addToExceptionsAndFailedTasks(exceptionAndTasks); + exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove); + if (onlyStandbyTasksLeft()) { Review Comment: nit: replace with "transitToUpdateStandbysIfOnlyStandbysLeft"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org