[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631619#comment-16631619
 ] 

ASF GitHub Bot commented on FLINK-9455:
---------------------------------------

asfgit closed pull request #6734: [FLINK-9455][RM] Add support for multi task 
slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734
 
 
   

This pull request was merged from a forked (foreign) repository.
Because GitHub no longer displays the original diff of a fork-based
pull request once it has been merged, the complete diff is reproduced
below for the sake of provenance:

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index f18d0d8cfc7..d8267735209 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -24,7 +24,6 @@
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
@@ -79,6 +78,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -124,6 +124,8 @@
        @Nullable
        private final String webUiUrl;
 
+       private final Collection<ResourceProfile> slotsPerWorker;
+
        /** Mesos scheduler driver. */
        private SchedulerDriver schedulerDriver;
 
@@ -191,6 +193,9 @@ public MesosResourceManager(
                this.workersInNew = new HashMap<>(8);
                this.workersInLaunch = new HashMap<>(8);
                this.workersBeingReturned = new HashMap<>(8);
+
+               final ContaineredTaskManagerParameters 
containeredTaskManagerParameters = 
taskManagerParameters.containeredParameters();
+               this.slotsPerWorker = 
createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
        }
 
        protected ActorRef createSelfActor() {
@@ -352,7 +357,7 @@ private void recoverWorkers(final 
List<MesosWorkerStore.Worker> tasksFromPreviou
                                switch(worker.state()) {
                                        case Launched:
                                                
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
-                                               final LaunchableMesosWorker 
launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
+                                               final LaunchableMesosWorker 
launchable = createLaunchableMesosWorker(worker.taskID());
                                                toAssign.add(new 
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
                                                break;
                                        case Released:
@@ -426,7 +431,7 @@ protected void internalDeregisterApplication(
        }
 
        @Override
-       public void startNewWorker(ResourceProfile resourceProfile) {
+       public Collection<ResourceProfile> startNewWorker(ResourceProfile 
resourceProfile) {
                LOG.info("Starting a new worker.");
                try {
                        // generate new workers into persistent state and 
launch associated actors
@@ -434,7 +439,7 @@ public void startNewWorker(ResourceProfile resourceProfile) 
{
                        workerStore.putWorker(worker);
                        workersInNew.put(extractResourceID(worker.taskID()), 
worker);
 
-                       LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID(), resourceProfile);
+                       LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
 
                        LOG.info("Scheduling Mesos task {} with ({} MB, {} 
cpus).",
                                launchable.taskID().getValue(), 
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
@@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
                        taskMonitor.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
 
                        // tell the launch coordinator to launch the new tasks
-                       launchCoordinator.tell(new 
LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) 
launchable)), selfActor);
+                       launchCoordinator.tell(new 
LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+                       return slotsPerWorker;
                } catch (Exception ex) {
                        onFatalError(new ResourceManagerException("Unable to 
request new workers.", ex));
+                       return Collections.emptyList();
                }
        }
 
@@ -691,36 +699,13 @@ public void taskTerminated(TaskMonitor.TaskTerminated 
message) {
        /**
         * Creates a launchable task for Fenzo to process.
         */
-       private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID, ResourceProfile resourceProfile) {
-
-               // create the specific TM parameters from the resource profile 
and some defaults
-               MesosTaskManagerParameters params = new 
MesosTaskManagerParameters(
-                       resourceProfile.getCpuCores() < 1.0 ? 
taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
-                       taskManagerParameters.gpus(),
-                       taskManagerParameters.containerType(),
-                       taskManagerParameters.containerImageName(),
-                       new ContaineredTaskManagerParameters(
-                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : 
resourceProfile.getMemoryInMB(),
-                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : 
resourceProfile.getHeapMemoryInMB(),
-                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? 
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() 
: resourceProfile.getDirectMemoryInMB(),
-                               1,
-                               new 
HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
-                       taskManagerParameters.containerVolumes(),
-                       taskManagerParameters.dockerParameters(),
-                       taskManagerParameters.dockerForcePullImage(),
-                       taskManagerParameters.constraints(),
-                       taskManagerParameters.command(),
-                       taskManagerParameters.bootstrapCommand(),
-                       taskManagerParameters.getTaskManagerHostname(),
-                       taskManagerParameters.uris()
-               );
-
-               LOG.debug("LaunchableMesosWorker parameters: {}", params);
+       private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID) {
+               LOG.debug("LaunchableMesosWorker parameters: {}", 
taskManagerParameters);
 
                LaunchableMesosWorker launchable =
                        new LaunchableMesosWorker(
                                artifactServer,
-                               params,
+                               taskManagerParameters,
                                taskManagerContainerSpec,
                                taskID,
                                mesosConfig);
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e21f0fc3374..5163724ed6d 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -242,7 +242,7 @@ protected void closeTaskManagerConnection(ResourceID 
resourceID, Exception cause
                TestingMesosResourceManager resourceManager;
 
                // domain objects for test purposes
-               final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 1);
+               final ResourceProfile resourceProfile1 = 
ResourceProfile.UNKNOWN;
 
                Protos.FrameworkID framework1 = 
Protos.FrameworkID.newBuilder().setValue("framework1").build();
                public Protos.SlaveID slave1 = 
Protos.SlaveID.newBuilder().setValue("slave1").build();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index a89b9f92b17..5b133e7b624 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -50,6 +50,9 @@
 
        public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, 
-1);
 
+       /** ResourceProfile which matches any other ResourceProfile. */
+       public static final ResourceProfile ANY = new 
ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, 
Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+
        // 
------------------------------------------------------------------------
 
        /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ac1181b1d1d..7929e3170b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -71,6 +71,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -1019,9 +1020,10 @@ protected abstract void internalDeregisterApplication(
         * Allocates a resource using the resource profile.
         *
         * @param resourceProfile The resource description
+        * @return Collection of {@link ResourceProfile} describing the 
launched slots
         */
        @VisibleForTesting
-       public abstract void startNewWorker(ResourceProfile resourceProfile);
+       public abstract Collection<ResourceProfile> 
startNewWorker(ResourceProfile resourceProfile);
 
        /**
         * Callback when a worker was started.
@@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId, 
Exception cause) {
                }
 
                @Override
-               public void allocateResource(ResourceProfile resourceProfile) 
throws ResourceManagerException {
+               public Collection<ResourceProfile> 
allocateResource(ResourceProfile resourceProfile) {
                        validateRunsInMainThread();
-                       startNewWorker(resourceProfile);
+                       return startNewWorker(resourceProfile);
                }
 
                @Override
@@ -1176,8 +1178,16 @@ public void reportPayload(ResourceID resourceID, Void 
payload) {
        //  Resource Management
        // 
------------------------------------------------------------------------
 
-       protected int getNumberPendingSlotRequests() {
-               return slotManager.getNumberPendingSlotRequests();
+       protected int getNumberRequiredTaskManagerSlots() {
+               return slotManager.getNumberPendingTaskManagerSlots();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Helper methods
+       // 
------------------------------------------------------------------------
+
+       protected static Collection<ResourceProfile> createSlotsPerWorker(int 
numSlots) {
+               return Collections.nCopies(numSlots, ResourceProfile.ANY);
        }
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 420b89f39f2..064c2d361d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -32,6 +32,9 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
  * A standalone implementation of the resource manager. Used when the system 
is started in
  * standalone mode (via scripts), rather than via a resource framework like 
YARN or Mesos.
@@ -74,7 +77,8 @@ protected void 
internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
        }
 
        @Override
-       public void startNewWorker(ResourceProfile resourceProfile) {
+       public Collection<ResourceProfile> startNewWorker(ResourceProfile 
resourceProfile) {
+               return Collections.emptyList();
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 17cf8c7907e..a8f212fe6d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -25,10 +25,14 @@
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.concurrent.CompletableFuture;
 
+/**
+ * Class representing a pending slot request in the {@link SlotManager}.
+ */
 public class PendingSlotRequest {
 
        private final SlotRequest slotRequest;
@@ -36,11 +40,16 @@
        @Nullable
        private CompletableFuture<Acknowledge> requestFuture;
 
+       @Nullable
+       private PendingTaskManagerSlot pendingTaskManagerSlot;
+
        /** Timestamp when this pending slot request has been created. */
        private final long creationTimestamp;
 
        public PendingSlotRequest(SlotRequest slotRequest) {
                this.slotRequest = Preconditions.checkNotNull(slotRequest);
+               this.requestFuture = null;
+               this.pendingTaskManagerSlot = null;
                creationTimestamp = System.currentTimeMillis();
        }
 
@@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable 
CompletableFuture<Acknowledge> requestFut
        public CompletableFuture<Acknowledge> getRequestFuture() {
                return requestFuture;
        }
+
+       @Nullable
+       public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() {
+               return pendingTaskManagerSlot;
+       }
+
+       public void assignPendingTaskManagerSlot(@Nonnull 
PendingTaskManagerSlot pendingTaskManagerSlotToAssign) {
+               Preconditions.checkState(pendingTaskManagerSlot == null);
+               this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign;
+       }
+
+       public void unassignPendingTaskManagerSlot() {
+               this.pendingTaskManagerSlot = null;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
new file mode 100644
index 00000000000..ed207e963e3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Represents a pending task manager slot in the {@link SlotManager}.
+ */
+public class PendingTaskManagerSlot {
+
+       private final TaskManagerSlotId taskManagerSlotId = 
TaskManagerSlotId.generate();
+
+       private final ResourceProfile resourceProfile;
+
+       @Nullable
+       private PendingSlotRequest pendingSlotRequest;
+
+       public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
+               this.resourceProfile = resourceProfile;
+       }
+
+       public TaskManagerSlotId getTaskManagerSlotId() {
+               return taskManagerSlotId;
+       }
+
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       public void assignPendingSlotRequest(@Nonnull PendingSlotRequest 
pendingSlotRequestToAssign) {
+               Preconditions.checkState(pendingSlotRequest == null);
+               pendingSlotRequest = pendingSlotRequestToAssign;
+       }
+
+       public void unassignPendingSlotRequest() {
+               pendingSlotRequest = null;
+       }
+
+       @Nullable
+       public PendingSlotRequest getAssignedPendingSlotRequest() {
+               return pendingSlotRequest;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
index 84e7c4e785d..adf8f13db57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
@@ -24,6 +24,8 @@
 import org.apache.flink.runtime.instance.InstanceID;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 
+import java.util.Collection;
+
 /**
  * Resource related actions which the {@link SlotManager} can perform.
  */
@@ -41,9 +43,10 @@
         * Requests to allocate a resource with the given {@link 
ResourceProfile}.
         *
         * @param resourceProfile for the to be allocated resource
+        * @return Collection of {@link ResourceProfile} describing the 
allocated slots
         * @throws ResourceManagerException if the resource cannot be allocated
         */
-       void allocateResource(ResourceProfile resourceProfile) throws 
ResourceManagerException;
+       Collection<ResourceProfile> allocateResource(ResourceProfile 
resourceProfile) throws ResourceManagerException;
 
        /**
         * Notifies that an allocation failure has occurred.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index bab56609a1b..2ef2b2fefcc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -43,14 +43,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -99,6 +102,8 @@
        /** Map of pending/unfulfilled slot allocation requests. */
        private final HashMap<AllocationID, PendingSlotRequest> 
pendingSlotRequests;
 
+       private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> 
pendingSlots;
+
        /** ResourceManager's id. */
        private ResourceManagerId resourceManagerId;
 
@@ -130,6 +135,7 @@ public SlotManager(
                taskManagerRegistrations = new HashMap<>(4);
                fulfilledSlotRequests = new HashMap<>(16);
                pendingSlotRequests = new HashMap<>(16);
+               pendingSlots = new HashMap<>(16);
 
                resourceManagerId = null;
                resourceActions = null;
@@ -168,8 +174,13 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
                }
        }
 
-       public int getNumberPendingSlotRequests() {
-               return pendingSlotRequests.size();
+       public int getNumberPendingTaskManagerSlots() {
+               return pendingSlots.size();
+       }
+
+       @VisibleForTesting
+       int getNumberAssignedPendingTaskManagerSlots() {
+               return (int) pendingSlots.values().stream().filter(slot -> 
slot.getAssignedPendingSlotRequest() != null).count();
        }
 
        // 
---------------------------------------------------------------------------------------------
@@ -530,14 +541,50 @@ private void registerSlot(
                        removeSlot(slotId);
                }
 
-               TaskManagerSlot slot = new TaskManagerSlot(
+               final TaskManagerSlot slot = 
createAndRegisterTaskManagerSlot(slotId, resourceProfile, 
taskManagerConnection);
+
+               final PendingTaskManagerSlot pendingTaskManagerSlot;
+
+               if (allocationId == null) {
+                       pendingTaskManagerSlot = 
findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
+               } else {
+                       pendingTaskManagerSlot = null;
+               }
+
+               if (pendingTaskManagerSlot == null) {
+                       updateSlot(slotId, allocationId, jobId);
+               } else {
+                       
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
+                       final PendingSlotRequest assignedPendingSlotRequest = 
pendingTaskManagerSlot.getAssignedPendingSlotRequest();
+
+                       if (assignedPendingSlotRequest == null) {
+                               handleFreeSlot(slot);
+                       } else {
+                               
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
+                               allocateSlot(slot, assignedPendingSlotRequest);
+                       }
+               }
+       }
+
+       @Nonnull
+       private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, 
ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
+               final TaskManagerSlot slot = new TaskManagerSlot(
                        slotId,
                        resourceProfile,
                        taskManagerConnection);
-
                slots.put(slotId, slot);
+               return slot;
+       }
 
-               updateSlot(slotId, allocationId, jobId);
+       @Nullable
+       private PendingTaskManagerSlot 
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
+               for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
+                       if 
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                               return pendingTaskManagerSlot;
+                       }
+               }
+
+               return null;
        }
 
        /**
@@ -595,7 +642,11 @@ private void updateSlotState(
                                                
slot.updateAllocation(allocationId, jobId);
 
                                                // remove the pending request 
if any as it has been assigned
-                                               
pendingSlotRequests.remove(allocationId);
+                                               final PendingSlotRequest 
actualPendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+                                               if (actualPendingSlotRequest != 
null) {
+                                                       
cancelPendingSlotRequest(actualPendingSlotRequest);
+                                               }
 
                                                // this will try to find a new 
slot for the request
                                                rejectPendingSlotRequest(
@@ -650,13 +701,54 @@ private void updateSlotState(
         * @throws ResourceManagerException if the resource manager cannot 
allocate more resource
         */
        private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) 
throws ResourceManagerException {
-               TaskManagerSlot taskManagerSlot = 
findMatchingSlot(pendingSlotRequest.getResourceProfile());
+               final ResourceProfile resourceProfile = 
pendingSlotRequest.getResourceProfile();
+               TaskManagerSlot taskManagerSlot = 
findMatchingSlot(resourceProfile);
 
                if (taskManagerSlot != null) {
                        allocateSlot(taskManagerSlot, pendingSlotRequest);
                } else {
-                       
resourceActions.allocateResource(pendingSlotRequest.getResourceProfile());
+                       Optional<PendingTaskManagerSlot> 
pendingTaskManagerSlotOptional = 
findFreeMatchingPendingTaskManagerSlot(resourceProfile);
+
+                       if (!pendingTaskManagerSlotOptional.isPresent()) {
+                               pendingTaskManagerSlotOptional = 
allocateResource(resourceProfile);
+                       }
+
+                       
pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> 
assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+               }
+       }
+
+       private Optional<PendingTaskManagerSlot> 
findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) 
{
+               for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
+                       if 
(pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && 
pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile))
 {
+                               return Optional.of(pendingTaskManagerSlot);
+                       }
                }
+
+               return Optional.empty();
+       }
+
+       private Optional<PendingTaskManagerSlot> 
allocateResource(ResourceProfile resourceProfile) throws 
ResourceManagerException {
+               final Collection<ResourceProfile> requestedSlots = 
resourceActions.allocateResource(resourceProfile);
+
+               if (requestedSlots.isEmpty()) {
+                       return Optional.empty();
+               } else {
+                       final Iterator<ResourceProfile> slotIterator = 
requestedSlots.iterator();
+                       final PendingTaskManagerSlot pendingTaskManagerSlot = 
new PendingTaskManagerSlot(slotIterator.next());
+                       
pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), 
pendingTaskManagerSlot);
+
+                       while (slotIterator.hasNext()) {
+                               final PendingTaskManagerSlot 
additionalPendingTaskManagerSlot = new 
PendingTaskManagerSlot(slotIterator.next());
+                               
pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), 
additionalPendingTaskManagerSlot);
+                       }
+
+                       return Optional.of(pendingTaskManagerSlot);
+               }
+       }
+
+       private void assignPendingTaskManagerSlot(PendingSlotRequest 
pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) {
+               
pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
+               
pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot);
        }
 
        /**
@@ -680,6 +772,8 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, 
PendingSlotRequest pe
                taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
                pendingSlotRequest.setRequestFuture(completableFuture);
 
+               returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
                TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceID);
 
                if (taskManagerRegistration == null) {
@@ -733,6 +827,14 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, 
PendingSlotRequest pe
                        mainThreadExecutor);
        }
 
+       private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest 
pendingSlotRequest) {
+               final PendingTaskManagerSlot pendingTaskManagerSlot = 
pendingSlotRequest.getAssignedPendingTaskManagerSlot();
+               if (pendingTaskManagerSlot != null) {
+                       pendingTaskManagerSlot.unassignPendingSlotRequest();
+                       pendingSlotRequest.unassignPendingTaskManagerSlot();
+               }
+       }
+
        /**
         * Handles a free slot. It first tries to find a pending slot request 
which can be fulfilled.
         * If there is no such request, then it will add the slot to the set of 
free slots.
@@ -886,6 +988,8 @@ private void rejectPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest, Exc
        private void cancelPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
                CompletableFuture<Acknowledge> request = 
pendingSlotRequest.getRequestFuture();
 
+               returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
                if (null != request) {
                        request.cancel(false);
                }
@@ -911,9 +1015,10 @@ private void checkTaskManagerTimeouts() {
                        }
 
                        // second we trigger the release resource callback which can decide upon the resource release
+                       final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
                        for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
                                LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
-                               resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout."));
+                               resourceActions.releaseResource(timedOutTaskManagerId, cause);
                        }
                }
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
new file mode 100644
index 00000000000..3084b3e8ed9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}.
+ */
+public class TaskManagerSlotId extends AbstractID {
+
+       private static final long serialVersionUID = -4024240625523472071L;
+
+       private TaskManagerSlotId() {}
+
+       public static TaskManagerSlotId generate() {
+               return new TaskManagerSlotId();
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0e98e44ab0d..c46d800bc95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public static TaskManagerServices fromConfiguration(
                final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
 
                for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-                       resourceProfiles.add(new ResourceProfile(1.0, 42));
+                       resourceProfiles.add(ResourceProfile.ANY);
                }
 
                final TimerService<AllocationID> timerService = new TimerService<>(
@@ -259,7 +259,6 @@ public static TaskManagerServices fromConfiguration(
 
                final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
-
                final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
 
                final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0b56231d4a4..e8207019a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -32,6 +32,9 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
  * Simple {@link ResourceManager} implementation for testing purposes.
  */
@@ -71,8 +74,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
        }
 
        @Override
-       public void startNewWorker(ResourceProfile resourceProfile) {
-               // noop
+       public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
+               return Collections.emptyList();
        }
 
        @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8a7f733a3fa..33a696a1d24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -47,6 +47,7 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -54,7 +55,9 @@
 import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +70,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -84,9 +88,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -103,9 +105,9 @@
        @Test
        public void testTaskManagerRegistration() throws Exception {
                final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+               final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
-               final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+               final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                final ResourceID resourceId = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -135,14 +137,12 @@ public void testTaskManagerUnregistration() throws 
Exception {
                final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
                final JobID jobId = new JobID();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       any(Time.class))).thenReturn(new CompletableFuture<>());
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               assertThat(tuple5.f4, 
is(equalTo(resourceManagerId)));
+                               return new CompletableFuture<>();
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final ResourceID resourceId = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -202,13 +202,16 @@ public void testSlotRequestWithoutFreeSlots() throws Exception {
                        resourceProfile,
                        "localhost");
 
-               ResourceActions resourceManagerActions = mock(ResourceActions.class);
+               CompletableFuture<ResourceProfile> allocateResourceFuture = new CompletableFuture<>();
+               ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(allocateResourceFuture::complete)
+                       .build();
 
                try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
                        slotManager.registerSlotRequest(slotRequest);
 
-                       verify(resourceManagerActions).allocateResource(eq(resourceProfile));
+                       assertThat(allocateResourceFuture.get(), is(equalTo(resourceProfile)));
                }
        }
 
@@ -225,8 +228,11 @@ public void testSlotRequestWithResourceAllocationFailure() 
throws Exception {
                        resourceProfile,
                        "localhost");
 
-               ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
-               doThrow(new ResourceManagerException("Test 
exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+               ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(value -> {
+                               throw new ResourceManagerException("Test 
exception");
+                       })
+                       .build();
 
                try (SlotManager slotManager = 
createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -257,19 +263,17 @@ public void testSlotRequestWithFreeSlot() throws 
Exception {
                        resourceProfile,
                        targetAddress);
 
-               ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                try (SlotManager slotManager = 
createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+                       final CompletableFuture<Tuple5<SlotID, JobID, 
AllocationID, String, ResourceManagerId>> requestFuture = new 
CompletableFuture<>();
                        // accept an incoming slot request
-                       final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-                       when(taskExecutorGateway.requestSlot(
-                               eq(slotId),
-                               eq(jobId),
-                               eq(allocationId),
-                               anyString(),
-                               eq(resourceManagerId),
-                               
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+                       final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                               .setRequestSlotFunction(tuple5 -> {
+                                       
requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, 
tuple5.f4));
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                               .createTestingTaskExecutorGateway();
 
                        final TaskExecutorConnection taskExecutorConnection = 
new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -282,7 +286,7 @@ public void testSlotRequestWithFreeSlot() throws Exception {
 
                        assertTrue("The slot request should be accepted", 
slotManager.registerSlotRequest(slotRequest));
 
-                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, 
resourceManagerId))));
 
                        TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -302,14 +306,9 @@ public void testUnregisterPendingSlotRequest() throws 
Exception {
                final SlotID slotId = new SlotID(resourceID, 0);
                final AllocationID allocationId = new AllocationID();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       any(Time.class))).thenReturn(new CompletableFuture<>());
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> 
new CompletableFuture<>())
+                       .createTestingTaskExecutorGateway();
 
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
@@ -357,17 +356,19 @@ public void testFulfillingPendingSlotRequest() throws 
Exception {
                        resourceProfile,
                        targetAddress);
 
-               ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger numberAllocateResourceCalls = new 
AtomicInteger(0);
+               ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(ignored -> 
numberAllocateResourceCalls.incrementAndGet())
+                       .build();
 
+               final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, 
String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
                // accept an incoming slot request
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       eq(slotId),
-                       eq(jobId),
-                       eq(allocationId),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestFuture.complete(Tuple5.of(tuple5.f0, 
tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -378,13 +379,13 @@ public void testFulfillingPendingSlotRequest() throws 
Exception {
 
                        assertTrue("The slot request should be accepted", 
slotManager.registerSlotRequest(slotRequest));
 
-                       verify(resourceManagerActions, 
times(1)).allocateResource(eq(resourceProfile));
+                       assertThat(numberAllocateResourceCalls.get(), is(1));
 
                        slotManager.registerTaskManager(
                                taskExecutorConnection,
                                slotReport);
 
-                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, 
resourceManagerId))));
 
                        TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -444,7 +445,10 @@ public void testFreeSlot() throws Exception {
        @Test
        public void testDuplicatePendingSlotRequest() throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger numberAllocateResourceFunctionCalls = new 
AtomicInteger(0);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(resourceProfile -> 
numberAllocateResourceFunctionCalls.incrementAndGet())
+                       .build();
                final AllocationID allocationId = new AllocationID();
                final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
                final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
@@ -458,7 +462,7 @@ public void testDuplicatePendingSlotRequest() throws 
Exception {
 
                // check that we have only called the resource allocation only 
for the first slot request,
                // since the second request is a duplicate
-               verify(resourceManagerActions, 
times(1)).allocateResource(any(ResourceProfile.class));
+               assertThat(numberAllocateResourceFunctionCalls.get(), is(1));
        }
 
        /**
@@ -497,21 +501,17 @@ public void 
testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
        @Test
        public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() 
throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger allocateResourceCalls = new 
AtomicInteger(0);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(resourceProfile -> 
allocateResourceCalls.incrementAndGet())
+                       .build();
                final AllocationID allocationId = new AllocationID();
                final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
                final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
                final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
                final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
                final ResourceID resourceID = ResourceID.generate();
 
@@ -534,7 +534,7 @@ public void 
testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex
 
                // check that we have only called the resource allocation only 
for the first slot request,
                // since the second request is a duplicate
-               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
+               assertThat(allocateResourceCalls.get(), is(0));
        }
 
        /**
@@ -544,21 +544,17 @@ public void 
testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex
        @Test
        public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() 
throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger allocateResourceCalls = new 
AtomicInteger(0);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(resourceProfile -> 
allocateResourceCalls.incrementAndGet())
+                       .build();
                final AllocationID allocationId = new AllocationID();
                final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
                final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
                final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
                final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
                final ResourceID resourceID = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -588,7 +584,7 @@ public void 
testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exc
 
                // check that we have only called the resource allocation only 
for the first slot request,
                // since the second request is a duplicate
-               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
+               assertThat(allocateResourceCalls.get(), is(0));
        }
 
        /**
@@ -624,7 +620,7 @@ public void testReceivingUnknownSlotReport() throws 
Exception {
        @Test
        public void testUpdateSlotReport() throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
@@ -678,13 +674,16 @@ public void testUpdateSlotReport() throws Exception {
         */
        @Test
        public void testTaskManagerTimeout() throws Exception {
-               final long tmTimeout = 500L;
+               final long tmTimeout = 10L;
 
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final CompletableFuture<InstanceID> releaseFuture = new 
CompletableFuture<>();
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
+                       .build();
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final ResourceID resourceID = ResourceID.generate();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
 
                final SlotID slotId = new SlotID(resourceID, 0);
@@ -702,15 +701,9 @@ public void testTaskManagerTimeout() throws Exception {
 
                        slotManager.start(resourceManagerId, 
mainThreadExecutor, resourceManagerActions);
 
-                       mainThreadExecutor.execute(new Runnable() {
-                               @Override
-                               public void run() {
-                                       
slotManager.registerTaskManager(taskManagerConnection, slotReport);
-                               }
-                       });
+                       mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
-                       verify(resourceManagerActions, timeout(100L * 
tmTimeout).times(1))
-                               
.releaseResource(eq(taskManagerConnection.getInstanceID()), 
any(Exception.class));
+                       assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
                }
        }
 
@@ -723,7 +716,10 @@ public void run() {
        public void testSlotRequestTimeout() throws Exception {
                final long allocationTimeout = 50L;
 
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final CompletableFuture<Tuple2<JobID, AllocationID>> 
failedAllocationFuture = new CompletableFuture<>();
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setNotifyAllocationFailureConsumer(tuple3 -> 
failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1)))
+                       .build();
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
@@ -743,21 +739,15 @@ public void testSlotRequestTimeout() throws Exception {
 
                        final AtomicReference<Exception> atomicException = new 
AtomicReference<>(null);
 
-                       mainThreadExecutor.execute(new Runnable() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               
assertTrue(slotManager.registerSlotRequest(slotRequest));
-                                       } catch (Exception e) {
-                                               
atomicException.compareAndSet(null, e);
-                                       }
+                       mainThreadExecutor.execute(() -> {
+                               try {
+                                       
assertTrue(slotManager.registerSlotRequest(slotRequest));
+                               } catch (Exception e) {
+                                       atomicException.compareAndSet(null, e);
                                }
                        });
 
-                       verify(resourceManagerActions, timeout(100L * 
allocationTimeout).times(1)).notifyAllocationFailure(
-                               eq(jobId),
-                               eq(allocationId),
-                               any(TimeoutException.class));
+                       assertThat(failedAllocationFuture.get(), 
is(equalTo(Tuple2.of(jobId, allocationId))));
 
                        if (atomicException.get() != null) {
                                throw atomicException.get();
@@ -851,7 +841,7 @@ public void testTaskManagerSlotRequestTimeoutHandling() 
throws Exception {
        public void testSlotReportWhileActiveSlotRequest() throws Exception {
                final long verifyTimeout = 10000L;
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
@@ -966,10 +956,12 @@ public void testSlotReportWhileActiveSlotRequest() throws 
Exception {
        @Test
        public void testTimeoutForUnusedTaskManager() throws Exception {
                final long taskManagerTimeout = 50L;
-               final long verifyTimeout = taskManagerTimeout * 10L;
 
+               final CompletableFuture<InstanceID> releasedResourceFuture = 
new CompletableFuture<>();
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setReleaseResourceConsumer((instanceID, e) -> 
releasedResourceFuture.complete(instanceID))
+                       .build();
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
                final ScheduledExecutor scheduledExecutor = 
TestingUtils.defaultScheduledExecutor();
 
                final ResourceID resourceId = ResourceID.generate();
@@ -979,14 +971,13 @@ public void testTimeoutForUnusedTaskManager() throws 
Exception {
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       eq(jobId),
-                       eq(allocationId),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final CompletableFuture<SlotID> requestedSlotFuture = new 
CompletableFuture<>();
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestedSlotFuture.complete(tuple5.f0);
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -1015,17 +1006,9 @@ public void testTimeoutForUnusedTaskManager() throws 
Exception {
                                        }
                                },
                                mainThreadExecutor)
-                       .thenAccept((Object value) -> 
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
+                       .thenRun(() -> 
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
-                       ArgumentCaptor<SlotID> slotIdArgumentCaptor = 
ArgumentCaptor.forClass(SlotID.class);
-
-                       verify(taskExecutorGateway, 
timeout(verifyTimeout)).requestSlot(
-                               slotIdArgumentCaptor.capture(),
-                               eq(jobId),
-                               eq(allocationId),
-                               anyString(),
-                               eq(resourceManagerId),
-                               any(Time.class));
+                       final SlotID slotId = requestedSlotFuture.get();
 
                        CompletableFuture<Boolean> idleFuture = 
CompletableFuture.supplyAsync(
                                () -> 
slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1034,8 +1017,6 @@ public void testTimeoutForUnusedTaskManager() throws 
Exception {
                        // check that the TaskManager is not idle
                        assertFalse(idleFuture.get());
 
-                       final SlotID slotId = slotIdArgumentCaptor.getValue();
-
                        CompletableFuture<TaskManagerSlot> slotFuture = 
CompletableFuture.supplyAsync(
                                () -> slotManager.getSlot(slotId),
                                mainThreadExecutor);
@@ -1052,7 +1033,7 @@ public void testTimeoutForUnusedTaskManager() throws 
Exception {
 
                        assertTrue(idleFuture2.get());
 
-                       verify(resourceManagerActions, 
timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()),
 any(Exception.class));
+                       assertThat(releasedResourceFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
                }
        }
 
@@ -1109,7 +1090,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() 
throws Exception {
        @Test
        public void testReportAllocatedSlot() throws Exception {
                final ResourceID taskManagerId = ResourceID.generate();
-               final ResourceActions resourceActions = 
mock(ResourceActions.class);
+               final ResourceActions resourceActions = new 
TestingResourceActionsBuilder().build();
                final TestingTaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
@@ -1167,7 +1148,7 @@ public void testReportAllocatedSlot() throws Exception {
        @Test
        public void testSlotRequestFailure() throws Exception {
                try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(),
-                       new 
TestingResourceActionsBuilder().createTestingResourceActions())) {
+                       new TestingResourceActionsBuilder().build())) {
 
                        final SlotRequest slotRequest = new SlotRequest(new 
JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
                        slotManager.registerSlotRequest(slotRequest);
@@ -1222,7 +1203,7 @@ public void testSlotRequestFailure() throws Exception {
        @Test
        public void testSlotRequestRemovedIfTMReportAllocation() throws 
Exception {
                try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(),
-                               new 
TestingResourceActionsBuilder().createTestingResourceActions())) {
+                               new TestingResourceActionsBuilder().build())) {
 
                        final JobID jobID = new JobID();
                        final SlotRequest slotRequest1 = new SlotRequest(jobID, 
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
@@ -1293,7 +1274,7 @@ public void 
testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti
                        .setNotifyAllocationFailureConsumer(
                                (Tuple3<JobID, AllocationID, Exception> 
failureMessage) ->
                                        
allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
-                       .createTestingResourceActions();
+                       .build();
 
                try (final SlotManager slotManager = createSlotManager(
                        ResourceManagerId.generate(),
@@ -1374,17 +1355,27 @@ public void 
testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti
 
        @Nonnull
        private SlotReport createSlotReport(ResourceID taskExecutorResourceId, 
int numberSlots) {
+               return createSlotReport(taskExecutorResourceId, numberSlots, 
ResourceProfile.UNKNOWN);
+       }
+
+       @Nonnull
+       private SlotReport createSlotReport(ResourceID taskExecutorResourceId, 
int numberSlots, ResourceProfile resourceProfile) {
                final Set<SlotStatus> slotStatusSet = new 
HashSet<>(numberSlots);
                for (int i = 0; i < numberSlots; i++) {
-                       slotStatusSet.add(new SlotStatus(new 
SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN));
+                       slotStatusSet.add(new SlotStatus(new 
SlotID(taskExecutorResourceId, i), resourceProfile));
                }
 
                return new SlotReport(slotStatusSet);
        }
 
        @Nonnull
-       private SlotRequest createSlotRequest(JobID jobId1) {
-               return new SlotRequest(jobId1, new AllocationID(), 
ResourceProfile.UNKNOWN, "foobar1");
+       private SlotRequest createSlotRequest(JobID jobId) {
+               return createSlotRequest(jobId, ResourceProfile.UNKNOWN);
+       }
+
+       @Nonnull
+       private SlotRequest createSlotRequest(JobID jobId, ResourceProfile 
resourceProfile) {
+               return new SlotRequest(jobId, new AllocationID(), 
resourceProfile, "foobar1");
        }
 
        private SlotManager createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
@@ -1398,4 +1389,171 @@ private SlotManager createSlotManager(ResourceManagerId 
resourceManagerId, Resou
 
                return slotManager;
        }
+
+       /**
+        * Tests that we only request new resources/containers once we have 
assigned
+        * all pending task manager slots.
+        */
+       @Test
+       public void testRequestNewResources() throws Exception {
+               final int numberSlots = 2;
+               final AtomicInteger resourceRequests = new AtomicInteger(0);
+               final TestingResourceActions testingResourceActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(
+                               convert(ignored -> {
+                                       resourceRequests.incrementAndGet();
+                                       return numberSlots;
+                               }))
+                       .build();
+
+               try (final SlotManager slotManager = createSlotManager(
+                       ResourceManagerId.generate(),
+                       testingResourceActions)) {
+
+                       final JobID jobId = new JobID();
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+                       assertThat(resourceRequests.get(), is(1));
+
+                       // the second slot request should not try to allocate a 
new resource because the
+                       // previous resource was started with 2 slots.
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+                       assertThat(resourceRequests.get(), is(1));
+
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2));
+
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+                       assertThat(resourceRequests.get(), is(2));
+               }
+       }
+
+       /**
+        * Tests that a failing allocation/slot request will return the pending 
task manager slot.
+        */
+       @Test
+       public void testFailingAllocationReturnsPendingTaskManagerSlot() throws 
Exception {
+               final int numberSlots = 2;
+               final TestingResourceActions resourceActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(convert(value -> 
numberSlots))
+                       .build();
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+                       final JobID jobId = new JobID();
+
+                       final SlotRequest slotRequest = 
createSlotRequest(jobId);
+                       
assertThat(slotManager.registerSlotRequest(slotRequest), is(true));
+
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+
+                       
slotManager.unregisterSlotRequest(slotRequest.getAllocationId());
+
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+               }
+       }
+
+       /**
+        * Tests the completion of pending task manager slots by registering a 
TaskExecutor.
+        */
+       @Test
+       public void testPendingTaskManagerSlotCompletion() throws Exception {
+               final int numberSlots = 3;
+               final TestingResourceActions resourceActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(convert(value -> 
numberSlots))
+                       .build();
+
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+                       final JobID jobId = new JobID();
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+                       assertThat(slotManager.getNumberRegisteredSlots(), 
is(0));
+
+                       final TaskExecutorConnection taskExecutorConnection = 
createTaskExecutorConnection();
+                       final SlotReport slotReport = 
createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1);
+
+                       slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+
+                       assertThat(slotManager.getNumberRegisteredSlots(), 
is(numberSlots - 1));
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
+               }
+       }
+
+       private TaskExecutorConnection createTaskExecutorConnection() {
+               final TestingTaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+               return new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
+       }
+
+       /**
+        * Tests that a different slot can fulfill a pending slot request. If 
the
+        * pending slot request has a pending task manager slot assigned, it 
should
+        * be freed.
+        */
+       @Test
+       public void testRegistrationOfDifferentSlot() throws Exception {
+               final int numberSlots = 1;
+               final TestingResourceActions resourceActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(convert(value -> 
numberSlots))
+                       .build();
+
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+                       final JobID jobId = new JobID();
+                       final ResourceProfile requestedSlotProfile = new 
ResourceProfile(1.0, 1);
+
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, 
requestedSlotProfile)), is(true));
+
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
+                       final int numberOfferedSlots = 1;
+                       final TaskExecutorConnection taskExecutorConnection = 
createTaskExecutorConnection();
+                       final ResourceProfile offeredSlotProfile = new 
ResourceProfile(2.0, 2);
+                       final SlotReport slotReport = 
createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, 
offeredSlotProfile);
+
+                       slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+
+                       assertThat(slotManager.getNumberRegisteredSlots(), 
is(numberOfferedSlots));
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+               }
+       }
+
+       /**
+        * Tests that only free slots can fulfill/complete a pending task 
manager slot.
+        */
+       @Test
+       public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws 
Exception {
+               final int numberSlots = 1;
+               final TestingResourceActions resourceActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(convert(value -> 
numberSlots))
+                       .build();
+
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+                       final JobID jobId = new JobID();
+                       
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+                       final TaskExecutorConnection taskExecutorConnection = 
createTaskExecutorConnection();
+                       final SlotID slotId = new 
SlotID(taskExecutorConnection.getResourceID(), 0);
+                       final SlotStatus slotStatus = new SlotStatus(slotId, 
ResourceProfile.UNKNOWN, jobId, new AllocationID());
+                       final SlotReport slotReport = new 
SlotReport(slotStatus);
+
+                       slotManager.registerTaskManager(taskExecutorConnection, 
slotReport);
+
+                       assertThat(slotManager.getNumberRegisteredSlots(), 
is(1));
+                       
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+                       
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+               }
+       }
+
+       private static FunctionWithException<ResourceProfile, 
Collection<ResourceProfile>, ResourceManagerException> 
convert(FunctionWithException<ResourceProfile, Integer, 
ResourceManagerException> function) {
+               return (ResourceProfile resourceProfile) -> {
+                       final int slots = function.apply(resourceProfile);
+
+                       final ArrayList<ResourceProfile> result = new 
ArrayList<>(slots);
+                       for (int i = 0; i < slots; i++) {
+                               result.add(resourceProfile);
+                       }
+
+                       return result;
+               };
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 8f6317c427f..66966cc3eff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -47,12 +47,13 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
+/**
+ * Tests for the slot allocation protocol.
+ */
 public class SlotProtocolTest extends TestLogger {
 
        private static final long timeout = 10000L;
@@ -87,7 +88,10 @@ public void testSlotsUnavailableRequest() throws Exception {
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {
 
-                       ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+                       final CompletableFuture<ResourceProfile> 
resourceProfileFuture = new CompletableFuture<>();
+                       ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                               
.setAllocateResourceConsumer(resourceProfileFuture::complete)
+                               .build();
 
                        slotManager.start(rmLeaderID, 
Executors.directExecutor(), resourceManagerActions);
 
@@ -99,14 +103,16 @@ public void testSlotsUnavailableRequest() throws Exception 
{
 
                        slotManager.registerSlotRequest(slotRequest);
 
-                       
verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile()));
+                       assertThat(resourceProfileFuture.get(), 
is(equalTo(slotRequest.getResourceProfile())));
 
                        // slot becomes available
-                       TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-                       Mockito.when(
-                               taskExecutorGateway
-                                       .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), 
any(ResourceManagerId.class), any(Time.class)))
-                               .thenReturn(mock(CompletableFuture.class));
+                       final CompletableFuture<Tuple3<SlotID, JobID, 
AllocationID>> requestFuture = new CompletableFuture<>();
+                       TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                               .setRequestSlotFunction(tuple5 -> {
+                                       
requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+                                       return new CompletableFuture<>();
+                               })
+                               .createTestingTaskExecutorGateway();
 
                        final ResourceID resourceID = ResourceID.generate();
                        final SlotID slotID = new SlotID(resourceID, 0);
@@ -119,8 +125,7 @@ public void testSlotsUnavailableRequest() throws Exception {
                        slotManager.registerTaskManager(new 
TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
                        // 4) Slot becomes available and TaskExecutor gets a 
SlotRequest
-                       verify(taskExecutorGateway, timeout(5000L))
-                               .requestSlot(eq(slotID), eq(jobID), 
eq(allocationID), any(String.class), any(ResourceManagerId.class), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
                }
        }
 
@@ -137,11 +142,13 @@ public void testSlotAvailableRequest() throws Exception {
 
                final ResourceManagerId rmLeaderID = 
ResourceManagerId.generate();
 
-               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               Mockito.when(
-                       taskExecutorGateway
-                               .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), 
any(ResourceManagerId.class), any(Time.class)))
-                       .thenReturn(mock(CompletableFuture.class));
+               final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> 
requestFuture = new CompletableFuture<>();
+               TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestFuture.complete(Tuple3.of(tuple5.f0, 
tuple5.f1, tuple5.f2));
+                               return new CompletableFuture<>();
+                       })
+                       .createTestingTaskExecutorGateway();
 
                try (SlotManager slotManager = new SlotManager(
                        scheduledExecutor,
@@ -149,7 +156,7 @@ public void testSlotAvailableRequest() throws Exception {
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {
 
-                       ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+                       ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                        slotManager.start(rmLeaderID, 
Executors.directExecutor(), resourceManagerActions);
 
@@ -172,8 +179,7 @@ public void testSlotAvailableRequest() throws Exception {
                        slotManager.registerSlotRequest(slotRequest);
 
                        // a SlotRequest is routed to the TaskExecutor
-                       verify(taskExecutorGateway, timeout(5000))
-                               .requestSlot(eq(slotID), eq(jobID), 
eq(allocationID), any(String.class), any(ResourceManagerId.class), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
index 8b7c8026f92..4c6f14c1c40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -23,9 +23,12 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
 
+import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -38,29 +41,28 @@
        private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
        @Nonnull
-       private final Consumer<ResourceProfile> allocateResourceConsumer;
+       private final FunctionWithException<ResourceProfile, 
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction;
 
        @Nonnull
        private final Consumer<Tuple3<JobID, AllocationID, Exception>> 
notifyAllocationFailureConsumer;
 
        public TestingResourceActions(
                        @Nonnull BiConsumer<InstanceID, Exception> 
releaseResourceConsumer,
-                       @Nonnull Consumer<ResourceProfile> 
allocateResourceConsumer,
+                       @Nonnull FunctionWithException<ResourceProfile, 
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction,
                        @Nonnull Consumer<Tuple3<JobID, AllocationID, 
Exception>> notifyAllocationFailureConsumer) {
                this.releaseResourceConsumer = releaseResourceConsumer;
-               this.allocateResourceConsumer = allocateResourceConsumer;
+               this.allocateResourceFunction = allocateResourceFunction;
                this.notifyAllocationFailureConsumer = 
notifyAllocationFailureConsumer;
        }
 
-
        @Override
        public void releaseResource(InstanceID instanceId, Exception cause) {
                releaseResourceConsumer.accept(instanceId, cause);
        }
 
        @Override
-       public void allocateResource(ResourceProfile resourceProfile) {
-               allocateResourceConsumer.accept(resourceProfile);
+       public Collection<ResourceProfile> allocateResource(ResourceProfile 
resourceProfile) throws ResourceManagerException {
+               return allocateResourceFunction.apply(resourceProfile);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
index 2c1d47e8c88..ac7afd4283e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
@@ -23,7 +23,11 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -32,7 +36,7 @@
  */
 public class TestingResourceActionsBuilder {
        private BiConsumer<InstanceID, Exception> releaseResourceConsumer = 
(ignoredA, ignoredB) -> {};
-       private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) 
-> {};
+       private FunctionWithException<ResourceProfile, 
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction 
= (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN);
        private Consumer<Tuple3<JobID, AllocationID, Exception>> 
notifyAllocationFailureConsumer = (ignored) -> {};
 
        public TestingResourceActionsBuilder 
setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> 
releaseResourceConsumer) {
@@ -40,8 +44,16 @@ public TestingResourceActionsBuilder 
setReleaseResourceConsumer(BiConsumer<Insta
                return this;
        }
 
+       public TestingResourceActionsBuilder 
setAllocateResourceFunction(FunctionWithException<ResourceProfile, 
Collection<ResourceProfile>, ResourceManagerException> 
allocateResourceFunction) {
+               this.allocateResourceFunction = allocateResourceFunction;
+               return this;
+       }
+
        public TestingResourceActionsBuilder 
setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) 
{
-               this.allocateResourceConsumer = allocateResourceConsumer;
+               this.allocateResourceFunction = (ResourceProfile 
resourceProfile) -> {
+                       allocateResourceConsumer.accept(resourceProfile);
+                       return Collections.singleton(ResourceProfile.UNKNOWN);
+               };
                return this;
        }
 
@@ -50,7 +62,7 @@ public TestingResourceActionsBuilder 
setNotifyAllocationFailureConsumer(Consumer
                return this;
        }
 
-       public TestingResourceActions createTestingResourceActions() {
-               return new TestingResourceActions(releaseResourceConsumer, 
allocateResourceConsumer, notifyAllocationFailureConsumer);
+       public TestingResourceActions build() {
+               return new TestingResourceActions(releaseResourceConsumer, 
allocateResourceFunction, notifyAllocationFailureConsumer);
        }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 956e40fe61b..2a43b8b7f1a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -58,6 +58,7 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,6 +116,8 @@
 
        private final Map<ResourceProfile, Integer> resourcePriorities = new 
HashMap<>();
 
+       private final Collection<ResourceProfile> slotsPerWorker;
+
        public YarnResourceManager(
                        RpcService rpcService,
                        String resourceManagerEndpointId,
@@ -163,6 +166,8 @@ public YarnResourceManager(
                this.numberOfTaskSlots = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
                this.defaultTaskManagerMemoryMB = 
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
                this.defaultCpus = 
flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
+
+               this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
        }
 
        protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
@@ -283,7 +288,7 @@ protected void internalDeregisterApplication(
        }
 
        @Override
-       public void startNewWorker(ResourceProfile resourceProfile) {
+       public Collection<ResourceProfile> startNewWorker(ResourceProfile 
resourceProfile) {
                // Priority for worker containers - priorities are 
intra-application
                //TODO: set priority according to the resource allocated
                Priority priority = 
Priority.newInstance(generatePriority(resourceProfile));
@@ -291,6 +296,8 @@ public void startNewWorker(ResourceProfile resourceProfile) 
{
                int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : 
(int) resourceProfile.getCpuCores();
                Resource capability = Resource.newInstance(mem, vcore);
                requestYarnContainer(capability, priority);
+
+               return slotsPerWorker;
        }
 
        @Override
@@ -334,7 +341,7 @@ public void onContainersCompleted(final 
List<ContainerStatus> statuses) {
                                        if (yarnWorkerNode != null) {
                                                // Container completed 
unexpectedly ~> start a new one
                                                final Container container = 
yarnWorkerNode.getContainer();
-                                               
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+                                               
requestYarnContainerIfRequired(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
                                        }
                                        // Eagerly close the connection with 
task manager.
                                        closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
@@ -375,7 +382,7 @@ public void onContainersAllocated(List<Container> 
containers) {
                                                
workerNodeMap.remove(resourceId);
                                                
resourceManagerClient.releaseAssignedContainer(container.getId());
                                                // and ask for a new one
-                                               
requestYarnContainer(container.getResource(), container.getPriority());
+                                               
requestYarnContainerIfRequired(container.getResource(), 
container.getPriority());
                                        }
                                } else {
                                        // return the excessive containers
@@ -446,21 +453,26 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
        /**
         * Request new container if pending containers cannot satisfies pending 
slot requests.
         */
+       private void requestYarnContainerIfRequired(Resource resource, Priority 
priority) {
+               int requiredTaskManagerSlots = 
getNumberRequiredTaskManagerSlots();
+               int pendingTaskManagerSlots = numPendingContainerRequests * 
numberOfTaskSlots;
+
+               if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+                       requestYarnContainer(resource, priority);
+               }
+       }
+
        private void requestYarnContainer(Resource resource, Priority priority) 
{
-               int pendingSlotRequests = getNumberPendingSlotRequests();
-               int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
-               if (pendingSlotRequests > pendingSlotAllocation) {
-                       resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
+               resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
 
-                       // make sure we transmit the request fast and receive 
fast news of granted allocations
-                       
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+               // make sure we transmit the request fast and receive fast news 
of granted allocations
+               
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-                       numPendingContainerRequests++;
+               numPendingContainerRequests++;
 
-                       log.info("Requesting new TaskExecutor container with 
resources {}. Number pending requests {}.",
-                               resource,
-                               numPendingContainerRequests);
-               }
+               log.info("Requesting new TaskExecutor container with resources 
{}. Number pending requests {}.",
+                       resource,
+                       numPendingContainerRequests);
        }
 
        private ContainerLaunchContext createTaskExecutorLaunchContext(Resource 
resource, String containerId, String host)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make SlotManager aware of multi slot TaskManagers
> -------------------------------------------------
>
>                 Key: FLINK-9455
>                 URL: https://issues.apache.org/jira/browse/FLINK-9455
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, ResourceManager
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> The {{SlotManager}} responsible for managing all available slots of a Flink 
> cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. A newly started {{TaskManager}} can have multiple slots 
> configured, but currently the {{SlotManager}} assumes that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest making the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way, the {{SlotManager}} only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) are being assigned 
> to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to