guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r459857477



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         for (final Task task : tasksToClose) {
             try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during #handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
+                // Always try to first suspend and commit the task before closing it;
+                // some tasks may already be suspended which should be a no-op.
+                //
+                // Also since active tasks should already be suspended / committed and
+                // standby tasks should have no offsets to commit, we should expect nothing to commit
+                task.suspend();
+
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed prior to attempting to close, but it reports non-empty offsets {} to commit",

Review comment:
I thought that if we hit a `TaskMigratedException` during `handleRevocation`, it would be thrown all the way up from the consumer and the thread would handle it via `handleLostAll`, so `handleAssignment` should not be called at all.

After checking the source code, I think not all callers rethrow the exception directly; some may wrap it in a `KafkaException`. But even in that case we would just crash the client, right?
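
To spell out the three cases I have in mind, here is a rough sketch of the flow as I understand it. Everything in it is a simplified stand-in made up for illustration (`RebalanceFlowSketch`, `runRebalance`, and the two `Sketch*` exception types are hypothetical and are not the actual Kafka Streams classes), so please treat it as an assumption about the behavior rather than a description of the real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the control flow discussed above.
// These are local stand-ins, NOT the real Kafka Streams classes.
class RebalanceFlowSketch {

    // Stand-in for a generic client-level exception (assumed fatal here).
    static class SketchKafkaException extends RuntimeException {
        SketchKafkaException(final String message) {
            super(message);
        }

        SketchKafkaException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for the "task migrated" signal; a subtype of the generic exception.
    static class SketchTaskMigratedException extends SketchKafkaException {
        SketchTaskMigratedException(final String message) {
            super(message);
        }
    }

    // Records which handlers run for one rebalance.
    // migrated: revocation detects that the task has migrated.
    // wrapped:  the caller wraps the migrated exception instead of rethrowing it.
    static List<String> runRebalance(final boolean migrated, final boolean wrapped) {
        final List<String> events = new ArrayList<>();
        try {
            events.add("handleRevocation");
            if (migrated) {
                final SketchTaskMigratedException cause =
                    new SketchTaskMigratedException("task migrated during revocation");
                if (wrapped) {
                    throw new SketchKafkaException("wrapped by caller", cause);
                }
                throw cause;
            }
            // Only reached when revocation completed without an exception.
            events.add("handleAssignment");
        } catch (final SketchTaskMigratedException e) {
            // Direct throw: the thread treats all tasks as lost.
            events.add("handleLostAll");
        } catch (final SketchKafkaException e) {
            // Wrapped throw: not recognized as a migration, so the client dies.
            events.add("crash");
        }
        return events;
    }
}
```

In this model `handleAssignment` is only ever reached on the clean path: a direct throw ends in `handleLostAll` and a wrapped throw ends in a crash, which is why I would not expect the close loop to see uncommitted offsets from this scenario.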



