cadonna commented on code in PR #12312: URL: https://github.com/apache/kafka/pull/12312#discussion_r904948682
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -161,41 +159,62 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru
         final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
         final Set<Task> corruptedTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTaskIds) {
-            final Task corruptedTask = updatingTasks.remove(taskId);
+            final Task corruptedTask = updatingTasks.get(taskId);
             if (corruptedTask == null) {
                 throw new IllegalStateException("Task " + taskId + " is corrupted but is not updating. " + BUG_ERROR_MESSAGE);
             }
             corruptedTasks.add(corruptedTask);
         }
-        exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException));
+        addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(corruptedTasks, taskCorruptedException));
     }

     private void handleStreamsException(final StreamsException streamsException) {
         log.info("Encountered streams exception: ", streamsException);
-        final ExceptionAndTasks exceptionAndTasks;
         if (streamsException.taskId().isPresent()) {
-            exceptionAndTasks = handleStreamsExceptionWithTask(streamsException);
+            handleStreamsExceptionWithTask(streamsException);
         } else {
-            exceptionAndTasks = handleStreamsExceptionWithoutTask(streamsException);
+            handleStreamsExceptionWithoutTask(streamsException);
         }
-        exceptionsAndFailedTasks.add(exceptionAndTasks);
     }

-    private ExceptionAndTasks handleStreamsExceptionWithTask(final StreamsException streamsException) {
+    private void handleStreamsExceptionWithTask(final StreamsException streamsException) {
         final TaskId failedTaskId = streamsException.taskId().get();
         if (!updatingTasks.containsKey(failedTaskId)) {
             throw new IllegalStateException("Task " + failedTaskId + " failed but is not updating. " + BUG_ERROR_MESSAGE);
         }
         final Set<Task> failedTask = new HashSet<>();
         failedTask.add(updatingTasks.get(failedTaskId));
-        updatingTasks.remove(failedTaskId);
-        return new ExceptionAndTasks(failedTask, streamsException);
+        addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(failedTask, streamsException));
+    }
+
+    private void handleStreamsExceptionWithoutTask(final StreamsException streamsException) {
+        addToExceptionsAndFailedTasksThenClearUpdatingTasks(
+            new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException));
     }

-    private ExceptionAndTasks handleStreamsExceptionWithoutTask(final StreamsException streamsException) {
-        final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException);
+    // It is important to remove the corrupted tasks from the updating tasks after they were added to the
+    // failed tasks.
+    // This ensures that all tasks are found in DefaultStateUpdater#getTasks().
+    private void addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) {
+        addToExceptionsAndFailedTasks(exceptionAndTasks);
+        exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove);
+        if (onlyStandbyTasksLeft()) {

Review Comment:
Thanks! How could I miss that?!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org