cadonna commented on code in PR #12771:
URL: https://github.com/apache/kafka/pull/12771#discussion_r1001636845


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1435,6 +1435,31 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());
     }
 
+    @Test
+    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+                .inState(State.CREATED)
+                .withInputPartitions(taskId00Partitions).build();
+        final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+                .inState(State.CREATED)
+                .withInputPartitions(taskId01Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
+        when(tasks.drainPendingTaskToInit()).thenReturn(mkSet(statefulTask0, statefulTask1));
+        doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
+        doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
+
+        final TaskCorruptedException thrown = assertThrows(
+                TaskCorruptedException.class,
+                () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
+        );
+
+        Mockito.verify(tasks).addTask(statefulTask0);
+        Mockito.verify(tasks).addTask(statefulTask1);
+        assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
+        assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());

Review Comment:
   nit:
   ```suggestion
           assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1435,6 +1435,31 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());
     }
 
+    @Test
+    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+                .inState(State.CREATED)
+                .withInputPartitions(taskId00Partitions).build();
+        final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+                .inState(State.CREATED)
+                .withInputPartitions(taskId01Partitions).build();

Review Comment:
   Could you please also add a stateful task that does not throw and verify that it is added to the state updater? That would verify that, when at least one task throws an exception, the tasks that initialize successfully are still added to the state updater.
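   A minimal sketch of the scenario this comment asks for, written as a self-contained model rather than against the real test fixtures. `Task`, `CorruptedException`, and the plain lists standing in for the state updater and the `TasksRegistry` are hypothetical simplifications, not Kafka Streams classes. The step that merges all corrupted task IDs into one exception after the loop is an assumption about how the truncated `addTasksToStateUpdater()` hunk continues (the test's expectation that `corruptedTasks()` contains both IDs suggests it).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AddTasksToStateUpdaterSketch {

    // Hypothetical stand-in for a task; not the Kafka Streams Task interface.
    static final class Task {
        final String id;
        final boolean corrupted;

        Task(final String id, final boolean corrupted) {
            this.id = id;
            this.corrupted = corrupted;
        }

        // Simulates initialization that fails for corrupted tasks.
        void initializeIfNeeded() {
            if (corrupted) {
                throw new CorruptedException(Set.of(id));
            }
        }
    }

    // Hypothetical stand-in for TaskCorruptedException: carries only task IDs.
    static final class CorruptedException extends RuntimeException {
        final Set<String> corruptedTaskIds;

        CorruptedException(final Set<String> corruptedTaskIds) {
            super("Tasks " + corruptedTaskIds + " are corrupted and hence need to be re-initialized");
            this.corruptedTaskIds = corruptedTaskIds;
        }
    }

    // Plain lists stand in for the state updater and the TasksRegistry.
    static void addTasksToStateUpdater(final List<Task> pending,
                                       final List<String> stateUpdater,
                                       final List<String> registry) {
        final Map<String, RuntimeException> taskExceptions = new LinkedHashMap<>();
        for (final Task task : pending) {
            try {
                task.initializeIfNeeded();
                stateUpdater.add(task.id);
            } catch (final RuntimeException e) {
                // Put the failed task back into bookkeeping for the stream thread.
                registry.add(task.id);
                taskExceptions.put(task.id, e);
            }
        }
        if (taskExceptions.isEmpty()) {
            return;
        }
        // Assumption: merge all corrupted IDs into one exception; rethrow anything else as-is.
        final Set<String> corruptedIds = new LinkedHashSet<>();
        for (final RuntimeException e : taskExceptions.values()) {
            if (!(e instanceof CorruptedException)) {
                throw e;
            }
            corruptedIds.addAll(((CorruptedException) e).corruptedTaskIds);
        }
        throw new CorruptedException(corruptedIds);
    }

    public static void main(final String[] args) {
        final List<String> stateUpdater = new ArrayList<>();
        final List<String> registry = new ArrayList<>();
        try {
            addTasksToStateUpdater(
                List.of(new Task("0_0", true), new Task("0_1", true), new Task("0_2", false)),
                stateUpdater, registry);
        } catch (final CorruptedException e) {
            System.out.println(e.getMessage());
        }
        // The healthy task 0_2 reached the state updater despite the failures.
        System.out.println("state updater: " + stateUpdater + ", registry: " + registry);
    }
}
```

   With two corrupted tasks and one healthy task, the healthy task ends up in the state updater list while both corrupted IDs are reported in a single exception, which is the property the additional test would pin down.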



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task,
     }
 
     private void addTasksToStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
         for (final Task task : tasks.drainPendingTaskToInit()) {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            try {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+            } catch (final RuntimeException e) {
+                // need to add task back to the bookkeeping to be handled by the stream thread
+                tasks.addTask(task);

Review Comment:
   I am wondering whether it would be better to add the entire task to the `TaskCorruptedException` instead of only the task ID. That way, we would not need to add a corrupted task back to the `TasksRegistry`. Is there a check somewhere that prevents corrupted tasks from being executed? Before the state updater, we did not execute any tasks until all tasks were restored, but that is no longer the case with the state updater.
   Modifying the `TaskCorruptedException` would need a KIP. Instead of creating a KIP for modifying the `TaskCorruptedException`, we should create one to move `TaskCorruptedException` and `TaskMigratedException` to internal packages, since they are handled entirely within Streams and should never be handled by the user.
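   A rough sketch of the alternative raised here, assuming a hypothetical internal exception (`InternalTaskCorruptedException`, not an existing Kafka class) that carries the corrupted `Task` objects instead of only their IDs. The `Task` type and `handleCorruption()` are likewise hypothetical stand-ins; the point is only that the handler can act on the tasks it receives without looking them up in, or re-adding them to, a registry.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class CorruptedTasksSketch {

    // Hypothetical minimal task; not the Kafka Streams Task interface.
    static final class Task {
        final String id;
        boolean reinitialized = false;

        Task(final String id) {
            this.id = id;
        }
    }

    // Hypothetical internal exception carrying the corrupted Task objects themselves.
    static final class InternalTaskCorruptedException extends RuntimeException {
        private final Set<Task> corruptedTasks;

        InternalTaskCorruptedException(final Set<Task> corruptedTasks) {
            super("Tasks " + ids(corruptedTasks) + " are corrupted and hence need to be re-initialized");
            this.corruptedTasks = Collections.unmodifiableSet(new LinkedHashSet<>(corruptedTasks));
        }

        Set<Task> corruptedTasks() {
            return corruptedTasks;
        }

        // IDs remain derivable for logging or ID-based handling.
        Set<String> corruptedTaskIds() {
            return ids(corruptedTasks);
        }

        private static Set<String> ids(final Set<Task> tasks) {
            return tasks.stream()
                .map(t -> t.id)
                .collect(Collectors.toCollection(LinkedHashSet::new));
        }
    }

    // Acts directly on the tasks from the exception; no registry lookup by ID.
    // Setting the flag is a stand-in for whatever re-initialization the stream thread performs.
    static void handleCorruption(final InternalTaskCorruptedException e) {
        for (final Task task : e.corruptedTasks()) {
            task.reinitialized = true;
        }
    }

    public static void main(final String[] args) {
        final Task t0 = new Task("0_0");
        final Task t1 = new Task("0_1");
        final Set<Task> failed = new LinkedHashSet<>();
        failed.add(t0);
        failed.add(t1);
        try {
            throw new InternalTaskCorruptedException(failed);
        } catch (final InternalTaskCorruptedException e) {
            handleCorruption(e);
            System.out.println(e.getMessage());
        }
        System.out.println("re-initialized: " + t0.reinitialized + ", " + t1.reinitialized);
    }
}
```

   Because the corrupted IDs can still be derived from the tasks, existing ID-based handling or logging would keep working in this sketch.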



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.