tillrohrmann commented on a change in pull request #13964:
URL: https://github.com/apache/flink/pull/13964#discussion_r539184317



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -157,6 +161,22 @@ void start(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull AllocationID allocationID);
 
+       /**
+        * Allocates the available slot with the given allocation id under the 
given request id. This method returns
+        * {@code null} if no slot with the given allocation id is available.

Review comment:
       I think it returns `Optional#empty` if the given allocation id is not 
available.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -157,6 +161,22 @@ void start(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull AllocationID allocationID);
 
+       /**
+        * Allocates the available slot with the given allocation id under the 
given request id. This method returns
+        * {@code null} if no slot with the given allocation id is available.
+        *
+        * @param slotRequestId identifying the requested slot
+        * @param allocationID the allocation id of the requested available slot
+        * @param requiredSlotProfile requiredSlotProfile for which to allocate 
the slot

Review comment:
       It would be good to state what it means if `requiredSlotProfile` is 
`null`. Also add for what this parameter is used.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -157,6 +161,22 @@ void start(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull AllocationID allocationID);
 
+       /**
+        * Allocates the available slot with the given allocation id under the 
given request id. This method returns
+        * {@code null} if no slot with the given allocation id is available.
+        *
+        * @param slotRequestId identifying the requested slot
+        * @param allocationID the allocation id of the requested available slot
+        * @param requiredSlotProfile requiredSlotProfile for which to allocate 
the slot
+        * @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.

Review comment:
       Returns `Optional#empty` instead of `null`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DeclarativeSlotPoolBridge}.
+ */
+public class DeclarativeSlotPoolBridgeTest extends TestLogger {
+
+       private static final Time rpcTimeout = Time.seconds(20);
+       private static final JobID jobId = new JobID();
+       private static final JobMasterId jobMasterId = JobMasterId.generate();
+       private final ComponentMainThreadExecutor mainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       @Test
+       public void testSlotOffer() throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final AllocationID expectedAllocationId = new AllocationID();
+               final PhysicalSlot allocatedSlot = 
createAllocatedSlot(expectedAllocationId);
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, null);
+
+                       
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));
+
+                       slotAllocationFuture.join();
+               }
+       }
+
+       @Test
+       public void testNotEnoughResourcesAvailableFailsPendingRequests() 
throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
CompletableFuture
+                                       .supplyAsync(() -> 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, Time.minutes(5)), mainThreadExecutor)
+                                       .get();
+
+                       mainThreadExecutor.execute(() -> 
declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(Collections.emptyList()));
+
+                       try {
+                               slotAllocationFuture.join();
+                               Assert.fail();
+                       } catch (Exception e) {
+                               Optional<NoResourceAvailableException> 
expectedException = ExceptionUtils.findThrowable(e, 
NoResourceAvailableException.class);
+                               assertThat(expectedException.isPresent(), 
is(true));

Review comment:
       I'd suggest to use `FlinkMatchers.containsCause`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DeclarativeSlotPoolBridge}.
+ */
+public class DeclarativeSlotPoolBridgeTest extends TestLogger {

Review comment:
       I think it would be great to add some tests which ensure that the set of 
declared resources is correct wrt to different actions on the `SlotPool` (e.g. 
`failAllocation`, `releaseSlot`, `allocateAvailableSlot` etc.)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }
+
+       @Override
+       public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(resourceManagerGateway);
+
+               
declareResourceRequirementServiceConnectionManager.connect(resourceRequirements 
-> resourceManagerGateway.declareRequiredResources(jobMasterId, 
resourceRequirements, rpcTimeout));
+               
declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
+       }
+
+       @Override
+       public void disconnectResourceManager() {
+               assertRunningInMainThread();
+               
this.declareResourceRequirementServiceConnectionManager.disconnect();
+       }
+
+       @Override
+       public boolean registerTaskManager(ResourceID resourceID) {
+               assertRunningInMainThread();
+
+               LOG.debug("Register new TaskExecutor {}.", resourceID);
+               return registeredTaskManagers.add(resourceID);
+       }
+
+       @Override
+       public boolean releaseTaskManager(ResourceID resourceId, Exception 
cause) {
+               assertRunningInMainThread();
+
+               if (registeredTaskManagers.remove(resourceId)) {
+                       internalReleaseTaskManager(resourceId, cause);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       private void releaseAllTaskManagers(FlinkException cause) {
+               for (ResourceID registeredTaskManager : registeredTaskManagers) 
{
+                       internalReleaseTaskManager(registeredTaskManager, 
cause);
+               }
+
+               registeredTaskManagers.clear();
+       }
+
+       private void internalReleaseTaskManager(ResourceID resourceId, 
Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
declarativeSlotPool.releaseSlots(resourceId, cause);
+               
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(TaskManagerLocation 
taskManagerLocation, TaskManagerGateway taskManagerGateway, 
Collection<SlotOffer> offers) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(taskManagerGateway);
+               Preconditions.checkNotNull(offers);
+
+               if 
(!registeredTaskManagers.contains(taskManagerLocation.getResourceID())) {
+                       return Collections.emptyList();
+               }
+
+               return declarativeSlotPool.offerSlots(offers, 
taskManagerLocation, taskManagerGateway, clock.relativeTimeMillis());
+       }
+
+       @VisibleForTesting
+       void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+               // Avoid notifying new slots multiple times due to 
SchedulerImpl allocating and releasing slots
+               // in order to find the best shared slot
+               final Collection<PhysicalSlot> slotsToProcess = new 
ArrayList<>();
+               for (PhysicalSlot newSlot : newSlots) {
+                       if (newSlotsSet.add(newSlot.getAllocationId())) {
+                               slotsToProcess.add(newSlot);
+                       }
+               }
+
+               final Collection<PendingRequestSlotMatching> matchingsToFulfill 
= new ArrayList<>();
+
+               for (PhysicalSlot newSlot : slotsToProcess) {
+                       final Optional<PendingRequest> matchingPendingRequest = 
findMatchingPendingRequest(newSlot);
+
+                       matchingPendingRequest.ifPresent(pendingRequest -> {
+                               
Preconditions.checkNotNull(pendingRequests.remove(pendingRequest.getSlotRequestId()),
 "Cannot fulfill a non existing pending slot request.");
+                               
reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), 
pendingRequest.resourceProfile);
+
+                               
matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, 
newSlot));
+                       });
+
+                       newSlotsSet.remove(newSlot.getAllocationId());
+               }
+
+               // we have to first reserve all matching slots before 
fulfilling the requests
+               // otherwise it can happen that the SchedulerImpl reserves one 
of the new slots
+               // for a request which has been triggered by fulfilling a 
pending request
+               for (PendingRequestSlotMatching pendingRequestSlotMatching : 
matchingsToFulfill) {
+                       pendingRequestSlotMatching.fulfillPendingRequest();
+               }
+       }
+
+       private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID 
allocationId, ResourceProfile resourceProfile) {
+               LOG.debug("Reserve slot {} for slot request id {}", 
allocationId, slotRequestId);
+               declarativeSlotPool.reserveFreeSlot(allocationId, 
resourceProfile);
+               fulfilledRequests.put(slotRequestId, allocationId);
+       }
+
+       private Optional<PendingRequest> 
findMatchingPendingRequest(PhysicalSlot slot) {
+               final ResourceProfile resourceProfile = 
slot.getResourceProfile();
+
+               for (PendingRequest pendingRequest : pendingRequests.values()) {
+                       if 
(resourceProfile.isMatching(pendingRequest.getResourceProfile())) {
+                               LOG.debug("Matched slot {} to pending request 
{}.", slot, pendingRequest);
+                               return Optional.of(pendingRequest);
+                       }
+               }
+               LOG.debug("Could not match slot {} to any pending request.", 
slot);
+
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
+               throw new UnsupportedOperationException("This method should not 
be used when using declarative resource management.");
+       }
+
+       @Override
+       public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nullable 
ResourceProfile requiredSlotProfile) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(requiredSlotProfile, "The 
requiredSlotProfile must not be null.");
+
+               LOG.debug("Reserving free slot {} for slot request id {} and 
profile {}.", allocationID, slotRequestId, requiredSlotProfile);
+
+               return Optional.of(reserveFreeSlotForResource(slotRequestId, 
allocationID, requiredSlotProfile));
+       }
+
+       private PhysicalSlot reserveFreeSlotForResource(SlotRequestId 
slotRequestId, AllocationID allocationId, ResourceProfile requiredSlotProfile) {
+               
declarativeSlotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(requiredSlotProfile,
 1));
+               final PhysicalSlot physicalSlot = 
declarativeSlotPool.reserveFreeSlot(allocationId, requiredSlotProfile);
+               fulfilledRequests.put(slotRequestId, allocationId);
+
+               return physicalSlot;
+       }
+
+       @Override
+       @Nonnull
+       public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, 
@Nullable Time timeout) {
+               assertRunningInMainThread();
+
+               LOG.debug("Request new allocated slot with slot request id {} 
and resource profile {}", slotRequestId, resourceProfile);
+
+               final PendingRequest pendingRequest = 
PendingRequest.createNormalRequest(slotRequestId, resourceProfile);
+
+               if (timeout != null) {
+                       FutureUtils
+                               .orTimeout(
+                                       pendingRequest.getSlotFuture(),
+                                       timeout.toMilliseconds(),
+                                       TimeUnit.MILLISECONDS,
+                                       componentMainThreadExecutor)
+                               .whenComplete((physicalSlot, throwable) -> {
+                                       if (throwable instanceof 
TimeoutException) {
+                                               
timeoutPendingSlotRequest(slotRequestId);
+                                       }
+                               });
+               }
+
+               return internalRequestNewAllocatedSlot(pendingRequest);
+       }
+
+       @Override
+       @Nonnull
+       public CompletableFuture<PhysicalSlot> 
requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull 
ResourceProfile resourceProfile) {
+               assertRunningInMainThread();
+
+               LOG.debug("Request new allocated batch slot with slot request 
id {} and resource profile {}", slotRequestId, resourceProfile);
+
+               final PendingRequest pendingRequest = 
PendingRequest.createBatchRequest(slotRequestId, resourceProfile);
+
+               return internalRequestNewAllocatedSlot(pendingRequest);
+       }
+
+       private void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
+               releaseSlot(slotRequestId, new TimeoutException("Pending slot 
request timed out in slot pool."));
+       }
+
+       private CompletableFuture<PhysicalSlot> 
internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
+               pendingRequests.put(pendingRequest.getSlotRequestId(), 
pendingRequest);
+
+               
declarativeSlotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(),
 1));
