guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466108344



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -479,24 +512,20 @@ boolean tryToCompleteRestoration() {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);
 
-        final Set<Task> revokedTasks = new HashSet<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-
+        final Set<Task> revokedActiveTasks = new HashSet<>();
+        final Set<Task> nonRevokedActiveTasks = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
+
         for (final Task task : activeTaskIterable()) {
             if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
-                try {
-                    task.suspend();
-                    revokedTasks.add(task);
-                } catch (final RuntimeException e) {
-                    log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
-                    firstException.compareAndSet(null, new 
StreamsException("Failed to suspend " + task.id(), e));
-                }
+                // when the task input partitions are included in the revoked 
list,
+                // this is an active task and should be revoked
+                revokedActiveTasks.add(task);
+                remainingRevokedPartitions.removeAll(task.inputPartitions());
             } else if (task.commitNeeded()) {
-                additionalTasksForCommitting.add(task);
+                nonRevokedActiveTasks.add(task);

Review comment:
       Yeah I think I agree with you -- after a second thought I think this 
renaming is not very accurate. Will call it `commitNeededActiveTasks`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to