azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287609890
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -566,6 +615,62 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { } } + @VisibleForTesting + CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) { + assertRunningInJobMasterMainThread(); + + return registerProducedPartitions(vertex, location, attemptId) + .thenApplyAsync(producedPartitionsCache -> { + producedPartitions = producedPartitionsCache; + return this; + }, vertex.getExecutionGraph().getJobMasterMainThreadExecutor()); + } + + @VisibleForTesting + static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> + registerProducedPartitions( + ExecutionVertex vertex, + TaskManagerLocation location, + ExecutionAttemptID attemptId) { + + ProducerShuffleDescriptor producerShuffleDescriptor = ProducerShuffleDescriptor.create( + location, attemptId); + + boolean lazyScheduling = vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment(); + + Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values(); + Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations = + new ArrayList<>(partitions.size()); + + for (IntermediateResultPartition partition : partitions) { + PartitionShuffleDescriptor partitionShuffleDescriptor = PartitionShuffleDescriptor.from( + partition, getPartitionMaxParallelism(partition)); + partitionRegistrations.add(vertex.getExecutionGraph().getShuffleMaster() + .registerPartitionWithProducer(partitionShuffleDescriptor, producerShuffleDescriptor) + .thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor( + partitionShuffleDescriptor, shuffleDescriptor, lazyScheduling))); + } + + return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> { + Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions = + new LinkedHashMap<>(partitions.size()); + rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd)); + return producedPartitions; + }); + } + + private static int getPartitionMaxParallelism(IntermediateResultPartition partition) { + // TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs! Review comment: This TODO existed before the PR, I suggest we tackle it separately. I created an issue for this https://issues.apache.org/jira/browse/FLINK-12628. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services