cadonna commented on code in PR #12519:
URL: https://github.com/apache/kafka/pull/12519#discussion_r954839073


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -877,6 +877,8 @@ private void initializeAndRestorePhase() {
             // transit to restore active is idempotent so we can call it 
multiple times
             changelogReader.enforceRestoreActive();
 
+            taskManager.tryHandleExceptionsFromStateUpdater();

Review Comment:
   Could we also handle the exceptions inside `tryToCompleteRestoration()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : 
stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = 
exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled 
by the stream thread
+                    tasks.addTask(failedTask);
+                    taskExceptions.put(failedTask.id(), exception);
+                }
+            }
+
+            maybeThrowTaskExceptions(taskExceptions);

Review Comment:
   Here we basically transform a mapping from exception -> set of tasks to a 
mapping task ID -> exception and then in `maybeThrowTaskExceptions()` we again 
transform the mapping task ID -> exception to a mapping exception -> set of 
tasks. 
   
   Can we not make this a bit more straightforward by skipping the 
transformation to task ID -> exception?
    
   Is it really necessary to aggregate all `TaskCorruptedException` into a new 
one? I think by re-throwing a brand new `TaskCorruptedException` we lose the 
information where the actual exception happened in the stacktrace which might 
make debugging harder. 
   
   I see that with aggregating the exception we can revive all corrupted tasks 
all at once, but I am not sure if it is worth losing debugging information. One 
option could be to not drain the exceptions from the state updater, but handle 
one after the other. Another option could be to collect the 
`TaskCorruptedException` in the new exception and log all of them when the 
exception is caught. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : 
stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = 
exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled 
by the stream thread
+                    tasks.addTask(failedTask);

Review Comment:
   Method `addTask()` does not add the partitions to the 
`activeTasksPerPartition` map in `Tasks`. We need to extend `addTask()` and 
consolidate it with `addNewActiveTask()`. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -227,34 +227,35 @@ boolean handleCorruption(final Set<TaskId> 
corruptedTasks) {
 
     private void closeDirtyAndRevive(final Collection<Task> 
taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
-            final Collection<TopicPartition> corruptedPartitions = 
task.changelogPartitions();
+            if (task.state() != State.CLOSED) {
+                final Collection<TopicPartition> corruptedPartitions = 
task.changelogPartitions();
 
-            // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
-            if (markAsCorrupted) {
-                task.markChangelogAsCorrupted(corruptedPartitions);
-            }
+                // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
+                if (markAsCorrupted) {
+                    task.markChangelogAsCorrupted(corruptedPartitions);
+                }
 
-            try {
-                // we do not need to take the returned offsets since we are 
not going to commit anyways;
-                // this call is only used for active tasks to flush the cache 
before suspending and
-                // closing the topology
-                task.prepareCommit();
-            } catch (final RuntimeException swallow) {
-                log.error("Error flushing cache for corrupted task {} ", 
task.id(), swallow);
-            }
+                try {
+                    // we do not need to take the returned offsets since we 
are not going to commit anyways;
+                    // this call is only used for active tasks to flush the 
cache before suspending and
+                    // closing the topology
+                    task.prepareCommit();
+                } catch (final RuntimeException swallow) {
+                    log.error("Error flushing cache for corrupted task {} ", 
task.id(), swallow);
+                }
 
-            try {
-                task.suspend();
+                try {
+                    task.suspend();
 
-                // we need to enforce a checkpoint that removes the corrupted 
partitions
-                if (markAsCorrupted) {
-                    task.postCommit(true);
+                    // we need to enforce a checkpoint that removes the 
corrupted partitions
+                    if (markAsCorrupted) {
+                        task.postCommit(true);

Review Comment:
   Aren't we removing the corrupted partitions twice if we use the state 
updater?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to