zhuzhurk commented on a change in pull request #13028: URL: https://github.com/apache/flink/pull/13028#discussion_r464215530
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for {@link MergingSharedSlotProfileRetriever}. 
+ */ +class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory { + private final PreferredLocationsRetriever preferredLocationsRetriever; + + private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever; + + private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever; + + MergingSharedSlotProfileRetrieverFactory( + PreferredLocationsRetriever preferredLocationsRetriever, + Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever, + Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) { Review comment: `prioAllocationIdRetriever` -> `priorAllocationIdRetriever` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for {@link MergingSharedSlotProfileRetriever}. + */ +class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory { + private final PreferredLocationsRetriever preferredLocationsRetriever; + + private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever; + + private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever; + + MergingSharedSlotProfileRetrieverFactory( + PreferredLocationsRetriever preferredLocationsRetriever, + Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever, + Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) { Review comment: Also applies to this factory's field with the same name. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for {@link MergingSharedSlotProfileRetriever}. 
+ */ +class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory { + private final PreferredLocationsRetriever preferredLocationsRetriever; + + private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever; + + private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever; + + MergingSharedSlotProfileRetrieverFactory( + PreferredLocationsRetriever preferredLocationsRetriever, + Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever, + Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) { + this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever); + this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever); + this.prioAllocationIdRetriever = Preconditions.checkNotNull(prioAllocationIdRetriever); + } + + @Override + public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) { + Set<AllocationID> allPriorAllocationIds = bulk + .stream() + .map(prioAllocationIdRetriever) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk); + } + + /** + * Computes a merged {@link SlotProfile} of an execution slot sharing group within a bulk to schedule. + */ + private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever { + /** + * All previous {@link AllocationID}s of the bulk to schedule. + */ + private final Set<AllocationID> allBulkPriorAllocationIds; + + /** + * All {@link ExecutionVertexID}s of the bulk. + */ + private final Set<ExecutionVertexID> producersToIgnore; + + private MergingSharedSlotProfileRetriever( + Set<AllocationID> allBulkPriorAllocationIds, + Set<ExecutionVertexID> producersToIgnore) { Review comment: Maybe add one more level of indentation to the parameters, or an empty line, to separate the method header from the body?
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link org.apache.flink.runtime.scheduler.MergingSharedSlotProfileRetrieverFactory}. + */ +public class MergingSharedSlotProfileRetrieverTest { + + private static final PreferredLocationsRetriever EMPTY_PREFERRED_LOCATIONS_RETRIEVER = + (executionVertexId, producersToIgnore) -> CompletableFuture.completedFuture(Collections.emptyList()); + + @Test + public void testGetEmptySlotProfile() throws ExecutionException, InterruptedException { + SharedSlotProfileRetriever sharedSlotProfileRetriever = new MergingSharedSlotProfileRetrieverFactory( + EMPTY_PREFERRED_LOCATIONS_RETRIEVER, + executionVertexID -> ResourceProfile.ZERO, + executionVertexID -> new AllocationID() + ).createFromBulk(Collections.emptySet()); + + SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfileFuture(new ExecutionSlotSharingGroup()).get(); + + assertThat(slotProfile.getTaskResourceProfile(), is(ResourceProfile.ZERO)); + assertThat(slotProfile.getPhysicalSlotResourceProfile(), is(ResourceProfile.ZERO)); + assertThat(slotProfile.getPreferredLocations(), containsInAnyOrder(Collections.emptyList())); Review comment: ```suggestion assertThat(slotProfile.getPreferredLocations(), hasSize(0)); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for {@link MergingSharedSlotProfileRetriever}. 
+ */ +class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory { + private final PreferredLocationsRetriever preferredLocationsRetriever; + + private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever; + + private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever; + + MergingSharedSlotProfileRetrieverFactory( + PreferredLocationsRetriever preferredLocationsRetriever, + Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever, + Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) { + this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever); + this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever); + this.prioAllocationIdRetriever = Preconditions.checkNotNull(prioAllocationIdRetriever); + } + + @Override + public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) { + Set<AllocationID> allPriorAllocationIds = bulk + .stream() + .map(prioAllocationIdRetriever) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk); + } + + /** + * Computes a merged {@link SlotProfile} of an execution slot sharing group within a bulk to schedule. + */ + private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever { + /** + * All previous {@link AllocationID}s of the bulk to schedule. + */ + private final Set<AllocationID> allBulkPriorAllocationIds; + + /** + * All {@link ExecutionVertexID}s of the bulk. 
+ */ + private final Set<ExecutionVertexID> producersToIgnore; + + private MergingSharedSlotProfileRetriever( + Set<AllocationID> allBulkPriorAllocationIds, + Set<ExecutionVertexID> producersToIgnore) { + this.allBulkPriorAllocationIds = Preconditions.checkNotNull(allBulkPriorAllocationIds); + this.producersToIgnore = Preconditions.checkNotNull(producersToIgnore); + } + + /** + * Computes a {@link SlotProfile} of an execution slot sharing group. + * + * <p>The {@link ResourceProfile} of the {@link SlotProfile} is the merged {@link ResourceProfile}s + * of all executions sharing the slot. + * + * <p>The preferred locations of the {@link SlotProfile} is a union of the preferred locations + * of all executions sharing the slot. The input locations within the bulk are ignored to avoid cyclic dependencies + * within the region, e.g. in case of all-to-all pipelined connections, so that the allocations do not block each other. + * + * <p>The preferred {@link AllocationID}s of the {@link SlotProfile} are all previous {@link AllocationID}s + * of all executions sharing the slot. + * + * <p>The {@link SlotProfile} also refers to all previous {@link AllocationID}s + * of all executions within the bulk. + * + * @param executionSlotSharingGroup executions sharing the slot. + * @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}. 
+ */ + @Override + public CompletableFuture<SlotProfile> getSlotProfileFuture(ExecutionSlotSharingGroup executionSlotSharingGroup) { + ResourceProfile totalSlotResourceProfile = ResourceProfile.ZERO; + Collection<AllocationID> priorAllocations = new HashSet<>(); + Collection<CompletableFuture<Collection<TaskManagerLocation>>> preferredLocationsPerExecution = new ArrayList<>(); + for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) { + totalSlotResourceProfile = totalSlotResourceProfile.merge(resourceProfileRetriever.apply(execution)); + priorAllocations.add(prioAllocationIdRetriever.apply(execution)); + preferredLocationsPerExecution.add(preferredLocationsRetriever + .getPreferredLocations(execution, producersToIgnore)); + } + + CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils + .combineAll(preferredLocationsPerExecution) + .thenApply(executionPreferredLocations -> + executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toSet())); Review comment: I think a list is better than a set here, because the frequency of each location is taken into account when selecting the best location in `LocationPreferenceSlotSelectionStrategy`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org