cadonna commented on code in PR #12771:
URL: https://github.com/apache/kafka/pull/12771#discussion_r1001636845

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1435,6 +1435,31 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());
     }
 
+    @Test
+    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId01Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
+        when(tasks.drainPendingTaskToInit()).thenReturn(mkSet(statefulTask0, statefulTask1));
+        doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
+        doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
+
+        final TaskCorruptedException thrown = assertThrows(
+            TaskCorruptedException.class,
+            () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
+        );
+
+        Mockito.verify(tasks).addTask(statefulTask0);
+        Mockito.verify(tasks).addTask(statefulTask1);
+        assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
+        assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());

Review Comment:
nit:
```suggestion
        assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
```


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1435,6 +1435,31 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());
     }
 
+    @Test
+    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId01Partitions).build();

Review Comment:
Could you please also add a stateful task that does not throw and verify that the task is added to the state updater? That would confirm that, when at least one task throws an exception, the good tasks are still added to the state updater.


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task,
     }
 
     private void addTasksToStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
         for (final Task task : tasks.drainPendingTaskToInit()) {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            try {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+            } catch (final RuntimeException e) {
+                // need to add task back to the bookkeeping to be handled by the stream thread
+                tasks.addTask(task);

Review Comment:
I am wondering if it wouldn't be better to add the entire task to the `TaskCorruptedException` instead of only the task ID. That way, we wouldn't need to add a corrupted task back to the `TasksRegistry`. Is there a check somewhere that prevents corrupted tasks from being executed? Before the state updater was introduced, we did not execute any task until all tasks were restored, but that is no longer the case with the state updater.

Modifying the `TaskCorruptedException` would need a KIP.
Instead of creating a KIP for modifying the `TaskCorruptedException`, we should create one to move `TaskCorruptedException` and `TaskMigratedException` to internal packages since they are completely handled within Streams and they should never be handled by the user.
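
For illustration, the behaviour that the second review comment asks the test to cover (one non-throwing task alongside the throwing ones) can be sketched without any Streams classes. Everything below is a hypothetical stand-in: `SketchTask`, `InitFailedException`, and the two plain lists that play the roles of the state updater and the `TasksRegistry` are not part of Kafka. The step after the loop (collecting the failures and rethrowing once) is also an assumption, since the quoted diff ends inside the `catch` block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AddTasksSketch {

    // Hypothetical stand-in for a task; NOT the real Streams Task interface.
    static final class SketchTask {
        final String id;
        final boolean failsOnInit;

        SketchTask(final String id, final boolean failsOnInit) {
            this.id = id;
            this.failsOnInit = failsOnInit;
        }

        void initializeIfNeeded() {
            if (failsOnInit) {
                throw new IllegalStateException("Task " + id + " is corrupted");
            }
        }
    }

    // Hypothetical aggregate exception that carries the per-task failures.
    static final class InitFailedException extends RuntimeException {
        final Map<String, RuntimeException> failures;

        InitFailedException(final Map<String, RuntimeException> failures) {
            super("Tasks " + failures.keySet() + " failed to initialize");
            this.failures = failures;
        }
    }

    // Mirrors the shape of the loop in the diff: try each pending task,
    // hand failed tasks back to the registry, keep adding the good ones.
    static void addTasksToStateUpdater(final List<SketchTask> pending,
                                       final List<SketchTask> stateUpdater,
                                       final List<SketchTask> registry) {
        final Map<String, RuntimeException> taskExceptions = new LinkedHashMap<>();
        for (final SketchTask task : pending) {
            try {
                task.initializeIfNeeded();
                stateUpdater.add(task);
            } catch (final RuntimeException e) {
                registry.add(task);
                taskExceptions.put(task.id, e);
            }
        }
        // Assumption: failures are surfaced only after every pending task was tried.
        if (!taskExceptions.isEmpty()) {
            throw new InitFailedException(taskExceptions);
        }
    }

    public static void main(final String[] args) {
        final List<SketchTask> stateUpdater = new ArrayList<>();
        final List<SketchTask> registry = new ArrayList<>();
        final List<SketchTask> pending = Arrays.asList(
            new SketchTask("0_0", true),
            new SketchTask("0_1", true),
            new SketchTask("0_2", false));
        try {
            addTasksToStateUpdater(pending, stateUpdater, registry);
        } catch (final InitFailedException e) {
            System.out.println("failed: " + e.failures.keySet());
        }
        System.out.println("tasks in state updater: " + stateUpdater.size());
        System.out.println("tasks back in registry: " + registry.size());
    }
}
```

In the real test, the equivalent check would presumably be a Mockito verification that the state updater received the non-throwing task, next to the existing `Mockito.verify(tasks).addTask(...)` calls for the throwing ones.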