cadonna commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1933528869


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {

Review Comment:
   Sorry for being late here!
   
   We need to be careful about clean-closing all `StreamsException`s, because 
`TaskCorruptedException` and `TaskMigratedException` are also 
`StreamsException`s. But you probably know that. It would be interesting to 
know which `StreamsException`s @eduwercamacaro encountered that do not require 
a dirty close. Above you wrote "timeouts and generic Kafka client exceptions", 
but it would be good to document them.
   
   We absolutely need to close the task dirty on an `IllegalStateException`, 
because it signals a state that should never be reached and that is caused by 
an unknown bug. Since the bug is unknown, we cannot decide whether the task can 
be closed cleanly. Thus, we need to close the task dirty.
   
   The same applies to `InterruptedException`. A stream thread and the state 
updater thread should never be interrupted. We own both threads and we never 
interrupt them. Thus, it is a kind of illegal state if an 
`InterruptedException` is thrown. In this case, too, the task should be closed 
dirty.
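   
   To make the rule above concrete, here is a minimal sketch of a "dirty unless 
explicitly allowed" check. It does not use the real Kafka classes: the nested 
exception types are stand-ins that only mirror the hierarchy, and 
`HypotheticalTimeoutStreamsException`, `CLEAN_CLOSABLE`, and 
`requiresDirtyClose()` are names invented for illustration. Which exceptions 
actually belong on the allowlist is exactly what still needs to be documented.

```java
import java.util.Set;

public class CloseDecisionSketch {

    // Stand-ins for the Kafka Streams types; only the class hierarchy matters here.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) { super(message); }
        StreamsException(final String message, final Throwable cause) { super(message, cause); }
    }

    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(final String message) { super(message); }
    }

    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(final String message) { super(message); }
    }

    // Hypothetical example of an exception that might be safe to close clean.
    static class HypotheticalTimeoutStreamsException extends StreamsException {
        HypotheticalTimeoutStreamsException(final String message) { super(message); }
        HypotheticalTimeoutStreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Assumption: an explicit allowlist of exact types that permit a clean close.
    static final Set<Class<? extends RuntimeException>> CLEAN_CLOSABLE =
        Set.of(HypotheticalTimeoutStreamsException.class);

    static boolean requiresDirtyClose(final RuntimeException exception) {
        // An illegal state or an interrupt anywhere in the cause chain forces a dirty close.
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof IllegalStateException || t instanceof InterruptedException) {
                return true;
            }
        }
        // Everything else is dirty by default, including TaskCorruptedException,
        // TaskMigratedException, and a plain StreamsException.
        return !CLEAN_CLOSABLE.contains(exception.getClass());
    }

    public static void main(final String[] args) {
        System.out.println(requiresDirtyClose(new IllegalStateException("unknown bug")));
        System.out.println(requiresDirtyClose(new TaskCorruptedException("corrupted")));
        System.out.println(requiresDirtyClose(new HypotheticalTimeoutStreamsException("timeout")));
    }
}
```

   The allowlist matches exact classes on purpose, so a new subclass of 
`StreamsException` is closed dirty until somebody decides otherwise.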
   
   I like the idea of the `CloseCleanStreamsException` and the 
`CloseDirtyStreamsException` (although we should find better names 🙂). Those 
two classes would make the error handling clearer.
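   
   A rough sketch of how the two marker classes could simplify the routing, 
under assumptions: `StreamsException` is a stand-in here rather than the Kafka 
class, `RemovedTask` is a simplified placeholder for the removed-task result 
with a string task ID, and `route()` is a hypothetical helper. The class names 
are the placeholders from this thread, not a final proposal.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

public class MarkerExceptionSketch {

    // Stand-in for the Kafka Streams base exception.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) { super(message); }
    }

    // Marker: the thrower asserts that the task state is still consistent.
    static class CloseCleanStreamsException extends StreamsException {
        CloseCleanStreamsException(final String message) { super(message); }
    }

    // Marker: the thrower asserts that the task must be closed dirty.
    static class CloseDirtyStreamsException extends StreamsException {
        CloseDirtyStreamsException(final String message) { super(message); }
    }

    // Simplified placeholder for a removed task and the exception it failed with, if any.
    static final class RemovedTask {
        final String taskId;
        final Optional<RuntimeException> exception;

        RemovedTask(final String taskId, final Optional<RuntimeException> exception) {
            this.taskId = taskId;
            this.exception = exception;
        }
    }

    static void route(final RemovedTask removed,
                      final Set<String> closeClean,
                      final Set<String> closeDirty) {
        // No exception, or an explicit clean marker: close clean.
        // Anything else, including unmarked exceptions: close dirty.
        final boolean clean = removed.exception
            .map(e -> e instanceof CloseCleanStreamsException)
            .orElse(true);
        (clean ? closeClean : closeDirty).add(removed.taskId);
    }

    public static void main(final String[] args) {
        final Set<String> closeClean = new LinkedHashSet<>();
        final Set<String> closeDirty = new LinkedHashSet<>();
        route(new RemovedTask("0_0", Optional.empty()), closeClean, closeDirty);
        route(new RemovedTask("0_1", Optional.of(new CloseCleanStreamsException("timeout"))),
              closeClean, closeDirty);
        route(new RemovedTask("0_2", Optional.of(new IllegalStateException("bug"))),
              closeClean, closeDirty);
        System.out.println("clean=" + closeClean + " dirty=" + closeDirty);
    }
}
```

   In this sketch `CloseDirtyStreamsException` is not needed for the decision 
itself, since unmarked exceptions already end up dirty. It would mainly 
document intent at the throw site.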
   
   A question for @eduwercamacaro: Do you also plan to handle the exceptions 
that do not require a dirty close in `handleExceptionsFromStateUpdater()`?  
Currently, you only consider exceptions discovered during rebalances and 
shutdown of the state updater.    


