ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463395140



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +313,96 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first write a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Always try to first suspend and commit the task before checkpointing it;
+                // some tasks may already be suspended which should be a no-op.
+                //
+                // Also since active tasks should already be suspended / committed and
+                // standby tasks should have no offsets to commit, we should expect nothing to commit
+                task.suspend();
+
+                // Note that we are not actually committing here but just checking whether we need to write the checkpoint file:
+                // 1) for active tasks, prepareCommit should return empty if the task committed successfully during suspension,
+                //    and since their changelog positions should not change at all, postCommit would not write the checkpoint again.
+                // 2) for standby tasks, prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();

Review comment:
   It's still making me a bit uncomfortable that we call `prepareCommit`/`postCommit` during `handleRevocation` and `handleAssignment`. Maybe I'm being paranoid, but experience has shown it's been difficult for us to keep track of which methods need to be called when, and in what order.

   It seems like, now that we've decoupled flushing from committing, the only reason for calling `prepareCommit`/`postCommit` in `handleRevocation` is so that the record collector is flushed before offsets are committed. So what if we extract the record collector flushing into a separate `StreamTask` method that is only ever called in `TaskManager#commitOffsetsOrTransaction`? I haven't thought this all the way through, but it seems we may as well go all the way in decoupling flushing from committing and split them into separate methods. Maybe `prepareCommit` and `postCommit` have become relics of the past.
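   The suggested split can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not Kafka Streams code: `SketchTask`, `flushRecordCollector`, `committableOffset`, and `commitAllTasks` are hypothetical names invented for illustration, and `commitAllTasks` only plays the role the comment assigns to `TaskManager#commitOffsetsOrTransaction`. It shows the ordering the proposal relies on: every task's record collector is flushed at a single call site, and only afterwards are offsets collected and committed, with no `prepareCommit`/`postCommit` pair involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlushThenCommitSketch {

    // Hypothetical, heavily simplified stand-in for a StreamTask (NOT the Kafka Streams class).
    static final class SketchTask {
        final String id;
        private final List<String> unflushedSends = new ArrayList<>();
        private long nextOffsetToConsume = 0L;

        SketchTask(final String id) {
            this.id = id;
        }

        // Processing buffers output in the simulated record collector and advances the input position.
        void process(final String record, final long offset) {
            unflushedSends.add(record);
            nextOffsetToConsume = offset + 1;
        }

        boolean hasUnflushedSends() {
            return !unflushedSends.isEmpty();
        }

        // Hypothetical flush-only method: drains the collector and commits nothing.
        void flushRecordCollector(final List<String> events) {
            events.add("flush " + id + " records=" + unflushedSends.size());
            unflushedSends.clear();
        }

        // Hypothetical commit-side accessor: reports the offset to commit and performs no flushing.
        long committableOffset() {
            return nextOffsetToConsume;
        }
    }

    // Hypothetical single call site (standing in for TaskManager#commitOffsetsOrTransaction):
    // flush every task first, then commit offsets for all of them.
    static Map<String, Long> commitAllTasks(final List<SketchTask> tasks, final List<String> events) {
        for (final SketchTask task : tasks) {
            task.flushRecordCollector(events);
        }
        final Map<String, Long> offsets = new LinkedHashMap<>();
        for (final SketchTask task : tasks) {
            // Guard that makes the flush-before-commit invariant explicit.
            if (task.hasUnflushedSends()) {
                throw new IllegalStateException("Task " + task.id + " must be flushed before committing");
            }
            offsets.put(task.id, task.committableOffset());
        }
        events.add("commit " + offsets);
        return offsets;
    }

    public static void main(final String[] args) {
        final SketchTask t1 = new SketchTask("0_0");
        final SketchTask t2 = new SketchTask("0_1");
        t1.process("a", 41L);
        t2.process("b", 7L);
        t2.process("c", 8L);

        final List<String> events = new ArrayList<>();
        commitAllTasks(List.of(t1, t2), events);
        // Prints: flush 0_0 records=1, flush 0_1 records=2, commit {0_0=42, 0_1=9} (one per line)
        events.forEach(System.out::println);
    }
}
```

   Because flushing lives in exactly one place, callers such as `handleRevocation` would no longer need to invoke a commit-preparation step just to get the collector flushed. Whether the real revocation and assignment paths could drop their `prepareCommit`/`postCommit` calls under this split is the open question the comment raises.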



