[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631619#comment-16631619 ]
ASF GitHub Bot commented on FLINK-9455: --------------------------------------- asfgit closed pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index f18d0d8cfc7..d8267735209 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -24,7 +24,6 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; -import org.apache.flink.mesos.scheduler.LaunchableTask; import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; @@ -79,6 +78,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -124,6 +124,8 @@ @Nullable private final String webUiUrl; + private final Collection<ResourceProfile> slotsPerWorker; + /** Mesos scheduler driver. 
*/ private SchedulerDriver schedulerDriver; @@ -191,6 +193,9 @@ public MesosResourceManager( this.workersInNew = new HashMap<>(8); this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); + + final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters(); + this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots()); } protected ActorRef createSelfActor() { @@ -352,7 +357,7 @@ private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviou switch(worker.state()) { case Launched: workersInLaunch.put(extractResourceID(worker.taskID()), worker); - final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile()); + final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); break; case Released: @@ -426,7 +431,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) { LOG.info("Starting a new worker."); try { // generate new workers into persistent state and launch associated actors @@ -434,7 +439,7 @@ public void startNewWorker(ResourceProfile resourceProfile) { workerStore.putWorker(worker); workersInNew.put(extractResourceID(worker.taskID()), worker); - LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile); + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()); @@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile resourceProfile) { taskMonitor.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); // tell the launch coordinator to launch the new tasks - launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor); + launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor); + + return slotsPerWorker; } catch (Exception ex) { onFatalError(new ResourceManagerException("Unable to request new workers.", ex)); + return Collections.emptyList(); } } @@ -691,36 +699,13 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) { /** * Creates a launchable task for Fenzo to process. */ - private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) { - - // create the specific TM parameters from the resource profile and some defaults - MesosTaskManagerParameters params = new MesosTaskManagerParameters( - resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(), - taskManagerParameters.gpus(), - taskManagerParameters.containerType(), - taskManagerParameters.containerImageName(), - new ContaineredTaskManagerParameters( - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? 
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(), - 1, - new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), - taskManagerParameters.containerVolumes(), - taskManagerParameters.dockerParameters(), - taskManagerParameters.dockerForcePullImage(), - taskManagerParameters.constraints(), - taskManagerParameters.command(), - taskManagerParameters.bootstrapCommand(), - taskManagerParameters.getTaskManagerHostname(), - taskManagerParameters.uris() - ); - - LOG.debug("LaunchableMesosWorker parameters: {}", params); + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { + LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters); LaunchableMesosWorker launchable = new LaunchableMesosWorker( artifactServer, - params, + taskManagerParameters, taskManagerContainerSpec, taskID, mesosConfig); diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e21f0fc3374..5163724ed6d 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -242,7 +242,7 @@ protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause TestingMesosResourceManager resourceManager; // domain objects for test purposes - final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1); + final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build(); public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build(); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index a89b9f92b17..5b133e7b624 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -50,6 +50,9 @@ public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1); + /** ResourceProfile which matches any other ResourceProfile. */ + public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap()); + // ------------------------------------------------------------------------ /** How many cpu cores are needed, use double so we can specify cpu like 0.1. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index ac1181b1d1d..7929e3170b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -71,6 +71,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -1019,9 +1020,10 @@ protected abstract void internalDeregisterApplication( * Allocates a resource using the resource profile. 
* * @param resourceProfile The resource description + * @return Collection of {@link ResourceProfile} describing the launched slots */ @VisibleForTesting - public abstract void startNewWorker(ResourceProfile resourceProfile); + public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile); /** * Callback when a worker was started. @@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId, Exception cause) { } @Override - public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) { validateRunsInMainThread(); - startNewWorker(resourceProfile); + return startNewWorker(resourceProfile); } @Override @@ -1176,8 +1178,16 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // ------------------------------------------------------------------------ - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) { + return Collections.nCopies(numSlots, ResourceProfile.ANY); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 420b89f39f2..064c2d361d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -32,6 +32,9 @@ import 
javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; + /** * A standalone implementation of the resource manager. Used when the system is started in * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. @@ -74,7 +77,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index 17cf8c7907e..a8f212fe6d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -25,10 +25,14 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; +/** + * Class representing a pending slot request in the {@link SlotManager}. + */ public class PendingSlotRequest { private final SlotRequest slotRequest; @@ -36,11 +40,16 @@ @Nullable private CompletableFuture<Acknowledge> requestFuture; + @Nullable + private PendingTaskManagerSlot pendingTaskManagerSlot; + /** Timestamp when this pending slot request has been created. 
*/ private final long creationTimestamp; public PendingSlotRequest(SlotRequest slotRequest) { this.slotRequest = Preconditions.checkNotNull(slotRequest); + this.requestFuture = null; + this.pendingTaskManagerSlot = null; creationTimestamp = System.currentTimeMillis(); } @@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> requestFut public CompletableFuture<Acknowledge> getRequestFuture() { return requestFuture; } + + @Nullable + public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() { + return pendingTaskManagerSlot; + } + + public void assignPendingTaskManagerSlot(@Nonnull PendingTaskManagerSlot pendingTaskManagerSlotToAssign) { + Preconditions.checkState(pendingTaskManagerSlot == null); + this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign; + } + + public void unassignPendingTaskManagerSlot() { + this.pendingTaskManagerSlot = null; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java new file mode 100644 index 00000000000..ed207e963e3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Represents a pending task manager slot in the {@link SlotManager}. + */ +public class PendingTaskManagerSlot { + + private final TaskManagerSlotId taskManagerSlotId = TaskManagerSlotId.generate(); + + private final ResourceProfile resourceProfile; + + @Nullable + private PendingSlotRequest pendingSlotRequest; + + public PendingTaskManagerSlot(ResourceProfile resourceProfile) { + this.resourceProfile = resourceProfile; + } + + public TaskManagerSlotId getTaskManagerSlotId() { + return taskManagerSlotId; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + public void assignPendingSlotRequest(@Nonnull PendingSlotRequest pendingSlotRequestToAssign) { + Preconditions.checkState(pendingSlotRequest == null); + pendingSlotRequest = pendingSlotRequestToAssign; + } + + public void unassignPendingSlotRequest() { + pendingSlotRequest = null; + } + + @Nullable + public PendingSlotRequest getAssignedPendingSlotRequest() { + return pendingSlotRequest; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java index 84e7c4e785d..adf8f13db57 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import java.util.Collection; + /** * Resource related actions which the {@link SlotManager} can perform. */ @@ -41,9 +43,10 @@ * Requests to allocate a resource with the given {@link ResourceProfile}. * * @param resourceProfile for the to be allocated resource + * @return Collection of {@link ResourceProfile} describing the allocated slots * @throws ResourceManagerException if the resource cannot be allocated */ - void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; + Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; /** * Notifies that an allocation failure has occurred. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index bab56609a1b..2ef2b2fefcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -43,14 +43,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -99,6 +102,8 @@ /** Map of pending/unfulfilled slot allocation requests. */ private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; + private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots; + /** ResourceManager's id. 
*/ private ResourceManagerId resourceManagerId; @@ -130,6 +135,7 @@ public SlotManager( taskManagerRegistrations = new HashMap<>(4); fulfilledSlotRequests = new HashMap<>(16); pendingSlotRequests = new HashMap<>(16); + pendingSlots = new HashMap<>(16); resourceManagerId = null; resourceActions = null; @@ -168,8 +174,13 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) { } } - public int getNumberPendingSlotRequests() { - return pendingSlotRequests.size(); + public int getNumberPendingTaskManagerSlots() { + return pendingSlots.size(); + } + + @VisibleForTesting + int getNumberAssignedPendingTaskManagerSlots() { + return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count(); } // --------------------------------------------------------------------------------------------- @@ -530,14 +541,50 @@ private void registerSlot( removeSlot(slotId); } - TaskManagerSlot slot = new TaskManagerSlot( + final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection); + + final PendingTaskManagerSlot pendingTaskManagerSlot; + + if (allocationId == null) { + pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile); + } else { + pendingTaskManagerSlot = null; + } + + if (pendingTaskManagerSlot == null) { + updateSlot(slotId, allocationId, jobId); + } else { + pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId()); + final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest(); + + if (assignedPendingSlotRequest == null) { + handleFreeSlot(slot); + } else { + assignedPendingSlotRequest.unassignPendingTaskManagerSlot(); + allocateSlot(slot, assignedPendingSlotRequest); + } + } + } + + @Nonnull + private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { + final TaskManagerSlot slot = new TaskManagerSlot( 
slotId, resourceProfile, taskManagerConnection); - slots.put(slotId, slot); + return slot; + } - updateSlot(slotId, allocationId, jobId); + @Nullable + private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) { + return pendingTaskManagerSlot; + } + } + + return null; } /** @@ -595,7 +642,11 @@ private void updateSlotState( slot.updateAllocation(allocationId, jobId); // remove the pending request if any as it has been assigned - pendingSlotRequests.remove(allocationId); + final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (actualPendingSlotRequest != null) { + cancelPendingSlotRequest(actualPendingSlotRequest); + } // this will try to find a new slot for the request rejectPendingSlotRequest( @@ -650,13 +701,54 @@ private void updateSlotState( * @throws ResourceManagerException if the resource manager cannot allocate more resource */ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { - TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile()); + final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); + TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile); if (taskManagerSlot != null) { allocateSlot(taskManagerSlot, pendingSlotRequest); } else { - resourceActions.allocateResource(pendingSlotRequest.getResourceProfile()); + Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile); + + if (!pendingTaskManagerSlotOptional.isPresent()) { + pendingTaskManagerSlotOptional = allocateResource(resourceProfile); + } + + pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, 
pendingTaskManagerSlot)); + } + } + + private Optional<PendingTaskManagerSlot> findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile)) { + return Optional.of(pendingTaskManagerSlot); + } } + + return Optional.empty(); + } + + private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile); + + if (requestedSlots.isEmpty()) { + return Optional.empty(); + } else { + final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator(); + final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); + + while (slotIterator.hasNext()) { + final PendingTaskManagerSlot additionalPendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), additionalPendingTaskManagerSlot); + } + + return Optional.of(pendingTaskManagerSlot); + } + } + + private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) { + pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); + pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot); } /** @@ -680,6 +772,8 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); pendingSlotRequest.setRequestFuture(completableFuture); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + TaskManagerRegistration taskManagerRegistration 
= taskManagerRegistrations.get(instanceID); if (taskManagerRegistration == null) { @@ -733,6 +827,14 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe mainThreadExecutor); } + private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) { + final PendingTaskManagerSlot pendingTaskManagerSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot(); + if (pendingTaskManagerSlot != null) { + pendingTaskManagerSlot.unassignPendingSlotRequest(); + pendingSlotRequest.unassignPendingTaskManagerSlot(); + } + } + /** * Handles a free slot. It first tries to find a pending slot request which can be fulfilled. * If there is no such request, then it will add the slot to the set of free slots. @@ -886,6 +988,8 @@ private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exc private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + if (null != request) { request.cancel(false); } @@ -911,9 +1015,10 @@ private void checkTaskManagerTimeouts() { } // second we trigger the release resource callback which can decide upon the resource release + final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout."); for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) { LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); - resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout.")); + resourceActions.releaseResource(timedOutTaskManagerId, cause); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java new file mode 100644 index 
00000000000..3084b3e8ed9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; +import org.apache.flink.util.AbstractID; + +/** + * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}. 
+ */ +public class TaskManagerSlotId extends AbstractID { + + private static final long serialVersionUID = -4024240625523472071L; + + private TaskManagerSlotId() {} + + public static TaskManagerSlotId generate() { + return new TaskManagerSlotId(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 0e98e44ab0d..c46d800bc95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -246,7 +246,7 @@ public static TaskManagerServices fromConfiguration( final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots()); for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) { - resourceProfiles.add(new ResourceProfile(1.0, 42)); + resourceProfiles.add(ResourceProfile.ANY); } final TimerService<AllocationID> timerService = new TimerService<>( @@ -259,7 +259,6 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 0b56231d4a4..e8207019a32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -32,6 +32,9 @@ import javax.annotation.Nullable; +import 
java.util.Collection; +import java.util.Collections; + /** * Simple {@link ResourceManager} implementation for testing purposes. */ @@ -71,8 +74,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { - // noop + public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 8a7f733a3fa..33a696a1d24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -54,7 +55,9 @@ import javax.annotation.Nonnull; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,6 +70,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -84,9 +88,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import 
static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -103,9 +105,9 @@ @Test public void testTaskManagerRegistration() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -135,14 +137,12 @@ public void testTaskManagerUnregistration() throws Exception { final ResourceActions resourceManagerActions = mock(ResourceActions.class); final JobID jobId = new JobID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + assertThat(tuple5.f4, is(equalTo(resourceManagerId))); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -202,13 +202,16 @@ public void testSlotRequestWithoutFreeSlots() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + CompletableFuture<ResourceProfile> allocateResourceFuture = new 
CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(allocateResourceFuture::complete) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(resourceProfile)); + assertThat(allocateResourceFuture.get(), is(equalTo(resourceProfile))); } } @@ -225,8 +228,11 @@ public void testSlotRequestWithResourceAllocationFailure() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); - doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class)); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(value -> { + throw new ResourceManagerException("Test exception"); + }) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { @@ -257,19 +263,17 @@ public void testSlotRequestWithFreeSlot() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - + final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -282,7 +286,7 @@ public void testSlotRequestWithFreeSlot() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -302,14 +306,9 @@ public void testUnregisterPendingSlotRequest() throws Exception { final SlotID slotId = new SlotID(resourceID, 0); final AllocationID allocationId = new AllocationID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>()) + .createTestingTaskExecutorGateway(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); @@ -357,17 +356,19 @@ public void testFulfillingPendingSlotRequest() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceCalls = new 
AtomicInteger(0); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet()) + .build(); + final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -378,13 +379,13 @@ public void testFulfillingPendingSlotRequest() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile)); + assertThat(numberAllocateResourceCalls.get(), is(1)); slotManager.registerTaskManager( taskExecutorConnection, slotReport); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -444,7 +445,10 @@ public void testFreeSlot() throws Exception { @Test public void testDuplicatePendingSlotRequest() throws Exception { final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceFunctionCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> numberAllocateResourceFunctionCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); @@ -458,7 +462,7 @@ public void testDuplicatePendingSlotRequest() throws Exception { // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class)); + assertThat(numberAllocateResourceFunctionCalls.get(), is(1)); } /** @@ -497,21 +501,17 @@ public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception { @Test public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final 
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); @@ -534,7 +534,7 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -544,21 +544,17 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex @Test public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( 
- any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -588,7 +584,7 @@ public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exc // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -624,7 +620,7 @@ public void testReceivingUnknownSlotReport() throws Exception { @Test public void testUpdateSlotReport() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -678,13 +674,16 @@ public void testUpdateSlotReport() throws Exception { */ @Test public void testTaskManagerTimeout() throws Exception { - final long tmTimeout = 500L; + final long tmTimeout = 10L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID resourceID = 
ResourceID.generate(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); final SlotID slotId = new SlotID(resourceID, 0); @@ -702,15 +701,9 @@ public void testTaskManagerTimeout() throws Exception { slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - slotManager.registerTaskManager(taskManagerConnection, slotReport); - } - }); + mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); - verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) - .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -723,7 +716,10 @@ public void run() { public void testSlotRequestTimeout() throws Exception { final long allocationTimeout = 50L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture<Tuple2<JobID, AllocationID>> failedAllocationFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1))) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -743,21 +739,15 @@ public void testSlotRequestTimeout() throws Exception { final AtomicReference<Exception> atomicException = new AtomicReference<>(null); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - try { 
- assertTrue(slotManager.registerSlotRequest(slotRequest)); - } catch (Exception e) { - atomicException.compareAndSet(null, e); - } + mainThreadExecutor.execute(() -> { + try { + assertTrue(slotManager.registerSlotRequest(slotRequest)); + } catch (Exception e) { + atomicException.compareAndSet(null, e); } }); - verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure( - eq(jobId), - eq(allocationId), - any(TimeoutException.class)); + assertThat(failedAllocationFuture.get(), is(equalTo(Tuple2.of(jobId, allocationId)))); if (atomicException.get() != null) { throw atomicException.get(); @@ -851,7 +841,7 @@ public void testTaskManagerSlotRequestTimeoutHandling() throws Exception { public void testSlotReportWhileActiveSlotRequest() throws Exception { final long verifyTimeout = 10000L; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -966,10 +956,12 @@ public void testSlotReportWhileActiveSlotRequest() throws Exception { @Test public void testTimeoutForUnusedTaskManager() throws Exception { final long taskManagerTimeout = 50L; - final long verifyTimeout = taskManagerTimeout * 10L; + final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ 
-979,14 +971,13 @@ public void testTimeoutForUnusedTaskManager() throws Exception { final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>(); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestedSlotFuture.complete(tuple5.f0); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -1015,17 +1006,9 @@ public void testTimeoutForUnusedTaskManager() throws Exception { } }, mainThreadExecutor) - .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); + .thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); - - verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot( - slotIdArgumentCaptor.capture(), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class)); + final SlotID slotId = requestedSlotFuture.get(); CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync( () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()), @@ -1034,8 +1017,6 @@ public void testTimeoutForUnusedTaskManager() throws Exception { // check that the TaskManager is not idle assertFalse(idleFuture.get()); - final SlotID 
slotId = slotIdArgumentCaptor.getValue(); - CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync( () -> slotManager.getSlot(slotId), mainThreadExecutor); @@ -1052,7 +1033,7 @@ public void testTimeoutForUnusedTaskManager() throws Exception { assertTrue(idleFuture2.get()); - verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releasedResourceFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -1109,7 +1090,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { @Test public void testReportAllocatedSlot() throws Exception { final ResourceID taskManagerId = ResourceID.generate(); - final ResourceActions resourceActions = mock(ResourceActions.class); + final ResourceActions resourceActions = new TestingResourceActionsBuilder().build(); final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway); @@ -1167,7 +1148,7 @@ public void testReportAllocatedSlot() throws Exception { @Test public void testSlotRequestFailure() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + new TestingResourceActionsBuilder().build())) { final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); slotManager.registerSlotRequest(slotRequest); @@ -1222,7 +1203,7 @@ public void testSlotRequestFailure() throws Exception { @Test public void testSlotRequestRemovedIfTMReportAllocation() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + 
new TestingResourceActionsBuilder().build())) { final JobID jobID = new JobID(); final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); @@ -1293,7 +1274,7 @@ public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti .setNotifyAllocationFailureConsumer( (Tuple3<JobID, AllocationID, Exception> failureMessage) -> allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1))) - .createTestingResourceActions(); + .build(); try (final SlotManager slotManager = createSlotManager( ResourceManagerId.generate(), @@ -1374,17 +1355,27 @@ public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti @Nonnull private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) { + return createSlotReport(taskExecutorResourceId, numberSlots, ResourceProfile.UNKNOWN); + } + + @Nonnull + private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots, ResourceProfile resourceProfile) { final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots); for (int i = 0; i < numberSlots; i++) { - slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN)); + slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), resourceProfile)); } return new SlotReport(slotStatusSet); } @Nonnull - private SlotRequest createSlotRequest(JobID jobId1) { - return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + private SlotRequest createSlotRequest(JobID jobId) { + return createSlotRequest(jobId, ResourceProfile.UNKNOWN); + } + + @Nonnull + private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) { + return new SlotRequest(jobId, new AllocationID(), resourceProfile, "foobar1"); } private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { @@ -1398,4 +1389,171 @@ private SlotManager 
createSlotManager(ResourceManagerId resourceManagerId, Resou return slotManager; } + + /** + * Tests that we only request new resources/containers once we have assigned + * all pending task manager slots. + */ + @Test + public void testRequestNewResources() throws Exception { + final int numberSlots = 2; + final AtomicInteger resourceRequests = new AtomicInteger(0); + final TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction( + convert(ignored -> { + resourceRequests.incrementAndGet(); + return numberSlots; + })) + .build(); + + try (final SlotManager slotManager = createSlotManager( + ResourceManagerId.generate(), + testingResourceActions)) { + + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + // the second slot request should not try to allocate a new resource because the + // previous resource was started with 2 slots. + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2)); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(2)); + } + } + + /** + * Tests that a failing allocation/slot request will return the pending task manager slot. 
+ */ + @Test + public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception { + final int numberSlots = 2; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + + final SlotRequest slotRequest = createSlotRequest(jobId); + assertThat(slotManager.registerSlotRequest(slotRequest), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + + slotManager.unregisterSlotRequest(slotRequest.getAllocationId()); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests the completion of pending task manager slots by registering a TaskExecutor. 
+ */ + @Test + public void testPendingTaskManagerSlotCompletion() throws Exception { + final int numberSlots = 3; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + assertThat(slotManager.getNumberRegisteredSlots(), is(0)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1)); + } + } + + private TaskExecutorConnection createTaskExecutorConnection() { + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + return new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); + } + + /** + * Tests that a different slot can fulfill a pending slot request. If the + * pending slot request has a pending task manager slot assigned, it should + * be freed. 
+ */ + @Test + public void testRegistrationOfDifferentSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, requestedSlotProfile)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + + final int numberOfferedSlots = 1; + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final ResourceProfile offeredSlotProfile = new ResourceProfile(2.0, 2); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, offeredSlotProfile); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests that only free slots can fulfill/complete a pending task manager slot. 
+ */ + @Test + public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotID slotId = new SlotID(taskExecutorConnection.getResourceID(), 0); + final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.UNKNOWN, jobId, new AllocationID()); + final SlotReport slotReport = new SlotReport(slotStatus); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + } + } + + private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) { + return (ResourceProfile resourceProfile) -> { + final int slots = function.apply(resourceProfile); + + final ArrayList<ResourceProfile> result = new ArrayList<>(slots); + for (int i = 0; i < slots; i++) { + result.add(resourceProfile); + } + + return result; + }; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 8f6317c427f..66966cc3eff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java 
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -33,13 +33,13 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.Mockito; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -47,12 +47,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +/** + * Tests for the slot allocation protocol. 
+ */ public class SlotProtocolTest extends TestLogger { private static final long timeout = 10000L; @@ -87,7 +88,10 @@ public void testSlotsUnavailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfileFuture::complete) + .build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -99,14 +103,16 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile())); + assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile()))); // slot becomes available - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final SlotID slotID = new SlotID(resourceID, 0); @@ -119,8 +125,7 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); // 4) Slot becomes available and TaskExecutor 
gets a SlotRequest - verify(taskExecutorGateway, timeout(5000L)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } @@ -137,11 +142,13 @@ public void testSlotAvailableRequest() throws Exception { final ResourceManagerId rmLeaderID = ResourceManagerId.generate(); - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); try (SlotManager slotManager = new SlotManager( scheduledExecutor, @@ -149,7 +156,7 @@ public void testSlotAvailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -172,8 +179,7 @@ public void testSlotAvailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); // a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } } diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java index 8b7c8026f92..4c6f14c1c40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java @@ -23,9 +23,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nonnull; +import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,29 +41,28 @@ private final BiConsumer<InstanceID, Exception> releaseResourceConsumer; @Nonnull - private final Consumer<ResourceProfile> allocateResourceConsumer; + private final FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction; @Nonnull private final Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer; public TestingResourceActions( @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer, - @Nonnull Consumer<ResourceProfile> allocateResourceConsumer, + @Nonnull FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction, @Nonnull Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) { this.releaseResourceConsumer = releaseResourceConsumer; - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = allocateResourceFunction; 
this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer; } - @Override public void releaseResource(InstanceID instanceId, Exception cause) { releaseResourceConsumer.accept(instanceId, cause); } @Override - public void allocateResource(ResourceProfile resourceProfile) { - allocateResourceConsumer.accept(resourceProfile); + public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + return allocateResourceFunction.apply(resourceProfile); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java index 2c1d47e8c88..ac7afd4283e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java @@ -23,7 +23,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; +import java.util.Collection; +import java.util.Collections; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,7 +36,7 @@ */ public class TestingResourceActionsBuilder { private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {}; - private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) -> {}; + private FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction = (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN); private 
Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = (ignored) -> {}; public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) { @@ -40,8 +44,16 @@ public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<Insta return this; } + public TestingResourceActionsBuilder setAllocateResourceFunction(FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction) { + this.allocateResourceFunction = allocateResourceFunction; + return this; + } + public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) { - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = (ResourceProfile resourceProfile) -> { + allocateResourceConsumer.accept(resourceProfile); + return Collections.singleton(ResourceProfile.UNKNOWN); + }; return this; } @@ -50,7 +62,7 @@ public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer return this; } - public TestingResourceActions createTestingResourceActions() { - return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer); + public TestingResourceActions build() { + return new TestingResourceActions(releaseResourceConsumer, allocateResourceFunction, notifyAllocationFailureConsumer); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 956e40fe61b..2a43b8b7f1a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -58,6 +58,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -115,6 +116,8 @@ private final 
Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>(); + private final Collection<ResourceProfile> slotsPerWorker; + public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, @@ -163,6 +166,8 @@ public YarnResourceManager( this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes(); this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); + + this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots); } protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient( @@ -283,7 +288,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) { // Priority for worker containers - priorities are intra-application //TODO: set priority according to the resource allocated Priority priority = Priority.newInstance(generatePriority(resourceProfile)); @@ -291,6 +296,8 @@ public void startNewWorker(ResourceProfile resourceProfile) { int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores(); Resource capability = Resource.newInstance(mem, vcore); requestYarnContainer(capability, priority); + + return slotsPerWorker; } @Override @@ -334,7 +341,7 @@ public void onContainersCompleted(final List<ContainerStatus> statuses) { if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one final Container container = yarnWorkerNode.getContainer(); - requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); + requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority()); } // Eagerly close the connection with task manager. 
closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); @@ -375,7 +382,7 @@ public void onContainersAllocated(List<Container> containers) { workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one - requestYarnContainer(container.getResource(), container.getPriority()); + requestYarnContainerIfRequired(container.getResource(), container.getPriority()); } } else { // return the excessive containers @@ -446,21 +453,26 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfies pending slot requests. */ + private void requestYarnContainerIfRequired(Resource resource, Priority priority) { + int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots(); + int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots; + + if (requiredTaskManagerSlots > pendingTaskManagerSlots) { + requestYarnContainer(resource, priority); + } + } + private void requestYarnContainer(Resource resource, Priority priority) { - int pendingSlotRequests = getNumberPendingSlotRequests(); - int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots; - if (pendingSlotRequests > pendingSlotAllocation) { - resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); + resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); - // make sure we transmit the request fast and receive fast news of granted allocations - resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + // make sure we transmit the request fast and receive fast news of granted allocations + resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); - numPendingContainerRequests++; + numPendingContainerRequests++; - log.info("Requesting new TaskExecutor container with resources 
{}. Number pending requests {}.", resource, numPendingContainerRequests); - } + log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", + resource, + numPendingContainerRequests); } private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make SlotManager aware of multi slot TaskManagers > ------------------------------------------------- > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The {{SlotManager}}, which is responsible for managing all available slots of a Flink > cluster, can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured, but currently the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest making the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)