guozhangwang commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r903296637


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +655,178 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldGetTasksFromInputQueue() {
+        stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        stateUpdater = new DefaultStateUpdater(
+            changelogReader,
+            offsetResetter,
+            Time.SYSTEM,
+            true
+        );
+        final StreamTask activeTask1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask activeTask2 = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask standbyTask1 = 
createStandbyTaskInStateRunning(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask standbyTask2 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        final StandbyTask standbyTask3 = 
createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_A_1));
+        stateUpdater.add(activeTask1);
+        stateUpdater.add(standbyTask1);
+        stateUpdater.add(standbyTask2);
+        stateUpdater.remove(TASK_0_0);
+        stateUpdater.add(activeTask2);
+        stateUpdater.add(standbyTask3);
+
+        final Set<Task> tasks = stateUpdater.getTasks();
+
+        assertEquals(5, tasks.size());
+        assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, 
standbyTask1, standbyTask2, standbyTask3)));
+
+        final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
+
+        assertEquals(2, activeTasks.size());
+        assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2)));
+
+        final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks();
+
+        assertEquals(3, standbyTasks.size());
+        assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, 
standbyTask3)));
+    }
+
+    @Test
+    public void shouldGetTasksFromUpdatingTasks() throws Exception {
+        final StreamTask activeTask1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask activeTask2 = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask standbyTask1 = 
createStandbyTaskInStateRunning(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask standbyTask2 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        final StandbyTask standbyTask3 = 
createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_A_1));
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(activeTask1);
+        stateUpdater.add(standbyTask1);
+        stateUpdater.add(standbyTask2);
+        stateUpdater.add(activeTask2);
+        stateUpdater.add(standbyTask3);
+        verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, 
standbyTask2, standbyTask3);
+
+        final Set<Task> tasks = stateUpdater.getTasks();
+
+        assertEquals(5, tasks.size());
+        assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, 
standbyTask1, standbyTask2, standbyTask3)));
+
+        final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
+
+        assertEquals(2, activeTasks.size());
+        assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2)));
+
+        final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks();
+
+        assertEquals(3, standbyTasks.size());
+        assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, 
standbyTask3)));
+    }
+
+    @Test
+    public void shouldGetTasksFromRestoredActiveTasks() throws Exception {

Review Comment:
   In these three tests, could we also add coverage verifying that the result 
of `getTasks()` changes after the `drainXXTasks` call?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask 
task) {
     private final Time time;
     private final ChangelogReader changelogReader;
     private final Consumer<Set<TopicPartition>> offsetResetter;
+    private final boolean manualStart;

Review Comment:
   Do we need this boolean flag, or could we just not call `start()` in unit 
tests?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -603,6 +655,178 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldGetTasksFromInputQueue() {
+        stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));

Review Comment:
   I had another thought about this: what if we just rely on `start()` of the 
