guozhangwang commented on code in PR #12569:
URL: https://github.com/apache/kafka/pull/12569#discussion_r958716921


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1180,6 +1182,68 @@ void shutdown(final boolean clean) {
         }
     }
 
+    private void shutdownStateUpdater() {
+        if (stateUpdater != null) {
+            stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));

Review Comment:
   This is a meta thought: I'm not sure passing in Long.MAX_VALUE (effectively 
an unbounded wait) is the best choice. When we shut down a stream thread, the 
thread waiting for the stream threads to join already uses a timeout, so maybe 
we should propagate that timeout from StreamThread.completeShutdown -> 
TaskManager -> StateUpdater (and similarly to all other modules of a stream 
thread that we are shutting down).
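A minimal sketch of the timeout propagation suggested here, assuming a hypothetical `ShutdownComponent` interface (not part of Kafka Streams) whose `shutdown(Duration)` plays the role of the `stateUpdater.shutdown(Duration)` call in the diff. The caller fixes one deadline up front and hands each component only the time that is still left, so no single step can consume an unbounded wait.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka Streams code: one shutdown budget shared by all components.
public class DeadlineShutdownSketch {

    // Hypothetical stand-in for anything with a timed shutdown, e.g. a state updater.
    public interface ShutdownComponent {
        void shutdown(Duration timeout);
    }

    // Computes one deadline up front and gives each component only the time that
    // is still left, instead of letting any single step wait without bound.
    public static void shutdownAll(final List<ShutdownComponent> components, final Duration totalTimeout) {
        final long deadlineNanos = System.nanoTime() + totalTimeout.toNanos();
        for (final ShutdownComponent component : components) {
            // Subtracting nanoTime values (rather than comparing them) is overflow-safe.
            final long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            component.shutdown(Duration.ofNanos(remainingNanos));
        }
    }

    public static void main(final String[] args) {
        // Records the timeout each component was granted.
        final List<Duration> granted = new ArrayList<>();
        final ShutdownComponent recorder = granted::add;
        shutdownAll(List.of(recorder, recorder), Duration.ofSeconds(30));
        granted.forEach(timeout -> System.out.println("granted " + timeout.toMillis() + " ms"));
    }
}
```

One caveat for this approach: `Duration.toNanos()` throws `ArithmeticException` for a value as large as `Duration.ofMillis(Long.MAX_VALUE)`, so a real implementation would need to clamp very large timeouts before converting them.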



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1180,6 +1182,68 @@ void shutdown(final boolean clean) {
         }
     }
 
+    private void shutdownStateUpdater() {
+        if (stateUpdater != null) {
+            stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+            closeFailedTasks();
+            addRestoredTasksToTaskRegistry();
+            addRemovedTasksToTaskRegistry();
+        }
+    }
+
+    private void closeFailedTasks() {
+        final Set<Task> tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream()
+            .flatMap(exAndTasks -> exAndTasks.getTasks().stream()).collect(Collectors.toSet());
+
+        for (final Task task : tasksToCloseDirty) {
+            try {
+                // we call this function only to flush the cache if necessary
+                // before suspending and closing the topology
+                task.prepareCommit();
+            } catch (final RuntimeException swallow) {
+                log.error("Error flushing caches of dirty task {}", task.id(), swallow);
+            }
+
+            try {
+                task.suspend();
+            } catch (final RuntimeException swallow) {
+                log.error("Error suspending dirty task {}: {}", task.id(), swallow.getMessage());
+            }
+
+            task.closeDirty();
+
+            try {
+                if (task.isActive()) {
+                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+                }
+            } catch (final RuntimeException swallow) {
+                log.error("Error closing dirty task {}: {}", task.id(), swallow.getMessage());
+            }
+        }
+    }
+
+    private void addRestoredTasksToTaskRegistry() {

Review Comment:
   Again, here I'm assuming we are going to merge the two lists, and hence their 
handling logic, later, so I did not focus too much on it.
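The two lists referred to here are presumably the restored and removed tasks handled by `addRestoredTasksToTaskRegistry()` and `addRemovedTasksToTaskRegistry()`. A minimal sketch of what a merged handling path could look like, assuming both lists are simply drained and then passed through one shared handler; the class and method names are hypothetical and the generic `T` stands in for the real `Task` type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Kafka Streams code: one code path for both drained task lists.
public class MergedTaskHandlingSketch {

    // Concatenates the restored and removed task lists (restored first) and hands
    // every task to a single handler, so the registration logic is written once.
    public static <T> List<T> mergeAndHandle(final List<T> restoredTasks,
                                             final List<T> removedTasks,
                                             final Consumer<? super T> handler) {
        final List<T> merged = new ArrayList<>(restoredTasks.size() + removedTasks.size());
        merged.addAll(restoredTasks);
        merged.addAll(removedTasks);
        merged.forEach(handler);
        return merged;
    }
}
```

Whether restored and removed tasks can really share one handler depends on how the real registry treats them; the sketch only shows the shape of the merge, not that decision.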



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1180,6 +1182,68 @@ void shutdown(final boolean clean) {
         }
     }
 
+    private void shutdownStateUpdater() {
+        if (stateUpdater != null) {
+            stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+            closeFailedTasks();
+            addRestoredTasksToTaskRegistry();
+            addRemovedTasksToTaskRegistry();
+        }
+    }
+
+    private void closeFailedTasks() {

Review Comment:
   nit: closeFailedTasksFromStateUpdater?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1180,6 +1182,68 @@ void shutdown(final boolean clean) {
         }
     }
 
+    private void shutdownStateUpdater() {
+        if (stateUpdater != null) {
+            stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
+            closeFailedTasks();
+            addRestoredTasksToTaskRegistry();
+            addRemovedTasksToTaskRegistry();
+        }
+    }
+
+    private void closeFailedTasks() {

Review Comment:
   Also I have some thoughts about error/failed tasks handling in general, but 
let's table it for now and discuss after we've done the exception handling in 
https://github.com/apache/kafka/pull/12519.
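For reference while that discussion is pending: `closeFailedTasks()` in the diff first flattens every drained exception entry into a de-duplicated set of tasks, then closes each task dirty on a best-effort basis, where a failure in one step (flushing, suspending) is logged and does not prevent the following steps. A standalone sketch of that pattern, using hypothetical `DirtyClosable` and `ExceptionAndTasks` stand-ins rather than the real `Task` and state-updater types, and collecting error messages into a list instead of logging them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not Kafka Streams code: best-effort dirty close of failed tasks.
public class BestEffortCloseSketch {

    // Hypothetical minimal task interface; not the Kafka Streams Task API.
    public interface DirtyClosable {
        String id();
        void prepareCommit();
        void suspend();
        void closeDirty();
    }

    // Hypothetical pairing of one exception with the tasks it affected.
    public static final class ExceptionAndTasks {
        final RuntimeException exception;
        final Set<DirtyClosable> tasks;

        public ExceptionAndTasks(final RuntimeException exception, final Set<DirtyClosable> tasks) {
            this.exception = exception;
            this.tasks = tasks;
        }
    }

    // Runs one cleanup step; a failure is recorded instead of aborting the remaining steps.
    private static void bestEffort(final Runnable step, final String what,
                                   final DirtyClosable task, final List<String> errors) {
        try {
            step.run();
        } catch (final RuntimeException swallow) {
            errors.add("Error " + what + " dirty task " + task.id() + ": " + swallow.getMessage());
        }
    }

    // Flattens all failure entries into a set (a task listed twice is closed once),
    // then closes each task dirty and returns the collected error messages.
    public static List<String> closeFailedTasks(final List<ExceptionAndTasks> failures) {
        final Set<DirtyClosable> tasksToCloseDirty = failures.stream()
            .flatMap(exAndTasks -> exAndTasks.tasks.stream())
            .collect(Collectors.toSet());
        final List<String> errors = new ArrayList<>();
        for (final DirtyClosable task : tasksToCloseDirty) {
            bestEffort(task::prepareCommit, "flushing caches of", task, errors);
            bestEffort(task::suspend, "suspending", task, errors);
            // Not wrapped, mirroring the diff, where closeDirty() is called outside any try block.
            task.closeDirty();
        }
        return errors;
    }
}
```

Collecting into a `Set` matters because one task can appear under several exception entries; without de-duplication it would be closed twice.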



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.