gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567826582



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2138,12 +2080,10 @@ private void reportToStatsTracker(
         }
         Map<JobVertexID, Integer> vertices =
                 tasks.values().stream()
-                        .map(ExecutionVertex::getJobVertex)
-                        .distinct()
                         .collect(
-                                toMap(
-                                        ExecutionJobVertex::getJobVertexId,
-                                        ExecutionJobVertex::getParallelism));
+                                Collectors.groupingBy(
+                                        ExecutionVertex::getJobvertexId,
+                                        Collectors.reducing(0, e -> 1, Integer::sum)));

Review comment:
       This change prepares for the case where some tasks are already finished. 
In that case `tasksToWaitFor` might not contain all the job vertices, and we 
need to compute the map based on both `tasksToWaitFor` and `finishedTasks`. 
But it would be more suitable to make this change in the next PR, so I'll 
revert it for now.
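
       To illustrate the idea, here is a minimal, self-contained sketch of 
counting tasks per job vertex across both collections. It is not the actual 
`CheckpointCoordinator` code: the `Task` class, the `String` vertex ids and 
the `countTasksPerVertex` helper are hypothetical stand-ins for 
`ExecutionVertex`, `JobVertexID` and whatever the follow-up PR ends up doing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class VertexTaskCounts {

    /** Hypothetical stand-in for ExecutionVertex; only the owning job vertex id matters here. */
    static final class Task {
        private final String jobVertexId;
        private final int subtaskIndex;

        Task(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = jobVertexId;
            this.subtaskIndex = subtaskIndex;
        }

        String getJobVertexId() {
            return jobVertexId;
        }

        int getSubtaskIndex() {
            return subtaskIndex;
        }
    }

    /**
     * Counts tasks per job vertex over both running and finished tasks.
     * Assumption: the two lists are disjoint, so concatenating them does not
     * count any task twice.
     */
    static Map<String, Integer> countTasksPerVertex(
            List<Task> tasksToWaitFor, List<Task> finishedTasks) {
        return Stream.concat(tasksToWaitFor.stream(), finishedTasks.stream())
                .collect(
                        Collectors.groupingBy(
                                Task::getJobVertexId,
                                // Map every task to 1 and sum the ones per vertex.
                                Collectors.reducing(0, e -> 1, Integer::sum)));
    }

    public static void main(String[] args) {
        List<Task> tasksToWaitFor =
                Arrays.asList(new Task("source", 0), new Task("map", 0), new Task("map", 1));
        // One "source" subtask has already finished and is no longer waited for.
        List<Task> finishedTasks = Arrays.asList(new Task("source", 1));

        System.out.println(countTasksPerVertex(tasksToWaitFor, finishedTasks));
    }
}
```

       Counting over `tasksToWaitFor` alone would report a single task for 
"source" in this example, which is why both collections are needed once 
finished tasks are supported.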