+
+               return pendingRequest.getSlotFuture();
+       }
+
+       private void 
declareResourceRequirements(Collection<ResourceRequirement> 
resourceRequirements) {
+               assertRunningInMainThread();
+
+               LOG.debug("Declare new resource requirements for job {}: {}.", 
jobId, resourceRequirements);
+
+               
declareResourceRequirementServiceConnectionManager.declareResourceRequirements(ResourceRequirements.create(jobId,
 jobManagerAddress, resourceRequirements));
+       }
+
+       @Override
+       public Optional<ResourceID> failAllocation(AllocationID allocationID, 
Exception cause) {
+               throw new UnsupportedOperationException("Please call 
failAllocation(ResourceID, AllocationID, Exception)");
+       }
+
+       @Override
+       public Optional<ResourceID> failAllocation(@Nullable ResourceID 
resourceId, AllocationID allocationID, Exception cause) {
+               assertRunningInMainThread();
+
+               Preconditions.checkNotNull(allocationID);
+               Preconditions.checkNotNull(resourceId, "This slot pool only 
supports failAllocation calls coming from the TaskExecutor.");
+
+               ResourceCounter previouslyFulfilledRequirements = 
declarativeSlotPool.releaseSlot(allocationID, cause);
+               if (!previouslyFulfilledRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirements);
+               }
+
+               if (declarativeSlotPool.containsSlots(resourceId)) {
+                       return Optional.empty();
+               } else {
+                       return Optional.of(resourceId);
+               }
+       }
+
+       @Override
+       public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable 
Throwable cause) {
+               LOG.debug("Release slot with slot request id {}", 
slotRequestId);
+               assertRunningInMainThread();
+
+               final PendingRequest pendingRequest = 
pendingRequests.remove(slotRequestId);
+
+               if (pendingRequest != null) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(),
 1));
