zhuzhurk commented on a change in pull request #13071: URL: https://github.com/apache/flink/pull/13071#discussion_r469316182
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { + return new Builder(); + } + + private static class Builder { + private int[] groups = { 2, 1 }; // 2 executions in the first group, 1 in the second etc + private List<ResourceProfile> resourceProfiles; + private boolean completePhysicalSlotFutureManually; + private boolean completeSlotProfileFutureManually; + private boolean slotWillBeOccupiedIndefinitely; + + private Builder setGroups(int... groups) { + int reqNumber = IntStream.of(groups).sum(); + List<ResourceProfile> resourceProfiles = Collections.nCopies(reqNumber, ResourceProfile.UNKNOWN); + return setGroupsWithResourceProfiles(resourceProfiles, groups); + } + + private Builder setGroupsWithResourceProfiles(List<ResourceProfile> resourceProfiles, int... groups) { + Preconditions.checkArgument(resourceProfiles.size() == IntStream.of(groups).sum()); + this.resourceProfiles = resourceProfiles; + this.groups = groups; + return this; + } + + private Builder completePhysicalSlotFutureManually() { + completePhysicalSlotFutureManually(true); + return this; + } + + private Builder completePhysicalSlotFutureManually(boolean value) { + this.completePhysicalSlotFutureManually = value; + return this; + } + + private Builder completeSlotProfileFutureManually() { + this.completeSlotProfileFutureManually = true; + return this; + } + + private Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) { + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + return this; + } + + private AllocationContext build() { + List<ExecutionVertexSchedulingRequirements> requirements = createSchedulingRequirements(); + TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually); + TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = + new TestingSharedSlotProfileRetrieverFactory(completeSlotProfileFutureManually); + TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(getReqIds(requirements), groups); + SlotSharingExecutionSlotAllocator allocator = new SlotSharingExecutionSlotAllocator( + slotProvider, + slotWillBeOccupiedIndefinitely, + slotSharingStrategy, + sharedSlotProfileRetrieverFactory); + return new AllocationContext( + requirements, + slotProvider, + slotSharingStrategy, + allocator, + sharedSlotProfileRetrieverFactory); + } + + private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements() { + return resourceProfiles + .stream() + .map(resourceProfile -> + new ExecutionVertexSchedulingRequirements + .Builder() + .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)) + .withTaskResourceProfile(resourceProfile) + .withPhysicalSlotResourceProfile(resourceProfile) // not used + .build()) + .collect(Collectors.toList()); + } + } + } + + private static class TestingPhysicalSlotProvider implements PhysicalSlotProvider { + private final Map<SlotRequestId, PhysicalSlotRequest> requests; + private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses; + private final Map<SlotRequestId, Throwable> cancelations; Review comment: typo: `cancelations` -> `cancellations` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { + return new Builder(); + } + + private static class Builder { + private int[] groups = { 2, 1 }; // 2 executions in the first group, 1 in the second etc + private List<ResourceProfile> resourceProfiles; + private boolean completePhysicalSlotFutureManually; + private boolean completeSlotProfileFutureManually; + private boolean slotWillBeOccupiedIndefinitely; + + private Builder setGroups(int... groups) { + int reqNumber = IntStream.of(groups).sum(); + List<ResourceProfile> resourceProfiles = Collections.nCopies(reqNumber, ResourceProfile.UNKNOWN); + return setGroupsWithResourceProfiles(resourceProfiles, groups); + } + + private Builder setGroupsWithResourceProfiles(List<ResourceProfile> resourceProfiles, int... groups) { + Preconditions.checkArgument(resourceProfiles.size() == IntStream.of(groups).sum()); + this.resourceProfiles = resourceProfiles; + this.groups = groups; + return this; + } + + private Builder completePhysicalSlotFutureManually() { + completePhysicalSlotFutureManually(true); + return this; + } + + private Builder completePhysicalSlotFutureManually(boolean value) { + this.completePhysicalSlotFutureManually = value; + return this; + } + + private Builder completeSlotProfileFutureManually() { + this.completeSlotProfileFutureManually = true; + return this; + } + + private Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) { + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + return this; + } + + private AllocationContext build() { + List<ExecutionVertexSchedulingRequirements> requirements = createSchedulingRequirements(); + TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually); + TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = + new TestingSharedSlotProfileRetrieverFactory(completeSlotProfileFutureManually); + TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(getReqIds(requirements), groups); + SlotSharingExecutionSlotAllocator allocator = new SlotSharingExecutionSlotAllocator( + slotProvider, + slotWillBeOccupiedIndefinitely, + slotSharingStrategy, + sharedSlotProfileRetrieverFactory); + return new AllocationContext( + requirements, + slotProvider, + slotSharingStrategy, + allocator, + sharedSlotProfileRetrieverFactory); + } + + private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements() { + return resourceProfiles + .stream() + .map(resourceProfile -> + new ExecutionVertexSchedulingRequirements + .Builder() + .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)) + .withTaskResourceProfile(resourceProfile) + .withPhysicalSlotResourceProfile(resourceProfile) // not used + .build()) + .collect(Collectors.toList()); + } + } + } + + private static class TestingPhysicalSlotProvider implements PhysicalSlotProvider { + private final Map<SlotRequestId, PhysicalSlotRequest> requests; + private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses; + private final Map<SlotRequestId, Throwable> cancelations; + private final boolean completePhysicalSlotFutureManually; + + private TestingPhysicalSlotProvider(boolean completePhysicalSlotFutureManually) { + this.completePhysicalSlotFutureManually = completePhysicalSlotFutureManually; + this.requests = new HashMap<>(); + this.responses = new HashMap<>(); + this.cancelations = new HashMap<>(); + } + + @Override + public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) { + SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId(); + requests.put(slotRequestId, physicalSlotRequest); + CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>(); + responses.put(slotRequestId, resultFuture); + if (!completePhysicalSlotFutureManually) { + completePhysicalSlotFutureFor(slotRequestId); + } + return resultFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); + } + + private void completePhysicalSlotFutureFor(SlotRequestId slotRequestId) { + ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile(); + TestingPhysicalSlot physicalSlot = new TestingPhysicalSlot(resourceProfile); + responses.get(slotRequestId).complete(physicalSlot); + } + + private void failPhysicalSlotFutureFor(SlotRequestId slotRequestId, Throwable cause) { + responses.get(slotRequestId).completeExceptionally(cause); + } + + @Override + public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) { + cancelations.put(slotRequestId, cause); + } + + private Map<SlotRequestId, PhysicalSlotRequest> getRequests() { + return Collections.unmodifiableMap(requests); + } + + private Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> getResponses() { + return Collections.unmodifiableMap(responses); + } + + private Map<SlotRequestId, Throwable> getCancelations() { + return Collections.unmodifiableMap(cancelations); + } + } + + private static class TestingSlotSharingStrategy implements SlotSharingStrategy { + private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups; + + private TestingSlotSharingStrategy(Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups) { + this.executionSlotSharingGroups = executionSlotSharingGroups; + } + + @Override + public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(ExecutionVertexID executionVertexId) { + return executionSlotSharingGroups.get(executionVertexId); + } + + @Override + public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() { + Set<ExecutionSlotSharingGroup> groups = new IdentityHashSet<>(); + groups.addAll(executionSlotSharingGroups.values()); + return groups; + } + + private static TestingSlotSharingStrategy createWithGroups( + List<ExecutionVertexID> executionVertexIds, + int... groupSizes) { + Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups = new HashMap<>(); + int startIndex = 0; + int nextIndex = 0; + for (int groupSize : groupSizes) { + nextIndex = startIndex + groupSize; + createGroup(executionSlotSharingGroups, executionVertexIds, startIndex, nextIndex); + startIndex = nextIndex; + } + if (nextIndex < executionVertexIds.size()) { + createGroup(executionSlotSharingGroups, executionVertexIds, nextIndex, executionVertexIds.size()); + } + return new TestingSlotSharingStrategy(executionSlotSharingGroups); + } + + private static void createGroup( + Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups, + List<ExecutionVertexID> executionVertexIds, + int startIndex, + int nextIndex) { + ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(); + executionSlotSharingGroup.addVertex(new ExecutionVertexID(new JobVertexID(), 0)); + executionVertexIds.subList(startIndex, nextIndex).forEach(executionVertexId -> { + executionSlotSharingGroup.addVertex(executionVertexId); + executionSlotSharingGroups.put(executionVertexId, executionSlotSharingGroup); + }); + } + } + + private static class TestingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetrieverFactory { + private final List<Set<ExecutionVertexID>> askedBulks; + private final List<ExecutionSlotSharingGroup> askedGroups; + private final Map<ExecutionSlotSharingGroup, ResourceProfile> resourceProfiles; + private final Map<ExecutionSlotSharingGroup, CompletableFuture<SlotProfile>> slotProfileFutures; + private final boolean completeSlotProfileFutureManually; + + private TestingSharedSlotProfileRetrieverFactory(boolean completeSlotProfileFutureManually) { + this.completeSlotProfileFutureManually = completeSlotProfileFutureManually; + this.askedBulks = new ArrayList<>(); + this.askedGroups = new ArrayList<>(); + this.resourceProfiles = new IdentityHashMap<>(); + this.slotProfileFutures = new IdentityHashMap<>(); + } + + @Override + public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) { + askedBulks.add(bulk); + return group -> { + askedGroups.add(group); + CompletableFuture<SlotProfile> slotProfileFuture = + slotProfileFutures.computeIfAbsent(group, g -> new CompletableFuture<>()); + if (!completeSlotProfileFutureManually) { + completeSlotProfileFutureFor(group); + } + return slotProfileFuture; + }; + } + + private void addGroupResourceProfile(ExecutionSlotSharingGroup group, ResourceProfile resourceProfile) { + resourceProfiles.put(group, resourceProfile); + } + + private void completeSlotProfileFutureFor(ExecutionSlotSharingGroup group) { + slotProfileFutures.get(group).complete(SlotProfile.noLocality(resourceProfiles.getOrDefault(group, ResourceProfile.ANY))); + } + + private List<Set<ExecutionVertexID>> getAskedBulks() { + return Collections.unmodifiableList(askedBulks); + } + + private List<ExecutionSlotSharingGroup> getAskedGroups() { + return Collections.unmodifiableList(askedGroups); + } + } + + private static class TestingPhysicalSlot extends SimpleSlotContext implements PhysicalSlot { + private Payload payload; Review comment: should be marked as `@Nullable` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { + return new Builder(); + } + + private static class Builder { + private int[] groups = { 2, 1 }; // 2 executions in the first group, 1 in the second etc + private List<ResourceProfile> resourceProfiles; + private boolean completePhysicalSlotFutureManually; + private boolean completeSlotProfileFutureManually; + private boolean slotWillBeOccupiedIndefinitely; + + private Builder setGroups(int... groups) { + int reqNumber = IntStream.of(groups).sum(); + List<ResourceProfile> resourceProfiles = Collections.nCopies(reqNumber, ResourceProfile.UNKNOWN); + return setGroupsWithResourceProfiles(resourceProfiles, groups); + } + + private Builder setGroupsWithResourceProfiles(List<ResourceProfile> resourceProfiles, int... groups) { + Preconditions.checkArgument(resourceProfiles.size() == IntStream.of(groups).sum()); + this.resourceProfiles = resourceProfiles; + this.groups = groups; + return this; + } + + private Builder completePhysicalSlotFutureManually() { + completePhysicalSlotFutureManually(true); + return this; + } + + private Builder completePhysicalSlotFutureManually(boolean value) { + this.completePhysicalSlotFutureManually = value; + return this; + } + + private Builder completeSlotProfileFutureManually() { + this.completeSlotProfileFutureManually = true; + return this; + } + + private Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) { + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + return this; + } + + private AllocationContext build() { + List<ExecutionVertexSchedulingRequirements> requirements = createSchedulingRequirements(); + TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually); + TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = + new TestingSharedSlotProfileRetrieverFactory(completeSlotProfileFutureManually); + TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(getReqIds(requirements), groups); + SlotSharingExecutionSlotAllocator allocator = new SlotSharingExecutionSlotAllocator( + slotProvider, + slotWillBeOccupiedIndefinitely, + slotSharingStrategy, + sharedSlotProfileRetrieverFactory); + return new AllocationContext( + requirements, + slotProvider, + slotSharingStrategy, + allocator, + sharedSlotProfileRetrieverFactory); + } + + private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements() { + return resourceProfiles + .stream() + .map(resourceProfile -> + new ExecutionVertexSchedulingRequirements + .Builder() + .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)) + .withTaskResourceProfile(resourceProfile) + .withPhysicalSlotResourceProfile(resourceProfile) // not used + .build()) + .collect(Collectors.toList()); + } + } + } + + private static class TestingPhysicalSlotProvider implements PhysicalSlotProvider { + private final Map<SlotRequestId, PhysicalSlotRequest> requests; + private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses; + private final Map<SlotRequestId, Throwable> cancelations; + private final boolean completePhysicalSlotFutureManually; + + private TestingPhysicalSlotProvider(boolean completePhysicalSlotFutureManually) { + this.completePhysicalSlotFutureManually = completePhysicalSlotFutureManually; + this.requests = new HashMap<>(); + this.responses = new HashMap<>(); + this.cancelations = new HashMap<>(); + } + + @Override + public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) { + SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId(); + requests.put(slotRequestId, physicalSlotRequest); + CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>(); + responses.put(slotRequestId, resultFuture); + if (!completePhysicalSlotFutureManually) { + completePhysicalSlotFutureFor(slotRequestId); + } + return resultFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); + } + + private void completePhysicalSlotFutureFor(SlotRequestId slotRequestId) { + ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile(); + TestingPhysicalSlot physicalSlot = new TestingPhysicalSlot(resourceProfile); + responses.get(slotRequestId).complete(physicalSlot); + } + + private void failPhysicalSlotFutureFor(SlotRequestId slotRequestId, Throwable cause) { + responses.get(slotRequestId).completeExceptionally(cause); + } + + @Override + public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) { + cancelations.put(slotRequestId, cause); + } + + private Map<SlotRequestId, PhysicalSlotRequest> getRequests() { + return Collections.unmodifiableMap(requests); + } + + private Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> getResponses() { + return Collections.unmodifiableMap(responses); + } + + private Map<SlotRequestId, Throwable> getCancelations() { + return Collections.unmodifiableMap(cancelations); + } + } + + private static class TestingSlotSharingStrategy implements SlotSharingStrategy { + private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups; + + private TestingSlotSharingStrategy(Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups) { + this.executionSlotSharingGroups = executionSlotSharingGroups; + } + + @Override + public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(ExecutionVertexID executionVertexId) { + return executionSlotSharingGroups.get(executionVertexId); + } + + @Override + public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() { + Set<ExecutionSlotSharingGroup> groups = new IdentityHashSet<>(); + groups.addAll(executionSlotSharingGroups.values()); + return groups; + } + + private static TestingSlotSharingStrategy createWithGroups( + List<ExecutionVertexID> executionVertexIds, + int... groupSizes) { + Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups = new HashMap<>(); + int startIndex = 0; + int nextIndex = 0; + for (int groupSize : groupSizes) { + nextIndex = startIndex + groupSize; + createGroup(executionSlotSharingGroups, executionVertexIds, startIndex, nextIndex); + startIndex = nextIndex; + } + if (nextIndex < executionVertexIds.size()) { + createGroup(executionSlotSharingGroups, executionVertexIds, nextIndex, executionVertexIds.size()); + } + return new TestingSlotSharingStrategy(executionSlotSharingGroups); + } + + private static void createGroup( + Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups, + List<ExecutionVertexID> executionVertexIds, + int startIndex, + int nextIndex) { + ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(); + executionSlotSharingGroup.addVertex(new ExecutionVertexID(new JobVertexID(), 0)); Review comment: I guess this statement is added by mistake? All tests can pass without it. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { Review comment: can be `private` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { + return new Builder(); + } + + private static class Builder { + private int[] groups = { 2, 1 }; // 2 executions in the first group, 1 in the second etc + private List<ResourceProfile> resourceProfiles; + private boolean completePhysicalSlotFutureManually; + private boolean completeSlotProfileFutureManually; + private boolean slotWillBeOccupiedIndefinitely; + + private Builder setGroups(int... groups) { + int reqNumber = IntStream.of(groups).sum(); + List<ResourceProfile> resourceProfiles = Collections.nCopies(reqNumber, ResourceProfile.UNKNOWN); + return setGroupsWithResourceProfiles(resourceProfiles, groups); + } + + private Builder setGroupsWithResourceProfiles(List<ResourceProfile> resourceProfiles, int... groups) { + Preconditions.checkArgument(resourceProfiles.size() == IntStream.of(groups).sum()); + this.resourceProfiles = resourceProfiles; + this.groups = groups; + return this; + } + + private Builder completePhysicalSlotFutureManually() { + completePhysicalSlotFutureManually(true); + return this; + } + + private Builder completePhysicalSlotFutureManually(boolean value) { + this.completePhysicalSlotFutureManually = value; + return this; + } + + private Builder completeSlotProfileFutureManually() { + this.completeSlotProfileFutureManually = true; + return this; + } + + private Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) { + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + return this; + } + + private AllocationContext build() { + List<ExecutionVertexSchedulingRequirements> requirements = createSchedulingRequirements(); + TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually); + TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = + new TestingSharedSlotProfileRetrieverFactory(completeSlotProfileFutureManually); + TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(getReqIds(requirements), groups); + SlotSharingExecutionSlotAllocator allocator = new SlotSharingExecutionSlotAllocator( + slotProvider, + slotWillBeOccupiedIndefinitely, + slotSharingStrategy, + sharedSlotProfileRetrieverFactory); + return new AllocationContext( + requirements, + slotProvider, + slotSharingStrategy, + allocator, + sharedSlotProfileRetrieverFactory); + } + + private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements() { + return resourceProfiles + .stream() + .map(resourceProfile -> + new ExecutionVertexSchedulingRequirements + .Builder() + .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)) + .withTaskResourceProfile(resourceProfile) Review comment: Both `taskResourceProfile` and `physicalSlotResourceProfile` of `ExecutionVertexSchedulingRequirements` are not used in `SlotSharingExecutionSlotAllocator`. I think we do not need to set them and therefore no need to create the `resourceProfiles`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( Review comment: can be `private` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); Review comment: I feel that using indices here and there makes it a bit hard for reading and reasoning. How about let the test case decide the grouping in a more direct way. Below is an example showing a way that I feel more readable. I also implies some implementation changes on `AllocationContext`. ``` public class SlotSharingExecutionSlotAllocatorTest { private static final ExecutionVertexID EV1 = createRandomExecutionVertexID(); private static final ExecutionVertexID EV2 = createRandomExecutionVertexID(); private static final ExecutionVertexID EV3 = createRandomExecutionVertexID(); private static final ExecutionVertexID EV4 = createRandomExecutionVertexID(); @Test public void testNewAllocatePhysicalSlotForSharedSlot() { List<List<ExecutionVertexID>> vertices = Arrays.asList( Arrays.asList(EV1, EV2), Arrays.asList(EV3, EV4)); AllocationContext context = AllocationContext.newBuilder().setVertices(vertices).build(); List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(EV1, EV2, EV3, EV4); Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); assertThat(assignIds, containsInAnyOrder(EV1, EV2, EV3, EV4)); assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); } @Test public void testSlotProfileRequestAskedBulkAndGroup() { List<ExecutionVertexID> vertexGroup1 = Arrays.asList(EV1, EV2); List<List<ExecutionVertexID>> vertices = Arrays.asList(vertexGroup1); AllocationContext context = AllocationContext.newBuilder().setVertices(vertices).build(); ExecutionSlotSharingGroup executionSlotSharingGroup = context.getSlotSharingStrategy().getExecutionSlotSharingGroup(EV1); context.allocateSlotsFor(EV1, EV2); Set<ExecutionVertexID> ids = new HashSet<>(vertexGroup1); assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); } ... ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { Review comment: can be `private` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; +import org.powermock.core.IdentityHashSet; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SlotSharingExecutionSlotAllocator}. + */ +public class SlotSharingExecutionSlotAllocatorTest { + @Test + public void testSlotProfileRequestAskedBulkAndGroup() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + context.allocateSlotsFor(0, 2); + + Set<ExecutionVertexID> ids = new HashSet<>(context.getReqIds(0, 2)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedBulks(), containsInAnyOrder(ids)); + assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups(), containsInAnyOrder(executionSlotSharingGroup)); + } + + @Test + public void testSlotRequestCompletionAfterProfileCompletion() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).completeSlotProfileFutureManually().build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 2); + + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(false))); + context.getSlotProfileRetrieverFactory().completeSlotProfileFutureFor(executionSlotSharingGroup); + executionVertexAssignments.forEach(assignment -> assertThat(assignment.getLogicalSlotFuture().isDone(), is(true))); + } + + @Test + public void testSlotRequestProfile() { + ResourceProfile physicalsSlotResourceProfile = ResourceProfile.fromResources(3, 5); + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + context.getSlotSharingStrategy().getExecutionSlotSharingGroup(context.getReqIds(0, 1).get(0)); + context.getSlotProfileRetrieverFactory().addGroupResourceProfile(executionSlotSharingGroup, physicalsSlotResourceProfile); + + context.allocateSlotsFor(0, 2); + + Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst(); + assertThat(slotRequest.isPresent(), is(true)); + slotRequest.ifPresent(r -> assertThat(r.getSlotProfile().getPhysicalSlotResourceProfile(), is(physicalsSlotResourceProfile))); + } + + @Test + public void testNewAllocatePhysicalSlotForSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2, 2).build(); + + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(0, 4); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + assertThat(assignIds, containsInAnyOrder(context.getReqIds(0, 4).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testAllocateLogicalSlotFromAvailableSharedSlot() { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + + context.allocateSlotsFor(0, 1); + List<SlotExecutionVertexAssignment> executionVertexAssignments = context.allocateSlotsFor(1, 2); + Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments); + + // execution 0 from the first allocateSlotsFor call and execution 1 from the second allocateSlotsFor call + // share a slot, therefore only one physical slot allocation should happen + assertThat(assignIds, containsInAnyOrder(context.getReqIds(1, 2).toArray())); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + } + + @Test + public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(1).build(); + + SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(0, 1).get(0); + SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(0, 1).get(0); + + assertThat(assignment1.getLogicalSlotFuture().get() == assignment2.getLogicalSlotFuture().get(), is(true)); + } + + @Test + public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .completePhysicalSlotFutureManually() + .build(); + CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(0, 1).get(0).getLogicalSlotFuture(); + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().values().stream().findFirst().get().getSlotRequestId(); + + assertThat(logicalSlotFuture.isDone(), is(false)); + context.getSlotProvider().failPhysicalSlotFutureFor(slotRequestId, new Throwable()); + assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true)); + + // next allocation allocates new shared slot + context.allocateSlotsFor(0, 1); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(1) + .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely) + .build(); + context.allocateSlotsFor(0, 1); + + PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst().get(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload(), notNullValue()); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely(), is(slotWillBeOccupiedIndefinitely)); + } + + @Test + public void testReturningLogicalSlotsRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + false, + (context, assignment) -> { + try { + assignment.getLogicalSlotFuture().get().releaseSlot(null); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException("Unexpected", e); + } + }); + } + + @Test + public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() { + testLogicalSlotRequestCancellation( + true, + (context, assignment) -> { + context.getAllocator().cancel(assignment.getExecutionVertexId()); + try { + assignment.getLogicalSlotFuture().get(); + fail("THe logical future must finish with the cancellation exception"); + } catch (InterruptedException | ExecutionException e) { + assertThat(e.getCause(), instanceOf(CancellationException.class)); + } + }); + } + + private static void testLogicalSlotRequestCancellation( + boolean completePhysicalSlotFutureManually, + BiConsumer<AllocationContext, SlotExecutionVertexAssignment> cancelAction) { + //if (completePhysicalSlotRequest) { + AllocationContext context = AllocationContext + .newBuilder() + .setGroups(2) + .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .build(); + + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel only one sharing logical slots + cancelAction.accept(context, assignments.get(0)); + assignments = context.allocateSlotsFor(0, 2); + // there should be no more physical slot allocations, as the first logical slot reuses the previous shared slot + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1)); + + // cancel all sharing logical slots + for (SlotExecutionVertexAssignment assignment : assignments) { + cancelAction.accept(context, assignment); + } + SlotRequestId slotRequestId = context.getSlotProvider().getRequests().keySet().stream().findFirst().get(); + assertThat(context.getSlotProvider().getCancelations().containsKey(slotRequestId), is(true)); + + context.allocateSlotsFor(0, 2); + // there should be one more physical slot allocation, as the first allocation should be removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2)); + } + + @Test + public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException { + AllocationContext context = AllocationContext.newBuilder().setGroups(2).build(); + List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(0, 2); + List<TestingPayload> payloads = assignments + .stream() + .map(assignment -> { + TestingPayload payload = new TestingPayload(); + assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + return payload; + }) + .collect(Collectors.toList()); + TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().values().stream().findFirst().get().get(); + + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(false)); + assertThat(physicalSlot.getPayload(), notNullValue()); + physicalSlot.getPayload().release(new Throwable()); + assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()), is(true)); + } + + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { + return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); + } + + private static class AllocationContext { + private final List<ExecutionVertexSchedulingRequirements> requirements; + private final TestingPhysicalSlotProvider slotProvider; + private final TestingSlotSharingStrategy slotSharingStrategy; + private final SlotSharingExecutionSlotAllocator allocator; + private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory; + + AllocationContext( + List<ExecutionVertexSchedulingRequirements> requirements, + TestingPhysicalSlotProvider slotProvider, + TestingSlotSharingStrategy slotSharingStrategy, + SlotSharingExecutionSlotAllocator allocator, + TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) { + this.requirements = requirements; + this.slotProvider = slotProvider; + this.slotSharingStrategy = slotSharingStrategy; + this.allocator = allocator; + this.slotProfileRetrieverFactory = slotProfileRetrieverFactory; + } + + public SlotSharingExecutionSlotAllocator getAllocator() { + return allocator; + } + + private List<SlotExecutionVertexAssignment> allocateSlotsFor(int start, int end) { + return allocator.allocateSlotsFor(requirements.subList(start, end)); + } + + private TestingSlotSharingStrategy getSlotSharingStrategy() { + return slotSharingStrategy; + } + + private List<ExecutionVertexID> getReqIds(int start, int end) { + return getReqIds(requirements.subList(start, end)); + } + + private TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() { + return slotProfileRetrieverFactory; + } + + private static List<ExecutionVertexID> getReqIds(Collection<ExecutionVertexSchedulingRequirements> requirements) { + return requirements.stream().map(ExecutionVertexSchedulingRequirements::getExecutionVertexId).collect(Collectors.toList()); + } + + static Builder newBuilder() { + return new Builder(); + } + + private static class Builder { + private int[] groups = { 2, 1 }; // 2 executions in the first group, 1 in the second etc + private List<ResourceProfile> resourceProfiles; Review comment: should we explicitly set default values for these fields? NPE will be thrown if `resourceProfiles` is null when building the context. For the boolean fields, I feel that explicitly setting them to `false` by default can make it more readable. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org