zhuzhurk commented on a change in pull request #13028:
URL: https://github.com/apache/flink/pull/13028#discussion_r464215530



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements 
SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+       private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+       private final Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever;
+
+       private final Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever;
+
+       MergingSharedSlotProfileRetrieverFactory(
+                       PreferredLocationsRetriever preferredLocationsRetriever,
+                       Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever,
+                       Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever) {

Review comment:
       `prioAllocationIdRetriever` -> `priorAllocationIdRetriever `

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements 
SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+       private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+       private final Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever;
+
+       private final Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever;
+
+       MergingSharedSlotProfileRetrieverFactory(
+                       PreferredLocationsRetriever preferredLocationsRetriever,
+                       Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever,
+                       Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever) {

Review comment:
       Also applies to the this factory field with the same name.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements 
SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+       private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+       private final Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever;
+
+       private final Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever;
+
+       MergingSharedSlotProfileRetrieverFactory(
+                       PreferredLocationsRetriever preferredLocationsRetriever,
+                       Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever,
+                       Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever) {
+               this.preferredLocationsRetriever = 
Preconditions.checkNotNull(preferredLocationsRetriever);
+               this.resourceProfileRetriever = 
Preconditions.checkNotNull(resourceProfileRetriever);
+               this.prioAllocationIdRetriever = 
Preconditions.checkNotNull(prioAllocationIdRetriever);
+       }
+
+       @Override
+       public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> 
bulk) {
+               Set<AllocationID> allPriorAllocationIds = bulk
+                       .stream()
+                       .map(prioAllocationIdRetriever)
+                       .filter(Objects::nonNull)
+                       .collect(Collectors.toSet());
+               return new 
MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+       }
+
+       /**
+        * Computes a merged {@link SlotProfile} of an execution slot sharing 
group within a bulk to schedule.
+        */
+       private class MergingSharedSlotProfileRetriever implements 
SharedSlotProfileRetriever {
+               /**
+                * All previous {@link AllocationID}s of the bulk to schedule.
+                */
+               private final Set<AllocationID> allBulkPriorAllocationIds;
+
+               /**
+                * All {@link ExecutionVertexID}s of the bulk.
+                */
+               private final Set<ExecutionVertexID> producersToIgnore;
+
+               private MergingSharedSlotProfileRetriever(
+                       Set<AllocationID> allBulkPriorAllocationIds,
+                       Set<ExecutionVertexID> producersToIgnore) {

Review comment:
       maybe one more indentation or an empty line to separate the method 
header and body?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link 
org.apache.flink.runtime.scheduler.MergingSharedSlotProfileRetrieverFactory}.
+ */
+public class MergingSharedSlotProfileRetrieverTest {
+
+       private static final PreferredLocationsRetriever 
EMPTY_PREFERRED_LOCATIONS_RETRIEVER =
+               (executionVertexId, producersToIgnore) -> 
CompletableFuture.completedFuture(Collections.emptyList());
+
+       @Test
+       public void testGetEmptySlotProfile() throws ExecutionException, 
InterruptedException {
+               SharedSlotProfileRetriever sharedSlotProfileRetriever = new 
MergingSharedSlotProfileRetrieverFactory(
+                       EMPTY_PREFERRED_LOCATIONS_RETRIEVER,
+                       executionVertexID -> ResourceProfile.ZERO,
+                       executionVertexID -> new AllocationID()
+               ).createFromBulk(Collections.emptySet());
+
+               SlotProfile slotProfile = 
sharedSlotProfileRetriever.getSlotProfileFuture(new 
ExecutionSlotSharingGroup()).get();
+
+               assertThat(slotProfile.getTaskResourceProfile(), 
is(ResourceProfile.ZERO));
+               assertThat(slotProfile.getPhysicalSlotResourceProfile(), 
is(ResourceProfile.ZERO));
+               assertThat(slotProfile.getPreferredLocations(), 
containsInAnyOrder(Collections.emptyList()));

Review comment:
       ```suggestion
                assertThat(slotProfile.getPreferredLocations(), hasSize(0));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements 
SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+       private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+       private final Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever;
+
+       private final Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever;
+
+       MergingSharedSlotProfileRetrieverFactory(
+                       PreferredLocationsRetriever preferredLocationsRetriever,
+                       Function<ExecutionVertexID, ResourceProfile> 
resourceProfileRetriever,
+                       Function<ExecutionVertexID, AllocationID> 
prioAllocationIdRetriever) {
+               this.preferredLocationsRetriever = 
Preconditions.checkNotNull(preferredLocationsRetriever);
+               this.resourceProfileRetriever = 
Preconditions.checkNotNull(resourceProfileRetriever);
+               this.prioAllocationIdRetriever = 
Preconditions.checkNotNull(prioAllocationIdRetriever);
+       }
+
+       @Override
+       public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> 
bulk) {
+               Set<AllocationID> allPriorAllocationIds = bulk
+                       .stream()
+                       .map(prioAllocationIdRetriever)
+                       .filter(Objects::nonNull)
+                       .collect(Collectors.toSet());
+               return new 
MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+       }
+
+       /**
+        * Computes a merged {@link SlotProfile} of an execution slot sharing 
group within a bulk to schedule.
+        */
+       private class MergingSharedSlotProfileRetriever implements 
SharedSlotProfileRetriever {
+               /**
+                * All previous {@link AllocationID}s of the bulk to schedule.
+                */
+               private final Set<AllocationID> allBulkPriorAllocationIds;
+
+               /**
+                * All {@link ExecutionVertexID}s of the bulk.
+                */
+               private final Set<ExecutionVertexID> producersToIgnore;
+
+               private MergingSharedSlotProfileRetriever(
+                       Set<AllocationID> allBulkPriorAllocationIds,
+                       Set<ExecutionVertexID> producersToIgnore) {
+                       this.allBulkPriorAllocationIds = 
Preconditions.checkNotNull(allBulkPriorAllocationIds);
+                       this.producersToIgnore = 
Preconditions.checkNotNull(producersToIgnore);
+               }
+
+               /**
+                * Computes a {@link SlotProfile} of an execution slot sharing 
group.
+                *
+                * <p>The {@link ResourceProfile} of the {@link SlotProfile} is 
the merged {@link ResourceProfile}s
+                * of all executions sharing the slot.
+                *
+                * <p>The preferred locations of the {@link SlotProfile} is a 
union of the preferred locations
+                * of all executions sharing the slot. The input locations 
within the bulk are ignored to avoid cyclic dependencies
+                * within the region, e.g. in case of all-to-all pipelined 
connections, so that the allocations do not block each other.
+                *
+                * <p>The preferred {@link AllocationID}s of the {@link 
SlotProfile} are all previous {@link AllocationID}s
+                * of all executions sharing the slot.
+                *
+                * <p>The {@link SlotProfile} also refers to all previous 
{@link AllocationID}s
+                * of all executions within the bulk.
+                *
+                * @param executionSlotSharingGroup executions sharing the slot.
+                * @return a future of the {@link SlotProfile} to allocate for 
the {@code executionSlotSharingGroup}.
+                */
+               @Override
+               public CompletableFuture<SlotProfile> 
getSlotProfileFuture(ExecutionSlotSharingGroup executionSlotSharingGroup) {
+                       ResourceProfile totalSlotResourceProfile = 
ResourceProfile.ZERO;
+                       Collection<AllocationID> priorAllocations = new 
HashSet<>();
+                       
Collection<CompletableFuture<Collection<TaskManagerLocation>>> 
preferredLocationsPerExecution = new ArrayList<>();
+                       for (ExecutionVertexID execution : 
executionSlotSharingGroup.getExecutionVertexIds()) {
+                               totalSlotResourceProfile = 
totalSlotResourceProfile.merge(resourceProfileRetriever.apply(execution));
+                               
priorAllocations.add(prioAllocationIdRetriever.apply(execution));
+                               
preferredLocationsPerExecution.add(preferredLocationsRetriever
+                                       .getPreferredLocations(execution, 
producersToIgnore));
+                       }
+
+                       CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture = FutureUtils
+                               .combineAll(preferredLocationsPerExecution)
+                               .thenApply(executionPreferredLocations ->
+                                       
executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toSet()));

Review comment:
       I think a list is better than a set here because the frequency of a 
location will be accounted when selecting the best location in 
`LocationPreferenceSlotSelectionStrategy`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to