gaoyunhaii commented on a change in pull request #14734: URL: https://github.com/apache/flink/pull/14734#discussion_r567826582
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -2138,12 +2080,10 @@ private void reportToStatsTracker( } Map<JobVertexID, Integer> vertices = tasks.values().stream() - .map(ExecutionVertex::getJobVertex) - .distinct() .collect( - toMap( - ExecutionJobVertex::getJobVertexId, - ExecutionJobVertex::getParallelism)); + Collectors.groupingBy( + ExecutionVertex::getJobvertexId, + Collectors.reducing(0, e -> 1, Integer::sum))); Review comment: This change is doing a preparation for the case that are some tasks are finished. In this case the `tasksToWaitFor` might not contains all the job vertices and we need to compute based on both `tasksToWaitFor` and `finishedTasks`. But it should be more suitable to be changed together in the next PR, so I'll revert it first. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org