guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466107280



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first write a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here, but just checking whether we need to write the checkpoint file:
+                // 1) for active tasks, prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all, so postCommit would not write the checkpoint again.
+                // 2) for standby tasks, prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it failed during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before committing (checkpointing) it;
+                    // since standby tasks do not actually need to commit offsets but only need to
+                    // flush / checkpoint state stores, we only need to call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of clean).
+                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {

Review comment:
       I've thought about that, but the tricky thing is that I'm iterating over the 
union of the two lists. I can remove the tasksToCloseDirty tasks from the other two 
lists after the iteration to make it cleaner.
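
       A minimal sketch of that cleanup, for illustration only. It uses plain 
strings in place of Task objects, and the class name DirtyTaskCleanupSketch and 
the helper removeDirtyTasks are hypothetical, not code from this PR. The idea is 
to iterate over a copy of the union, collect the dirty tasks, and only then 
filter them out of the two source lists, so the later close loop no longer needs 
the contains() check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; strings stand in for Task objects.
class DirtyTaskCleanupSketch {

    // Remove every task that was marked dirty from the clean-close and recycle lists.
    static <T> void removeDirtyTasks(final List<T> tasksToCloseClean,
                                     final List<T> tasksToRecycle,
                                     final List<T> tasksToCloseDirty) {
        tasksToCloseClean.removeAll(tasksToCloseDirty);
        tasksToRecycle.removeAll(tasksToCloseDirty);
    }

    public static void main(final String[] args) {
        final List<String> tasksToCloseClean = new ArrayList<>(Arrays.asList("0_0", "0_1"));
        final List<String> tasksToRecycle = new ArrayList<>(Arrays.asList("1_0", "1_1"));
        final List<String> tasksToCloseDirty = new ArrayList<>();

        // Iterate over a copy of the union, so the source lists are not
        // modified while they are being traversed.
        final List<String> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
        tasksToCheckpoint.addAll(tasksToRecycle);
        for (final String task : tasksToCheckpoint) {
            // Stand-in condition for "checkpointing this task failed".
            if (task.endsWith("_1")) {
                tasksToCloseDirty.add(task);
            }
        }

        // After the iteration, drop the dirty tasks from both source lists.
        removeDirtyTasks(tasksToCloseClean, tasksToRecycle, tasksToCloseDirty);

        System.out.println("close clean: " + tasksToCloseClean);
        System.out.println("recycle:     " + tasksToRecycle);
        System.out.println("close dirty: " + tasksToCloseDirty);
    }
}
```

With the lists filtered up front, the three groups are disjoint, and each later 
loop can handle its own list without consulting the others.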



