aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567793944
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -1935,6 +1935,7 @@ private void abortPendingCheckpoint(
             }
         } finally {
             sendAbortedMessages(
+                    pendingCheckpoint.getTasksToCommitTo(),

Review comment:
Why don't we need to send it to the `tasksToWaitFor`? Or is it that `tasksToCommitTo` is a strict superset of `tasksToWaitFor`?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java ##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responds to queries for the current attempt of each task and provides the mapping from an
+ * execution attempt id to its task.
+ */
+public class ExecutionAttemptMappingProvider {
+
+    private final List<ExecutionVertex> tasks;

Review comment:
This works because the list of `ExecutionVertex` is currently static in the `ExecutionGraph` for the lifetime of the execution graph, I hope. It might make sense to turn this into an interface that `ExecutionGraph` implements to allow querying this information. It's just an idea, so please feel free to ignore it for now.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                     createPendingCheckpoint(
                             timestamp,
                             request.props,
-                            ackTasks,
+                            checkpointBrief,

Review comment:
I think both are good points; whatever we decide, we could also change it in a later refactoring, right?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {

                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
Why is it the number of all tasks and not just the `tasksToWaitFor`? I have the feeling that the code before was not correct when the number of tasks in the different states can change, right?
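For readers unfamiliar with the pattern in the hunk above: overriding `removeEldestEntry` turns a `LinkedHashMap` into a bounded, insertion-ordered cache that evicts its oldest entry once the bound is exceeded. A minimal self-contained sketch of that pattern, with a made-up capacity standing in for `attemptMappingProvider.getNumberOfTasks()`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCacheSketch {
    public static void main(String[] args) {
        // Hypothetical bound; in the PR this is attemptMappingProvider.getNumberOfTasks().
        final int capacity = 3;
        Map<String, String> cache =
                new LinkedHashMap<String, String>(capacity) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                        // Evict the oldest (first-inserted) entry once the map grows past the bound.
                        return size() > capacity;
                    }
                };
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3");
        cache.put("d", "4"); // exceeds the bound, so "a" is evicted
        System.out.println(cache.keySet()); // [b, c, d]
    }
}
```

Note that the eviction check runs against the bound on every `put`, which is why the reviewer's question matters: if the bound itself can change over the job's lifetime, the cache size tracks the new value only lazily, as entries are inserted.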
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -2138,12 +2080,10 @@ private void reportToStatsTracker(
         }
         Map<JobVertexID, Integer> vertices =
                 tasks.values().stream()
-                        .map(ExecutionVertex::getJobVertex)
-                        .distinct()
                         .collect(
-                                toMap(
-                                        ExecutionJobVertex::getJobVertexId,
-                                        ExecutionJobVertex::getParallelism));
+                                Collectors.groupingBy(
+                                        ExecutionVertex::getJobvertexId,
+                                        Collectors.reducing(0, e -> 1, Integer::sum)));

Review comment:
I'm interested to know why this was necessary.
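For context on the collector being questioned above: the two versions compute different things. The old `toMap` over distinct job vertices reported each vertex's declared parallelism, while `Collectors.groupingBy` with `Collectors.reducing(0, e -> 1, Integer::sum)` counts how many vertices per job vertex id are actually present in `tasks` — the two diverge whenever only a subset of a vertex's subtasks participates. A standalone sketch with made-up string ids (standing in for `ExecutionVertex::getJobvertexId`) showing what the new collector computes:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class VertexCountSketch {
    public static void main(String[] args) {
        // Hypothetical data: one job-vertex id per participating task.
        List<String> taskJobVertexIds = List.of("source", "source", "map");

        // Count occurrences per id: each element maps to 1, then the 1s are summed.
        Map<String, Integer> counts =
                taskJobVertexIds.stream()
                        .collect(Collectors.groupingBy(
                                Function.identity(),
                                Collectors.reducing(0, id -> 1, Integer::sum)));

        System.out.println(counts.get("source")); // 2
        System.out.println(counts.get("map"));    // 1
    }
}
```

`Collectors.counting()` would express the same idea more directly, but it yields `Long` values; the `reducing(0, e -> 1, Integer::sum)` form keeps the map's value type as `Integer`, which may be why the PR uses it.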