DefaultStateUpdater (I know this means that in the production code we'd need to 
cast to DefaultStateUpdater and call `start()` upon initialization)? Then we 
would not need the extra boolean, and we would also not need to lazily create 
the thread upon `add`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask 
task) {
     private final Time time;
     private final ChangelogReader changelogReader;
     private final Consumer<Set<TopicPartition>> offsetResetter;
+    private final boolean manualStart;
     private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
     private final Lock tasksAndActionsLock = new ReentrantLock();
     private final Condition tasksAndActionsCondition = 
tasksAndActionsLock.newCondition();
     private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
     private final Lock restoredActiveTasksLock = new ReentrantLock();
     private final Condition restoredActiveTasksCondition = 
restoredActiveTasksLock.newCondition();
-    private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = 
new LinkedBlockingQueue<>();
-    private final BlockingQueue<Task> removedTasks = new 
LinkedBlockingQueue<>();
-    private CountDownLatch shutdownGate;
+    private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new 
LinkedList<>();
+    private final Lock exceptionsAndFailedTasksLock = new ReentrantLock();
+    private final Queue<Task> removedTasks = new LinkedList<>();
+    private final Lock removedTasksLock = new ReentrantLock();
 
     private StateUpdaterThread stateUpdaterThread = null;
+    private CountDownLatch shutdownGate;
 
     public DefaultStateUpdater(final ChangelogReader changelogReader,
                                final Consumer<Set<TopicPartition>> 
offsetResetter,
                                final Time time) {
+        this(changelogReader, offsetResetter, time, false);
+    }
+
+    public DefaultStateUpdater(final ChangelogReader changelogReader,
+                               final Consumer<Set<TopicPartition>> 
offsetResetter,
+                               final Time time,
+                               final boolean manualStart) {
         this.changelogReader = changelogReader;
         this.offsetResetter = offsetResetter;
         this.time = time;
+        this.manualStart = manualStart;
+    }
+
+    public void start() {

Review Comment:
   This function does not seem to be used.
   
   BTW, if we are adding this function, could we just call it in the 
production code upon initializing the state updater, and remove the logic that 
starts the thread in the `add` function?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask 
task) {
     private final Time time;
     private final ChangelogReader changelogReader;
     private final Consumer<Set<TopicPartition>> offsetResetter;
+    private final boolean manualStart;
     private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
     private final Lock tasksAndActionsLock = new ReentrantLock();
     private final Condition tasksAndActionsCondition = 
tasksAndActionsLock.newCondition();
     private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
     private final Lock restoredActiveTasksLock = new ReentrantLock();
     private final Condition restoredActiveTasksCondition = 
restoredActiveTasksLock.newCondition();
-    private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = 
new LinkedBlockingQueue<>();
-    private final BlockingQueue<Task> removedTasks = new 
LinkedBlockingQueue<>();
-    private CountDownLatch shutdownGate;
+    private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new 
LinkedList<>();

Review Comment:
   Why switch `removedTasks` and `exceptionsAndFailedTasks` from blocking 
queues to locks+linkedlist?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -652,7 +876,10 @@ private void verifyUpdatingTasks(final Task... tasks) 
throws Exception {
                 "Did not get all updating task within the given timeout!"
             );
             assertEquals(expectedUpdatingTasks.size(), updatingTasks.size());
-            assertTrue(updatingTasks.stream().allMatch(task -> task.state() == 
State.RESTORING));
+            assertTrue(updatingTasks.stream()
+                .allMatch(task -> task.isActive() && task.state() == 
State.RESTORING
+                    ||
+                    !task.isActive() && task.state() == State.RUNNING));

Review Comment:
   Good call.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -161,41 +159,62 @@ private void handleTaskCorruptedException(final 
TaskCorruptedException taskCorru
             final Set<TaskId> corruptedTaskIds = 
taskCorruptedException.corruptedTasks();
             final Set<Task> corruptedTasks = new HashSet<>();
             for (final TaskId taskId : corruptedTaskIds) {
-                final Task corruptedTask = updatingTasks.remove(taskId);
+                final Task corruptedTask = updatingTasks.get(taskId);
                 if (corruptedTask == null) {
                     throw new IllegalStateException("Task " + taskId + " is 
corrupted but is not updating. " + BUG_ERROR_MESSAGE);
                 }
                 corruptedTasks.add(corruptedTask);
             }
-            exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, 
taskCorruptedException));
+            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(corruptedTasks, taskCorruptedException));
         }
 
         private void handleStreamsException(final StreamsException 
streamsException) {
             log.info("Encountered streams exception: ", streamsException);
-            final ExceptionAndTasks exceptionAndTasks;
             if (streamsException.taskId().isPresent()) {
-                exceptionAndTasks = 
handleStreamsExceptionWithTask(streamsException);
+                handleStreamsExceptionWithTask(streamsException);
             } else {
-                exceptionAndTasks = 
handleStreamsExceptionWithoutTask(streamsException);
+                handleStreamsExceptionWithoutTask(streamsException);
             }
-            exceptionsAndFailedTasks.add(exceptionAndTasks);
         }
 
-        private ExceptionAndTasks handleStreamsExceptionWithTask(final 
StreamsException streamsException) {
+        private void handleStreamsExceptionWithTask(final StreamsException 
streamsException) {
             final TaskId failedTaskId = streamsException.taskId().get();
             if (!updatingTasks.containsKey(failedTaskId)) {
                 throw new IllegalStateException("Task " + failedTaskId + " 
failed but is not updating. " + BUG_ERROR_MESSAGE);
             }
             final Set<Task> failedTask = new HashSet<>();
             failedTask.add(updatingTasks.get(failedTaskId));
-            updatingTasks.remove(failedTaskId);
-            return new ExceptionAndTasks(failedTask, streamsException);
+            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(failedTask, streamsException));
+        }
+
+        private void handleStreamsExceptionWithoutTask(final StreamsException 
streamsException) {
+            addToExceptionsAndFailedTasksThenClearUpdatingTasks(
+                new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), 
streamsException));
         }
 
-        private ExceptionAndTasks handleStreamsExceptionWithoutTask(final 
StreamsException streamsException) {
-            final ExceptionAndTasks exceptionAndTasks = new 
ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException);
+        // It is important to remove the corrupted tasks from the updating 
tasks after they were added to the
+        // failed tasks.
+        // This ensures that all tasks are found in 
DefaultStateUpdater#getTasks().
+        private void 
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final 
ExceptionAndTasks exceptionAndTasks) {
+            addToExceptionsAndFailedTasks(exceptionAndTasks);
+            
exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove);
+            if (onlyStandbyTasksLeft()) {

Review Comment:
   nit: replace with "transitToUpdateStandbysIfOnlyStandbysLeft"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to