+                       pendingRequest.failRequest(new FlinkException(
+                                       String.format("Pending slot request 
with %s has been released.", pendingRequest.getSlotRequestId()),
+                                       cause));
+               } else {
+                       final AllocationID allocationId = 
fulfilledRequests.remove(slotRequestId);
+
+                       if (allocationId != null) {
+                               ResourceCounter previouslyFulfilledRequirement 
= declarativeSlotPool.freeReservedSlot(allocationId, cause, 
clock.relativeTimeMillis());
+                               if (!previouslyFulfilledRequirement.isEmpty()) {
+                                       
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+                               }
+                       } else {
+                               LOG.debug("Could not find slot which has 
fulfilled slot request {}. Ignoring the release operation.", slotRequestId);
+                       }
+               }
+       }
+
+       @Override
+       public void 
notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> 
acquiredResources) {
+               assertRunningInMainThread();
+
+               componentMainThreadExecutor.schedule(
+                               failPendingRequests(),
+                               resourceAcquisitionTimeout.toMillis(),
+                               TimeUnit.MILLISECONDS);
+       }
+
+       private Runnable failPendingRequests() {
+               return () -> {
+                       if (!pendingRequests.isEmpty()) {
+                               final NoResourceAvailableException cause = new 
NoResourceAvailableException("Could not acquire the minimum required 
resources.");
+
+                               cancelPendingRequests(
+                                               cause,
+                                               request -> 
!isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest);

Review comment:
       ```suggestion
                                                request -> 
!isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest());
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }
+
+       @Override
+       public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(resourceManagerGateway);
+
+               
declareResourceRequirementServiceConnectionManager.connect(resourceRequirements 
-> resourceManagerGateway.declareRequiredResources(jobMasterId, 
resourceRequirements, rpcTimeout));
+               
declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
+       }
+
+       @Override
+       public void disconnectResourceManager() {
+               assertRunningInMainThread();
+               
this.declareResourceRequirementServiceConnectionManager.disconnect();
+       }
+
+       @Override
+       public boolean registerTaskManager(ResourceID resourceID) {
+               assertRunningInMainThread();
+
+               LOG.debug("Register new TaskExecutor {}.", resourceID);
+               return registeredTaskManagers.add(resourceID);
+       }
+
+       @Override
+       public boolean releaseTaskManager(ResourceID resourceId, Exception 
cause) {
+               assertRunningInMainThread();
+
+               if (registeredTaskManagers.remove(resourceId)) {
+                       internalReleaseTaskManager(resourceId, cause);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       private void releaseAllTaskManagers(FlinkException cause) {
+               for (ResourceID registeredTaskManager : registeredTaskManagers) 
{
+                       internalReleaseTaskManager(registeredTaskManager, 
cause);
+               }
+
+               registeredTaskManagers.clear();
+       }
+
+       private void internalReleaseTaskManager(ResourceID resourceId, 
Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
declarativeSlotPool.releaseSlots(resourceId, cause);
+               
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(TaskManagerLocation 
taskManagerLocation, TaskManagerGateway taskManagerGateway, 
Collection<SlotOffer> offers) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(taskManagerGateway);
+               Preconditions.checkNotNull(offers);
+
+               if 
(!registeredTaskManagers.contains(taskManagerLocation.getResourceID())) {
+                       return Collections.emptyList();
+               }
+
+               return declarativeSlotPool.offerSlots(offers, 
taskManagerLocation, taskManagerGateway, clock.relativeTimeMillis());
+       }
+
+       @VisibleForTesting
+       void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+               // Avoid notifying new slots multiple times due to 
SchedulerImpl allocating and releasing slots
+               // in order to find the best shared slot
+               final Collection<PhysicalSlot> slotsToProcess = new 
ArrayList<>();
+               for (PhysicalSlot newSlot : newSlots) {
+                       if (newSlotsSet.add(newSlot.getAllocationId())) {
+                               slotsToProcess.add(newSlot);
+                       }
+               }
+
+               final Collection<PendingRequestSlotMatching> matchingsToFulfill 
= new ArrayList<>();
+
+               for (PhysicalSlot newSlot : slotsToProcess) {
+                       final Optional<PendingRequest> matchingPendingRequest = 
findMatchingPendingRequest(newSlot);
+
+                       matchingPendingRequest.ifPresent(pendingRequest -> {
+                               
Preconditions.checkNotNull(pendingRequests.remove(pendingRequest.getSlotRequestId()),
 "Cannot fulfill a non existing pending slot request.");
+                               
reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), 
pendingRequest.resourceProfile);
+
+                               
matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, 
newSlot));
+                       });
+
+                       newSlotsSet.remove(newSlot.getAllocationId());
+               }
+
+               // we have to first reserve all matching slots before 
fulfilling the requests
+               // otherwise it can happen that the SchedulerImpl reserves one 
of the new slots
+               // for a request which has been triggered by fulfilling a 
pending request
+               for (PendingRequestSlotMatching pendingRequestSlotMatching : 
matchingsToFulfill) {
+                       pendingRequestSlotMatching.fulfillPendingRequest();
+               }
+       }
+
+       private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID 
allocationId, ResourceProfile resourceProfile) {
+               LOG.debug("Reserve slot {} for slot request id {}", 
allocationId, slotRequestId);
+               declarativeSlotPool.reserveFreeSlot(allocationId, 
resourceProfile);
+               fulfilledRequests.put(slotRequestId, allocationId);
+       }
+
+       private Optional<PendingRequest> 
findMatchingPendingRequest(PhysicalSlot slot) {
+               final ResourceProfile resourceProfile = 
slot.getResourceProfile();
+
+               for (PendingRequest pendingRequest : pendingRequests.values()) {
+                       if 
(resourceProfile.isMatching(pendingRequest.getResourceProfile())) {
+                               LOG.debug("Matched slot {} to pending request 
{}.", slot, pendingRequest);
+                               return Optional.of(pendingRequest);
+                       }
+               }
+               LOG.debug("Could not match slot {} to any pending request.", 
slot);
+
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
+               throw new UnsupportedOperationException("This method should not 
be used when using declarative resource management.");
+       }
+
+       @Override
+       public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nullable 
ResourceProfile requiredSlotProfile) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(requiredSlotProfile, "The 
requiredSlotProfile must not be null.");
+
+               LOG.debug("Reserving free slot {} for slot request id {} and 
profile {}.", allocationID, slotRequestId, requiredSlotProfile);
+
+               return Optional.of(reserveFreeSlotForResource(slotRequestId, 
allocationID, requiredSlotProfile));
+       }
+
+       private PhysicalSlot reserveFreeSlotForResource(SlotRequestId 
slotRequestId, AllocationID allocationId, ResourceProfile requiredSlotProfile) {
+               
declarativeSlotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(requiredSlotProfile,
 1));
+               final PhysicalSlot physicalSlot = 
declarativeSlotPool.reserveFreeSlot(allocationId, requiredSlotProfile);
+               fulfilledRequests.put(slotRequestId, allocationId);
+
+               return physicalSlot;
+       }
+
+       @Override
+       @Nonnull
+       public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull 
SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, 
@Nullable Time timeout) {
+               assertRunningInMainThread();
+
+               LOG.debug("Request new allocated slot with slot request id {} 
and resource profile {}", slotRequestId, resourceProfile);
+
+               final PendingRequest pendingRequest = 
PendingRequest.createNormalRequest(slotRequestId, resourceProfile);
+
+               if (timeout != null) {
+                       FutureUtils
+                               .orTimeout(
+                                       pendingRequest.getSlotFuture(),
+                                       timeout.toMilliseconds(),
+                                       TimeUnit.MILLISECONDS,
+                                       componentMainThreadExecutor)
+                               .whenComplete((physicalSlot, throwable) -> {
+                                       if (throwable instanceof 
TimeoutException) {
+                                               
timeoutPendingSlotRequest(slotRequestId);
+                                       }
+                               });
+               }
+
+               return internalRequestNewAllocatedSlot(pendingRequest);
+       }
+
+       @Override
+       @Nonnull
+       public CompletableFuture<PhysicalSlot> 
requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull 
ResourceProfile resourceProfile) {
+               assertRunningInMainThread();
+
+               LOG.debug("Request new allocated batch slot with slot request 
id {} and resource profile {}", slotRequestId, resourceProfile);
+
+               final PendingRequest pendingRequest = 
PendingRequest.createBatchRequest(slotRequestId, resourceProfile);
+
+               return internalRequestNewAllocatedSlot(pendingRequest);
+       }
+
+       private void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
+               releaseSlot(slotRequestId, new TimeoutException("Pending slot 
request timed out in slot pool."));
+       }
+
+       private CompletableFuture<PhysicalSlot> 
internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
+               pendingRequests.put(pendingRequest.getSlotRequestId(), 
pendingRequest);
+
+               
declarativeSlotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(),
 1));
+
+               return pendingRequest.getSlotFuture();
+       }
+
+       private void 
declareResourceRequirements(Collection<ResourceRequirement> 
resourceRequirements) {
+               assertRunningInMainThread();
+
+               LOG.debug("Declare new resource requirements for job {}: {}.", 
jobId, resourceRequirements);
+
+               
declareResourceRequirementServiceConnectionManager.declareResourceRequirements(ResourceRequirements.create(jobId,
 jobManagerAddress, resourceRequirements));
+       }
+
+       @Override
+       public Optional<ResourceID> failAllocation(AllocationID allocationID, 
Exception cause) {
+               throw new UnsupportedOperationException("Please call 
failAllocation(ResourceID, AllocationID, Exception)");
+       }
+
+       @Override
+       public Optional<ResourceID> failAllocation(@Nullable ResourceID 
resourceId, AllocationID allocationID, Exception cause) {
+               assertRunningInMainThread();
+
+               Preconditions.checkNotNull(allocationID);
+               Preconditions.checkNotNull(resourceId, "This slot pool only 
supports failAllocation calls coming from the TaskExecutor.");
+
+               ResourceCounter previouslyFulfilledRequirements = 
declarativeSlotPool.releaseSlot(allocationID, cause);
+               if (!previouslyFulfilledRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirements);
+               }
+
+               if (declarativeSlotPool.containsSlots(resourceId)) {
+                       return Optional.empty();
+               } else {
+                       return Optional.of(resourceId);
+               }
+       }
+
+       @Override
+       public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable 
Throwable cause) {
+               LOG.debug("Release slot with slot request id {}", 
slotRequestId);
+               assertRunningInMainThread();
+
+               final PendingRequest pendingRequest = 
pendingRequests.remove(slotRequestId);
+
+               if (pendingRequest != null) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(),
 1));
+                       pendingRequest.failRequest(new FlinkException(
+                                       String.format("Pending slot request 
with %s has been released.", pendingRequest.getSlotRequestId()),
+                                       cause));
+               } else {
+                       final AllocationID allocationId = 
fulfilledRequests.remove(slotRequestId);
+
+                       if (allocationId != null) {
+                               ResourceCounter previouslyFulfilledRequirement 
= declarativeSlotPool.freeReservedSlot(allocationId, cause, 
clock.relativeTimeMillis());
+                               if (!previouslyFulfilledRequirement.isEmpty()) {
+                                       
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+                               }
+                       } else {
+                               LOG.debug("Could not find slot which has 
fulfilled slot request {}. Ignoring the release operation.", slotRequestId);
+                       }
+               }
+       }
+
+       @Override
+       public void 
notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> 
acquiredResources) {
+               assertRunningInMainThread();
+
+               componentMainThreadExecutor.schedule(
+                               failPendingRequests(),
+                               resourceAcquisitionTimeout.toMillis(),
+                               TimeUnit.MILLISECONDS);

Review comment:
       Why are we scheduling the `failPendingRequests` here and don't execute 
it immediately?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }

Review comment:
       Just as a side note: I really would like to do 
[FLINK-11719](https://issues.apache.org/jira/browse/FLINK-11719) at some point 
in time. This should allow us to get rid of this pattern here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }
+
+       @Override
+       public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(resourceManagerGateway);
+
+               
declareResourceRequirementServiceConnectionManager.connect(resourceRequirements 
-> resourceManagerGateway.declareRequiredResources(jobMasterId, 
resourceRequirements, rpcTimeout));
+               
declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
+       }
+
+       @Override
+       public void disconnectResourceManager() {
+               assertRunningInMainThread();
+               
this.declareResourceRequirementServiceConnectionManager.disconnect();
+       }
+
+       @Override
+       public boolean registerTaskManager(ResourceID resourceID) {
+               assertRunningInMainThread();
+
+               LOG.debug("Register new TaskExecutor {}.", resourceID);
+               return registeredTaskManagers.add(resourceID);
+       }
+
+       @Override
+       public boolean releaseTaskManager(ResourceID resourceId, Exception 
cause) {
+               assertRunningInMainThread();
+
+               if (registeredTaskManagers.remove(resourceId)) {
+                       internalReleaseTaskManager(resourceId, cause);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       private void releaseAllTaskManagers(FlinkException cause) {
+               for (ResourceID registeredTaskManager : registeredTaskManagers) 
{
+                       internalReleaseTaskManager(registeredTaskManager, 
cause);
+               }
+
+               registeredTaskManagers.clear();
+       }
+
+       private void internalReleaseTaskManager(ResourceID resourceId, 
Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
declarativeSlotPool.releaseSlots(resourceId, cause);
+               
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(TaskManagerLocation 
taskManagerLocation, TaskManagerGateway taskManagerGateway, 
Collection<SlotOffer> offers) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(taskManagerGateway);
+               Preconditions.checkNotNull(offers);
+
+               if 
(!registeredTaskManagers.contains(taskManagerLocation.getResourceID())) {
+                       return Collections.emptyList();
+               }
+
+               return declarativeSlotPool.offerSlots(offers, 
taskManagerLocation, taskManagerGateway, clock.relativeTimeMillis());
+       }
+
+       @VisibleForTesting
+       void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+               // Avoid notifying new slots multiple times due to 
SchedulerImpl allocating and releasing slots
+               // in order to find the best shared slot
+               final Collection<PhysicalSlot> slotsToProcess = new 
ArrayList<>();
+               for (PhysicalSlot newSlot : newSlots) {
+                       if (newSlotsSet.add(newSlot.getAllocationId())) {
+                               slotsToProcess.add(newSlot);
+                       }
+               }
+
+               final Collection<PendingRequestSlotMatching> matchingsToFulfill 
= new ArrayList<>();
+
+               for (PhysicalSlot newSlot : slotsToProcess) {
+                       final Optional<PendingRequest> matchingPendingRequest = 
findMatchingPendingRequest(newSlot);
+
+                       matchingPendingRequest.ifPresent(pendingRequest -> {
+                               
Preconditions.checkNotNull(pendingRequests.remove(pendingRequest.getSlotRequestId()),
 "Cannot fulfill a non existing pending slot request.");
+                               
reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), 
pendingRequest.resourceProfile);
+
+                               
matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, 
newSlot));
+                       });
+
+                       newSlotsSet.remove(newSlot.getAllocationId());

Review comment:
       I don't fully understand `newSlotsSet`. If this set is supposed to 
filter out repeated `newSlotsAreAvailable` calls, how is this possible if the 
only thing we are doing between adding a slot to `newSlotsAreAvailable` and 
removing it from there is to add the slot to `matchingsToFulfill`? Can it be 
that this is no longer possible because the completion of pending requests has 
been moved into a separate loop?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {

Review comment:
       nit: I would change the order of the parameters. First saying which 
requests get cancelled and then with what.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DeclarativeSlotPoolBridge}.
+ */
+public class DeclarativeSlotPoolBridgeTest extends TestLogger {
+
+       private static final Time rpcTimeout = Time.seconds(20);
+       private static final JobID jobId = new JobID();
+       private static final JobMasterId jobMasterId = JobMasterId.generate();
+       private final ComponentMainThreadExecutor mainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       @Test
+       public void testSlotOffer() throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final AllocationID expectedAllocationId = new AllocationID();
+               final PhysicalSlot allocatedSlot = 
createAllocatedSlot(expectedAllocationId);
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, null);
+
+                       
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));
+
+                       slotAllocationFuture.join();
+               }
+       }
+
+       @Test
+       public void testNotEnoughResourcesAvailableFailsPendingRequests() 
throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
CompletableFuture
+                                       .supplyAsync(() -> 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, Time.minutes(5)), mainThreadExecutor)
+                                       .get();
+
+                       mainThreadExecutor.execute(() -> 
declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(Collections.emptyList()));
+
+                       try {
+                               slotAllocationFuture.join();
+                               Assert.fail();
+                       } catch (Exception e) {
+                               Optional<NoResourceAvailableException> 
expectedException = ExceptionUtils.findThrowable(e, 
NoResourceAvailableException.class);
+                               assertThat(expectedException.isPresent(), 
is(true));
+                       }
+               }
+       }
+
+       @Test
+       public void testReleasingAllocatedSlot() throws Exception {
+               final CompletableFuture<AllocationID> releaseSlotFuture = new 
CompletableFuture<>();
+               final AllocationID expectedAllocationId = new AllocationID();
+               final PhysicalSlot allocatedSlot = 
createAllocatedSlot(expectedAllocationId);
+
+               final TestingDeclarativeSlotPoolBuilder builder = 
TestingDeclarativeSlotPool
+                               .builder()
+                               .setReserveFreeSlotFunction((allocationId, 
resourceProfile) -> {
+                                       assertThat(allocationId, 
is(expectedAllocationId));
+                                       return allocatedSlot;
+                               })
+                               .setFreeReservedSlotFunction((allocationID, 
throwable, aLong) -> {
+                                       
releaseSlotFuture.complete(allocationID);
+                                       return ResourceCounter.empty();
+                               });
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(builder);
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       final SlotRequestId slotRequestId = new SlotRequestId();
+
+                       
declarativeSlotPoolBridge.allocateAvailableSlot(slotRequestId, 
expectedAllocationId, allocatedSlot.getResourceProfile());
+                       declarativeSlotPoolBridge.releaseSlot(slotRequestId, 
null);
+
+                       assertThat(releaseSlotFuture.join(), 
is(expectedAllocationId));
+               }
+       }
+
+       @Test
+       public void 
testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       final List<SlotRequestId> slotRequestIds = 
Arrays.asList(new SlotRequestId(), new SlotRequestId());
+
+                       final List<CompletableFuture<PhysicalSlot>> slotFutures 
= slotRequestIds.stream()
+                                       .map(slotRequestId -> {
+                                               final 
CompletableFuture<PhysicalSlot> slotFuture = 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, rpcTimeout);
+                                               
slotFuture.whenComplete((physicalSlot, throwable) -> {
+                                                       if (throwable != null) {
+                                                               
declarativeSlotPoolBridge.releaseSlot(slotRequestId, throwable);
+                                                       }
+                                               });
+                                               return slotFuture;
+                                       })
+                                       .collect(Collectors.toList());
+
+                       declarativeSlotPoolBridge.suspend();
+
+                       try {
+                               FutureUtils.waitForAll(slotFutures).get();

Review comment:
       I think `FutureUtils.completeAll` is what you want to use here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);

Review comment:
       ```suggestion
                                
pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link SlotPool} implementation which uses the {@link DeclarativeSlotPool} 
to allocate slots.
+ */
+public class DeclarativeSlotPoolBridge implements SlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotPoolBridge.class);
+
+       private final JobID jobId;
+
+       private final Map<SlotRequestId, PendingRequest> pendingRequests;
+       private final Map<SlotRequestId, AllocationID> fulfilledRequests;
+       private final Set<AllocationID> newSlotsSet;
+       private final DeclarativeSlotPool declarativeSlotPool;
+       private final Set<ResourceID> registeredTaskManagers;
+
+       @Nullable
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       @Nullable
+       private String jobManagerAddress;
+
+       @Nullable
+       private JobMasterId jobMasterId;
+
+       private DeclareResourceRequirementServiceConnectionManager 
declareResourceRequirementServiceConnectionManager;
+
+       private final Clock clock;
+       private final Time rpcTimeout;
+       private final Time idleSlotTimeout;
+       private final Time batchSlotTimeout;
+       private boolean isBatchSlotRequestTimeoutCheckDisabled;
+       private final Duration resourceAcquisitionTimeout;
+
+       public DeclarativeSlotPoolBridge(
+                       JobID jobId,
+                       DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                       Clock clock,
+                       Time rpcTimeout,
+                       Time idleSlotTimeout,
+                       Time batchSlotTimeout, Duration 
resourceAcquisitionTimeout) {
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.clock = Preconditions.checkNotNull(clock);
+               this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
+               this.idleSlotTimeout = 
Preconditions.checkNotNull(idleSlotTimeout);
+               this.batchSlotTimeout = 
Preconditions.checkNotNull(batchSlotTimeout);
+               this.isBatchSlotRequestTimeoutCheckDisabled = false;
+               this.resourceAcquisitionTimeout = 
Preconditions.checkNotNull(resourceAcquisitionTimeout);
+
+               this.pendingRequests = new LinkedHashMap<>();
+               this.fulfilledRequests = new HashMap<>();
+               this.newSlotsSet = new HashSet<>();
+               this.registeredTaskManagers = new HashSet<>();
+               this.declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               this.declarativeSlotPool = declarativeSlotPoolFactory.create(
+                               this::declareResourceRequirements,
+                               this::newSlotsAreAvailable,
+                               idleSlotTimeout,
+                               rpcTimeout);
+       }
+
+       @Override
+       public void start(JobMasterId jobMasterId, String newJobManagerAddress, 
ComponentMainThreadExecutor jmMainThreadScheduledExecutor) throws Exception {
+               this.componentMainThreadExecutor = 
Preconditions.checkNotNull(jmMainThreadScheduledExecutor);
+               this.jobManagerAddress = 
Preconditions.checkNotNull(newJobManagerAddress);
+               this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+               this.declareResourceRequirementServiceConnectionManager = 
DefaultDeclareResourceRequirementServiceConnectionManager.create(componentMainThreadExecutor);
+
+               
componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, 
idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               
componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, 
batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void suspend() {
+               assertRunningInMainThread();
+               LOG.info("Suspending slot pool.");
+
+               cancelPendingRequests(new FlinkException("Suspending slot 
pool."), request -> true);
+               clearState();
+       }
+
+       @Override
+       public void close() {
+               LOG.info("Closing slot pool.");
+               final FlinkException cause = new FlinkException("Closing slot 
pool");
+               cancelPendingRequests(cause, request -> true);
+               releaseAllTaskManagers(new FlinkException("Closing slot 
pool."));
+               clearState();
+       }
+
+       private void cancelPendingRequests(FlinkException cancelCause, 
Predicate<PendingRequest> requestPredicate) {
+               ResourceCounter decreasedResourceRequirements = 
ResourceCounter.empty();
+
+               // need a copy since failing a request could trigger another 
request to be issued
+               final Iterable<PendingRequest> pendingRequestsToFail = new 
ArrayList<>(pendingRequests.values());
+               pendingRequests.clear();
+
+               for (PendingRequest pendingRequest : pendingRequestsToFail) {
+                       if (requestPredicate.test(pendingRequest)) {
+                               pendingRequest.failRequest(cancelCause);
+                               decreasedResourceRequirements = 
decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
+                       } else {
+                               
pendingRequests.put(pendingRequest.slotRequestId, pendingRequest);
+                       }
+               }
+
+               if (!decreasedResourceRequirements.isEmpty()) {
+                       
declarativeSlotPool.decreaseResourceRequirementsBy(decreasedResourceRequirements);
+               }
+       }
+
+       private void clearState() {
+               declareResourceRequirementServiceConnectionManager.close();
+               declareResourceRequirementServiceConnectionManager = 
NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
+               registeredTaskManagers.clear();
+               jobManagerAddress = null;
+               jobMasterId = null;
+       }
+
+       @Override
+       public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(resourceManagerGateway);
+
+               
declareResourceRequirementServiceConnectionManager.connect(resourceRequirements 
-> resourceManagerGateway.declareRequiredResources(jobMasterId, 
resourceRequirements, rpcTimeout));
+               
declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
+       }
+
+       @Override
+       public void disconnectResourceManager() {
+               assertRunningInMainThread();
+               
this.declareResourceRequirementServiceConnectionManager.disconnect();
+       }
+
+       @Override
+       public boolean registerTaskManager(ResourceID resourceID) {
+               assertRunningInMainThread();
+
+               LOG.debug("Register new TaskExecutor {}.", resourceID);
+               return registeredTaskManagers.add(resourceID);
+       }
+
+       @Override
+       public boolean releaseTaskManager(ResourceID resourceId, Exception 
cause) {
+               assertRunningInMainThread();
+
+               if (registeredTaskManagers.remove(resourceId)) {
+                       internalReleaseTaskManager(resourceId, cause);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       private void releaseAllTaskManagers(FlinkException cause) {
+               for (ResourceID registeredTaskManager : registeredTaskManagers) 
{
+                       internalReleaseTaskManager(registeredTaskManager, 
cause);
+               }
+
+               registeredTaskManagers.clear();
+       }
+
+       private void internalReleaseTaskManager(ResourceID resourceId, 
Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
declarativeSlotPool.releaseSlots(resourceId, cause);
+               
declarativeSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(TaskManagerLocation 
taskManagerLocation, TaskManagerGateway taskManagerGateway, 
Collection<SlotOffer> offers) {
+               assertRunningInMainThread();
+               Preconditions.checkNotNull(taskManagerGateway);
+               Preconditions.checkNotNull(offers);
+
+               if 
(!registeredTaskManagers.contains(taskManagerLocation.getResourceID())) {
+                       return Collections.emptyList();
+               }
+
+               return declarativeSlotPool.offerSlots(offers, 
taskManagerLocation, taskManagerGateway, clock.relativeTimeMillis());
+       }
+
+       @VisibleForTesting
+       void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+               // Avoid notifying new slots multiple times due to 
SchedulerImpl allocating and releasing slots
+               // in order to find the best shared slot
+               final Collection<PhysicalSlot> slotsToProcess = new 
ArrayList<>();
+               for (PhysicalSlot newSlot : newSlots) {
+                       if (newSlotsSet.add(newSlot.getAllocationId())) {
+                               slotsToProcess.add(newSlot);
+                       }
+               }
+
+               final Collection<PendingRequestSlotMatching> matchingsToFulfill 
= new ArrayList<>();
+
+               for (PhysicalSlot newSlot : slotsToProcess) {
+                       final Optional<PendingRequest> matchingPendingRequest = 
findMatchingPendingRequest(newSlot);
+
+                       matchingPendingRequest.ifPresent(pendingRequest -> {
+                               
Preconditions.checkNotNull(pendingRequests.remove(pendingRequest.getSlotRequestId()),
 "Cannot fulfill a non existing pending slot request.");
+                               
reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), 
pendingRequest.resourceProfile);
+
+                               
matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, 
newSlot));
+                       });
+
+                       newSlotsSet.remove(newSlot.getAllocationId());
+               }
+
+               // we have to first reserve all matching slots before 
fulfilling the requests
+               // otherwise it can happen that the SchedulerImpl reserves one 
of the new slots
+               // for a request which has been triggered by fulfilling a 
pending request
+               for (PendingRequestSlotMatching pendingRequestSlotMatching : 
matchingsToFulfill) {
+                       pendingRequestSlotMatching.fulfillPendingRequest();
+               }
+       }
+
+       private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID 
allocationId, ResourceProfile resourceProfile) {
+               LOG.debug("Reserve slot {} for slot request id {}", 
allocationId, slotRequestId);

Review comment:
       I am wondering whether this logging statement shouldn't be part of the 
`AllocatedSlotPool`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DeclarativeSlotPoolBridge}.
+ */
+public class DeclarativeSlotPoolBridgeTest extends TestLogger {
+
+       private static final Time rpcTimeout = Time.seconds(20);
+       private static final JobID jobId = new JobID();
+       private static final JobMasterId jobMasterId = JobMasterId.generate();
+       private final ComponentMainThreadExecutor mainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       @Test
+       public void testSlotOffer() throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final AllocationID expectedAllocationId = new AllocationID();
+               final PhysicalSlot allocatedSlot = 
createAllocatedSlot(expectedAllocationId);
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, null);
+
+                       
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));
+
+                       slotAllocationFuture.join();
+               }
+       }
+
+       @Test
+       public void testNotEnoughResourcesAvailableFailsPendingRequests() 
throws Exception {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+
+               final TestingDeclarativeSlotPoolFactory 
declarativeSlotPoolFactory = new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+               try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = 
createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+
+                       declarativeSlotPoolBridge.start(jobMasterId, 
"localhost", mainThreadExecutor);
+
+                       CompletableFuture<PhysicalSlot> slotAllocationFuture = 
CompletableFuture
+                                       .supplyAsync(() -> 
declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, 
ResourceProfile.UNKNOWN, Time.minutes(5)), mainThreadExecutor)
+                                       .get();
+
+                       mainThreadExecutor.execute(() -> 
declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(Collections.emptyList()));
+
+                       try {
+                               slotAllocationFuture.join();
+                               Assert.fail();
+                       } catch (Exception e) {
+                               Optional<NoResourceAvailableException> 
expectedException = ExceptionUtils.findThrowable(e, 
NoResourceAvailableException.class);
+                               assertThat(expectedException.isPresent(), 
is(true));

Review comment:
       Or `FlinkMatchers.futureWillCompleteExceptionally` on 
`slotAllocationFuture`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to