[ 
https://issues.apache.org/jira/browse/FLINK-9912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617553#comment-16617553
 ] 

ASF GitHub Bot commented on FLINK-9912:
---------------------------------------

asfgit closed pull request #6394: [FLINK-9912][JM] Release TaskExecutors if 
they have no slots registered at SlotPool
URL: https://github.com/apache/flink/pull/6394
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork) pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java 
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
new file mode 100644
index 00000000000..4ec75c51a84
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Serializable {@link Optional}.
+ */
+public final class SerializableOptional<T extends Serializable> implements 
Serializable {
+       private static final long serialVersionUID = -3312769593551775940L;
+
+       private static final SerializableOptional<?> EMPTY = new 
SerializableOptional<>(null);
+
+       @Nullable
+       private final T value;
+
+       private SerializableOptional(@Nullable T value) {
+               this.value = value;
+       }
+
+       public T get() {
+               if (value == null) {
+                       throw new NoSuchElementException("No value present");
+               }
+               return value;
+       }
+
+       public boolean isPresent() {
+               return value != null;
+       }
+
+       public void ifPresent(Consumer<? super T> consumer) {
+               if (value != null) {
+                       consumer.accept(value);
+               }
+       }
+
+       public static <T extends Serializable> SerializableOptional<T> 
of(@Nonnull T value) {
+               return new SerializableOptional<>(value);
+       }
+
+       @SuppressWarnings("unchecked")
+       public static <T extends Serializable> SerializableOptional<T> empty() {
+               return (SerializableOptional<T>) EMPTY;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 736984e88e7..21e06af30d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
@@ -833,13 +834,25 @@ public void failSlot(
                        final Exception cause) {
 
                if (registeredTaskManagers.containsKey(taskManagerId)) {
-                       slotPoolGateway.failAllocation(allocationId, cause);
+                       internalFailAllocation(allocationId, cause);
                } else {
                        log.warn("Cannot fail slot " + allocationId + " because 
the TaskManager " +
                        taskManagerId + " is unknown.");
                }
        }
 
+       private void internalFailAllocation(AllocationID allocationId, 
Exception cause) {
+               final CompletableFuture<SerializableOptional<ResourceID>> 
emptyTaskExecutorFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+               emptyTaskExecutorFuture.thenAcceptAsync(
+                       resourceIdOptional -> 
resourceIdOptional.ifPresent(this::releaseEmptyTaskManager),
+                       getMainThreadExecutor());
+       }
+
+       private CompletableFuture<Acknowledge> 
releaseEmptyTaskManager(ResourceID resourceId) {
+               return disconnectTaskManager(resourceId, new 
FlinkException(String.format("No more slots registered at JobMaster %s.", 
resourceId)));
+       }
+
        @Override
        public CompletableFuture<RegistrationResponse> registerTaskManager(
                        final String taskManagerRpcAddress,
@@ -982,7 +995,7 @@ private void startCheckpointScheduler(final 
CheckpointCoordinator checkpointCoor
 
        @Override
        public void notifyAllocationFailure(AllocationID allocationID, 
Exception cause) {
-               slotPoolGateway.failAllocation(allocationID, cause);
+               internalFailAllocation(allocationID, cause);
        }
 
        
//----------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462455c..b53ee93e643 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -50,6 +50,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -1001,32 +1002,50 @@ private PendingRequest pollMatchingPendingRequest(final 
AllocatedSlot slot) {
         * and decided to take it back.
         *
         * @param allocationID Represents the allocation which should be failed
-        * @param cause        The cause of the failure
+        * @param cause The cause of the failure
+        * @return Optional task executor if it has no more slots registered
         */
        @Override
-       public void failAllocation(final AllocationID allocationID, final 
Exception cause) {
+       public CompletableFuture<SerializableOptional<ResourceID>> 
failAllocation(final AllocationID allocationID, final Exception cause) {
                final PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);
                if (pendingRequest != null) {
                        // request was still pending
                        failPendingRequest(pendingRequest, cause);
-               }
-               else if (availableSlots.tryRemove(allocationID)) {
-                       log.debug("Failed available slot [{}].", allocationID, 
cause);
+                       return 
CompletableFuture.completedFuture(SerializableOptional.empty());
                }
                else {
-                       AllocatedSlot allocatedSlot = 
allocatedSlots.remove(allocationID);
-                       if (allocatedSlot != null) {
-                               // release the slot.
-                               // since it is not in 'allocatedSlots' any 
more, it will be dropped o return'
-                               allocatedSlot.releasePayload(cause);
-                       }
-                       else {
-                               log.trace("Outdated request to fail slot 
[{}].", allocationID, cause);
-                       }
+                       return tryFailingAllocatedSlot(allocationID, cause);
                }
+
                // TODO: add some unit tests when the previous two are ready, 
the allocation may failed at any phase
        }
 
+       private CompletableFuture<SerializableOptional<ResourceID>> 
tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
+               AllocatedSlot allocatedSlot = 
availableSlots.tryRemove(allocationID);
+
+               if (allocatedSlot == null) {
+                       allocatedSlot = allocatedSlots.remove(allocationID);
+               }
+
+               if (allocatedSlot != null) {
+                       log.debug("Failed allocated slot [{}]: {}", 
allocationID, cause.getMessage());
+
+                       // notify TaskExecutor about the failure
+                       
allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
+                       // release the slot.
+                       // since it is not in 'allocatedSlots' any more, it 
will be dropped o return'
+                       allocatedSlot.releasePayload(cause);
+
+                       final ResourceID taskManagerId = 
allocatedSlot.getTaskManagerId();
+
+                       if (!availableSlots.containsTaskManager(taskManagerId) 
&& !allocatedSlots.containResource(taskManagerId)) {
+                               return 
CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId));
+                       }
+               }
+
+               return 
CompletableFuture.completedFuture(SerializableOptional.empty());
+       }
+
        // 
------------------------------------------------------------------------
        //  Resource
        // 
------------------------------------------------------------------------
@@ -1107,7 +1126,7 @@ private void checkIdleSlot() {
 
                for (AllocatedSlot expiredSlot : expiredSlots) {
                        final AllocationID allocationID = 
expiredSlot.getAllocationId();
-                       if (availableSlots.tryRemove(allocationID)) {
+                       if (availableSlots.tryRemove(allocationID) != null) {
 
                                log.info("Releasing idle slot [{}].", 
allocationID);
                                final CompletableFuture<Acknowledge> 
freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
@@ -1502,7 +1521,7 @@ SlotAndLocality poll(SchedulingStrategy 
schedulingStrategy, SlotProfile slotProf
                        }
                }
 
-               boolean tryRemove(AllocationID slotId) {
+               AllocatedSlot tryRemove(AllocationID slotId) {
                        final SlotAndTimestamp sat = 
availableSlots.remove(slotId);
                        if (sat != null) {
                                final AllocatedSlot slot = sat.slot();
@@ -1522,15 +1541,15 @@ boolean tryRemove(AllocationID slotId) {
                                        availableSlotsByHost.remove(host);
                                }
 
-                               return true;
+                               return slot;
                        }
                        else {
-                               return false;
+                               return null;
                        }
                }
 
                private void remove(AllocationID slotId) throws 
IllegalStateException {
-                       if (!tryRemove(slotId)) {
+                       if (tryRemove(slotId) == null) {
                                throw new IllegalStateException("slot not 
contained");
                        }
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 34d9c7ff601..3e546ff3674 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.types.SerializableOptional;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -126,8 +127,9 @@
         *
         * @param allocationID identifying the slot which is being failed
         * @param cause of the failure
+        * @return An optional task executor id if this task executor has no 
more slots registered
         */
-       void failAllocation(AllocationID allocationID, Exception cause);
+       CompletableFuture<SerializableOptional<ResourceID>> 
failAllocation(AllocationID allocationID, Exception cause);
 
        // 
------------------------------------------------------------------------
        //  allocating and disposing slots
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
@@ -82,9 +81,6 @@
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SlotSharingManager.class);
 
-       /** Lock for the internal data structures. */
-       private final Object lock = new Object();
-
        private final SlotSharingGroupId slotSharingGroupId;
 
        /** Actions to release allocated slots after a complete multi task slot 
hierarchy has been released. */
@@ -96,11 +92,9 @@
        private final Map<SlotRequestId, TaskSlot> allTaskSlots;
 
        /** Root nodes which have not been completed because the allocated slot 
is still pending. */
-       @GuardedBy("lock")
        private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
 
        /** Root nodes which have been completed (the underlying allocated slot 
has been assigned). */
-       @GuardedBy("lock")
        private final Map<TaskManagerLocation, Set<MultiTaskSlot>> 
resolvedRootSlots;
 
        SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
 
                allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
-               synchronized (lock) {
-                       unresolvedRootSlots.put(slotRequestId, 
rootMultiTaskSlot);
-               }
+               unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
                // add the root node to the set of resolved root nodes once the 
SlotContext future has
                // been completed and we know the slot's TaskManagerLocation
                slotContextFuture.whenComplete(
                        (SlotContext slotContext, Throwable throwable) -> {
                                if (slotContext != null) {
-                                       synchronized (lock) {
-                                               final MultiTaskSlot 
resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+                                       final MultiTaskSlot resolvedRootNode = 
unresolvedRootSlots.remove(slotRequestId);
 
-                                               if (resolvedRootNode != null) {
-                                                       LOG.trace("Fulfill 
multi task slot [{}] with slot [{}].", slotRequestId, 
slotContext.getAllocationId());
+                                       if (resolvedRootNode != null) {
+                                               LOG.trace("Fulfill multi task 
slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
 
-                                                       final 
Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-                                                               
slotContext.getTaskManagerLocation(),
-                                                               
taskManagerLocation -> new HashSet<>(4));
+                                               final Set<MultiTaskSlot> 
innerCollection = resolvedRootSlots.computeIfAbsent(
+                                                       
slotContext.getTaskManagerLocation(),
+                                                       taskManagerLocation -> 
new HashSet<>(4));
 
-                                                       
innerCollection.add(resolvedRootNode);
-                                               }
+                                               
innerCollection.add(resolvedRootNode);
                                        }
                                } else {
                                        rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
         */
        @Nullable
        MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
SchedulingStrategy matcher, SlotProfile slotProfile) {
-               synchronized (lock) {
-                       Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues 
= this.resolvedRootSlots.values();
-                       return matcher.findMatchWithLocality(
-                               slotProfile,
-                               
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
-                               (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
-                               (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
-                               MultiTaskSlotLocality::of);
-               }
+               Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = 
this.resolvedRootSlots.values();
+               return matcher.findMatchWithLocality(
+                       slotProfile,
+                       
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+                       (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
+                       (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
+                       MultiTaskSlotLocality::of);
        }
 
        /**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID 
groupId, SchedulingStrategy
         */
        @Nullable
        MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-               synchronized (lock) {
-                       for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
-                               if (!multiTaskSlot.contains(groupId)) {
-                                       return multiTaskSlot;
-                               }
+               for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
+                       if (!multiTaskSlot.contains(groupId)) {
+                               return multiTaskSlot;
                        }
                }
 
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
        public String toString() {
                final StringBuilder builder = new 
StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
 
-               synchronized (lock) {
-                       
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-                       
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-                       
builder.append("\tall=").append(allTaskSlots).append('\n');
-               }
+               
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+               
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+               builder.append("\tall=").append(allTaskSlots).append('\n');
 
                return builder.append('}').toString();
        }
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
                                parent.releaseChild(getGroupId());
                        } else if (allTaskSlots.remove(getSlotRequestId()) != 
null) {
                                // we are the root node --> remove the root 
node from the list of task slots
+                               final MultiTaskSlot unresolvedRootSlot = 
unresolvedRootSlots.remove(getSlotRequestId());
 
-                               if (!slotContextFuture.isDone() || 
slotContextFuture.isCompletedExceptionally()) {
-                                       synchronized (lock) {
-                                               // the root node should still 
be unresolved
-                                               
unresolvedRootSlots.remove(getSlotRequestId());
-                                       }
-                               } else {
+                               if (unresolvedRootSlot == null) {
                                        // the root node should be resolved --> 
we can access the slot context
                                        final SlotContext slotContext = 
slotContextFuture.getNow(null);
 
                                        if (slotContext != null) {
-                                               synchronized (lock) {
-                                                       final 
Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+                                               final Set<MultiTaskSlot> 
multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-                                                       if (multiTaskSlots != 
null) {
-                                                               
multiTaskSlots.remove(this);
+                                               if (multiTaskSlots != null) {
+                                                       
multiTaskSlots.remove(this);
 
-                                                               if 
(multiTaskSlots.isEmpty()) {
-                                                                       
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-                                                               }
+                                                       if 
(multiTaskSlots.isEmpty()) {
+                                                               
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
                                                        }
                                                }
                                        }
@@ -637,9 +615,7 @@ public String toString() {
 
        @VisibleForTesting
        Collection<MultiTaskSlot> getUnresolvedRootSlots() {
-               synchronized (lock) {
-                       return unresolvedRootSlots.values();
-               }
+               return unresolvedRootSlots.values();
        }
 
        /**
@@ -649,19 +625,15 @@ public String toString() {
 
                @Override
                public Iterator<MultiTaskSlot> iterator() {
-                       synchronized (lock) {
-                               return new 
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
-                       }
+                       return new 
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
                }
 
                @Override
                public int size() {
                        int numberResolvedMultiTaskSlots = 0;
 
-                       synchronized (lock) {
-                               for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
-                                       numberResolvedMultiTaskSlots += 
multiTaskSlots.size();
-                               }
+                       for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
+                               numberResolvedMultiTaskSlots += 
multiTaskSlots.size();
                        }
 
                        return numberResolvedMultiTaskSlots;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 5c62a737096..e53b48021ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -34,9 +34,12 @@
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import javax.annotation.Nonnull;
+
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
@@ -54,6 +57,9 @@
 
        private volatile BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> freeSlotFunction;
 
+       @Nonnull
+       private volatile BiConsumer<InstanceID, Exception> 
disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};
+
        public SimpleAckingTaskManagerGateway() {
                optSubmitConsumer = Optional.empty();
                optCancelConsumer = Optional.empty();
@@ -71,13 +77,19 @@ public void setFreeSlotFunction(BiFunction<AllocationID, 
Throwable, CompletableF
                this.freeSlotFunction = freeSlotFunction;
        }
 
+       public void setDisconnectFromJobManagerConsumer(@Nonnull 
BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
+               this.disconnectFromJobManagerConsumer = 
disconnectFromJobManagerConsumer;
+       }
+
        @Override
        public String getAddress() {
                return address;
        }
 
        @Override
-       public void disconnectFromJobManager(InstanceID instanceId, Exception 
cause) {}
+       public void disconnectFromJobManager(InstanceID instanceId, Exception 
cause) {
+               disconnectFromJobManagerConsumer.accept(instanceId, cause);
+       }
 
        @Override
        public void stopCluster(ApplicationStatus applicationStatus, String 
message) {}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9a2bc97b62b..462b1d16da9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -550,20 +550,6 @@ public void testSlotRequestTimeoutWhenNoSlotOffering() 
throws Exception {
                }
        }
 
-       private JobGraph createSingleVertexJobWithRestartStrategy() throws 
IOException {
-               final JobVertex jobVertex = new JobVertex("Test vertex");
-               jobVertex.setInvokableClass(NoOpInvokable.class);
-
-               final ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 0L));
-
-               final JobGraph jobGraph = new JobGraph(jobVertex);
-               jobGraph.setAllowQueuedScheduling(true);
-               jobGraph.setExecutionConfig(executionConfig);
-
-               return jobGraph;
-       }
-
        /**
         * Tests that we can close an unestablished ResourceManager connection.
         */
@@ -975,6 +961,72 @@ public void testTriggerSavepointTimeout() throws Exception 
{
                }
        }
 
+       /**
+        * Tests that the TaskExecutor is released if all of its slots have 
been freed.
+        */
+       @Test
+       public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws 
Exception {
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
+
+               final JobGraph jobGraph = 
createSingleVertexJobWithRestartStrategy();
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       jobGraph,
+                       haServices,
+                       jobManagerSharedServices,
+                       heartbeatServices);
+
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+               
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+
+               final CompletableFuture<AllocationID> allocationIdFuture = new 
CompletableFuture<>();
+
+               testingResourceManagerGateway.setRequestSlotConsumer(
+                       slotRequest -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+               final CompletableFuture<JobID> disconnectTaskExecutorFuture = 
new CompletableFuture<>();
+               final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(
+                               (allocationID, throwable) -> {
+                                       freedSlotFuture.complete(allocationID);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                       .setDisconnectJobManagerConsumer((jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
+                       .createTestingTaskExecutorGateway();
+               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
+
+               try {
+                       jobMaster.start(jobMasterId, testingTimeout).get();
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       final AllocationID allocationId = 
allocationIdFuture.get();
+
+                       
jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+
+                       final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+                       final CompletableFuture<Collection<SlotOffer>> 
acceptedSlotOffers = 
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), 
Collections.singleton(slotOffer), testingTimeout);
+
+                       final Collection<SlotOffer> slotOffers = 
acceptedSlotOffers.get();
+
+                       // check that we accepted the offered slot
+                       assertThat(slotOffers, hasSize(1));
+
+                       // now fail the allocation and check that we close the 
connection to the TaskExecutor
+                       jobMasterGateway.notifyAllocationFailure(allocationId, 
new FlinkException("Fail alloction test exception"));
+
+                       // we should free the slot and then disconnect from the 
TaskExecutor because we use no longer slots from it
+                       assertThat(freedSlotFuture.get(), 
equalTo(allocationId));
+                       assertThat(disconnectTaskExecutorFuture.get(), 
equalTo(jobGraph.getJobID()));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        private JobGraph producerConsumerJobGraph() {
                final JobVertex producer = new JobVertex("Producer");
                producer.setInvokableClass(NoOpInvokable.class);
@@ -1064,6 +1116,20 @@ private JobMaster createJobMaster(
                        JobMasterTest.class.getClassLoader());
        }
 
+       private JobGraph createSingleVertexJobWithRestartStrategy() throws 
IOException {
+               final JobVertex jobVertex = new JobVertex("Test vertex");
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               final ExecutionConfig executionConfig = new ExecutionConfig();
+               
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 0L));
+
+               final JobGraph jobGraph = new JobGraph(jobVertex);
+               jobGraph.setAllowQueuedScheduling(true);
+               jobGraph.setExecutionConfig(executionConfig);
+
+               return jobGraph;
+       }
+
        /**
         * No op implementation of {@link OnCompletionActions}.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 9815cb289da..3a9925c4f20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,6 +45,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.ManualClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -59,7 +61,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -69,6 +73,8 @@
 
 import static 
org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -692,12 +698,7 @@ public void testReleasingIdleSlotFailed() throws Exception 
{
 
                        slotPool.triggerCheckIdleSlot();
 
-                       CompletableFuture<LogicalSlot> allocatedSlotFuture = 
slotPoolGateway.allocateSlot(
-                               new SlotRequestId(),
-                               new DummyScheduledUnit(),
-                               SlotProfile.noRequirements(),
-                               true,
-                               timeout);
+                       CompletableFuture<LogicalSlot> allocatedSlotFuture = 
allocateSlot(slotPoolGateway, new SlotRequestId());
 
                        // wait until the slot has been fulfilled with the 
previously idling slot
                        final LogicalSlot logicalSlot = 
allocatedSlotFuture.get();
@@ -712,12 +713,7 @@ public void testReleasingIdleSlotFailed() throws Exception 
{
                        slotPool.triggerCheckIdleSlot();
 
                        // request a new slot after the idling slot has been 
released
-                       allocatedSlotFuture = slotPoolGateway.allocateSlot(
-                               new SlotRequestId(),
-                               new DummyScheduledUnit(),
-                               SlotProfile.noRequirements(),
-                               true,
-                               timeout);
+                       allocatedSlotFuture = allocateSlot(slotPoolGateway, new 
SlotRequestId());
 
                        // release the TaskExecutor before we get a response 
from the slot releasing
                        
slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), 
null).get();
@@ -739,6 +735,114 @@ public void testReleasingIdleSlotFailed() throws 
Exception {
                }
        }
 
+       /**
+        * Tests that failed slots are freed on the {@link TaskExecutor}.
+        */
+       @Test
+       public void testFreeFailedSlots() throws Exception {
+               final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
+
+               try {
+                       final int parallelism = 5;
+                       final ArrayBlockingQueue<AllocationID> allocationIds = 
new ArrayBlockingQueue<>(parallelism);
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> 
allocationIds.offer(slotRequest.getAllocationId()));
+
+                       final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
+
+                       final Map<SlotRequestId, 
CompletableFuture<LogicalSlot>> slotRequestFutures = new HashMap<>(parallelism);
+
+                       for (int i = 0; i < parallelism; i++) {
+                               final SlotRequestId slotRequestId = new 
SlotRequestId();
+                               slotRequestFutures.put(slotRequestId, 
allocateSlot(slotPoolGateway, slotRequestId));
+                       }
+
+                       final List<SlotOffer> slotOffers = new 
ArrayList<>(parallelism);
+
+                       for (int i = 0; i < parallelism; i++) {
+                               slotOffers.add(new 
SlotOffer(allocationIds.take(), i, ResourceProfile.UNKNOWN));
+                       }
+
+                       
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+                       slotPoolGateway.offerSlots(taskManagerLocation, 
taskManagerGateway, slotOffers);
+
+                       // wait for the completion of all slot futures
+                       
FutureUtils.waitForAll(slotRequestFutures.values()).get();
+
+                       final ArrayBlockingQueue<AllocationID> freedSlots = new 
ArrayBlockingQueue<>(1);
+                       taskManagerGateway.setFreeSlotFunction(
+                               (allocationID, throwable) -> {
+                                       freedSlots.offer(allocationID);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               });
+
+                       final FlinkException failException = new 
FlinkException("Test fail exception");
+                       // fail allocations one by one
+                       for (int i = 0; i < parallelism - 1; i++) {
+                               final SlotOffer slotOffer = slotOffers.get(i);
+                               final 
CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = 
slotPoolGateway.failAllocation(
+                                       slotOffer.getAllocationId(),
+                                       failException);
+
+                               
assertThat(emptyTaskExecutorFuture.get().isPresent(), is(false));
+                               assertThat(freedSlots.take(), 
is(equalTo(slotOffer.getAllocationId())));
+                       }
+
+                       final SlotOffer slotOffer = slotOffers.get(parallelism 
- 1);
+                       final 
CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = 
slotPoolGateway.failAllocation(
+                               slotOffer.getAllocationId(),
+                               failException);
+                       assertThat(emptyTaskExecutorFuture.get().get(), 
is(equalTo(taskManagerLocation.getResourceID())));
+                       assertThat(freedSlots.take(), 
is(equalTo(slotOffer.getAllocationId())));
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+               }
+       }
+
+       /**
+        * Tests that failing an allocation fails the pending slot request.
+        */
+       @Test
+       public void testFailingAllocationFailsPendingSlotRequests() throws 
Exception {
+               final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
+
+               try {
+                       final CompletableFuture<AllocationID> 
allocationIdFuture = new CompletableFuture<>();
+                       
resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+                       final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
+
+                       final CompletableFuture<LogicalSlot> slotFuture = 
allocateSlot(slotPoolGateway, new SlotRequestId());
+
+                       final AllocationID allocationId = 
allocationIdFuture.get();
+
+                       assertThat(slotFuture.isDone(), is(false));
+
+                       final FlinkException cause = new FlinkException("Fail 
pending slot request failure.");
+                       final 
CompletableFuture<SerializableOptional<ResourceID>> responseFuture = 
slotPoolGateway.failAllocation(allocationId, cause);
+
+                       assertThat(responseFuture.get().isPresent(), is(false));
+
+                       try {
+                               slotFuture.get();
+                               fail("Expected a slot allocation failure.");
+                       } catch (ExecutionException ee) {
+                               
assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause));
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+               }
+       }
+
+       private CompletableFuture<LogicalSlot> allocateSlot(SlotPoolGateway 
slotPoolGateway, SlotRequestId slotRequestId) {
+               return slotPoolGateway.allocateSlot(
+                       slotRequestId,
+                       new DummyScheduledUnit(),
+                       SlotProfile.noRequirements(),
+                       true,
+                       timeout);
+       }
+
        private static SlotPoolGateway setupSlotPool(
                        SlotPool slotPool,
                        ResourceManagerGateway resourceManagerGateway) throws 
Exception {
@@ -750,13 +854,4 @@ private static SlotPoolGateway setupSlotPool(
 
                return slotPool.getSelfGateway(SlotPoolGateway.class);
        }
-
-       private AllocatedSlot createSlot(final AllocationID allocationId) {
-               return new AllocatedSlot(
-                       allocationId,
-                       taskManagerLocation,
-                       0,
-                       ResourceProfile.UNKNOWN,
-                       taskManagerGateway);
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index a9e99495e34..912de36c881 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -59,13 +59,16 @@
 
        private final Function<Tuple5<SlotID, JobID, AllocationID, String, 
ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction;
 
-       TestingTaskExecutorGateway(String address, String hostname, 
Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> 
disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, 
CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, 
JobID, AllocationID, String, ResourceManagerId>, 
CompletableFuture<Acknowledge>> requestSlotFunction) {
+       private final BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> freeSlotFunction;
+
+       TestingTaskExecutorGateway(String address, String hostname, 
Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> 
disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, 
CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, 
JobID, AllocationID, String, ResourceManagerId>, 
CompletableFuture<Acknowledge>> requestSlotFunction, BiFunction<AllocationID, 
Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
                this.heartbeatJobManagerConsumer = 
Preconditions.checkNotNull(heartbeatJobManagerConsumer);
                this.disconnectJobManagerConsumer = 
Preconditions.checkNotNull(disconnectJobManagerConsumer);
                this.submitTaskConsumer = 
Preconditions.checkNotNull(submitTaskConsumer);
                this.requestSlotFunction = 
Preconditions.checkNotNull(requestSlotFunction);
+               this.freeSlotFunction = 
Preconditions.checkNotNull(freeSlotFunction);
        }
 
        @Override
@@ -141,7 +144,7 @@ public void disconnectResourceManager(Exception cause) {
 
        @Override
        public CompletableFuture<Acknowledge> freeSlot(AllocationID 
allocationId, Throwable cause, Time timeout) {
-               return CompletableFuture.completedFuture(Acknowledge.get());
+               return freeSlotFunction.apply(allocationId, cause);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 1c2f1328a5a..e59eefd0c8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -43,6 +43,7 @@
        private static final BiConsumer<JobID, Throwable> 
NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
        private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, 
CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, 
ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
        private static final Function<Tuple5<SlotID, JobID, AllocationID, 
String, ResourceManagerId>, CompletableFuture<Acknowledge>> 
NOOP_REQUEST_SLOT_FUNCTION = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
+       private static final BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> NOOP_FREE_SLOT_FUNCTION = (ignoredA, ignoredB) 
-> CompletableFuture.completedFuture(Acknowledge.get());
 
        private String address = "foobar:1234";
        private String hostname = "foobar";
@@ -50,6 +51,7 @@
        private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = 
NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
        private BiFunction<TaskDeploymentDescriptor, JobMasterId, 
CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
        private Function<Tuple5<SlotID, JobID, AllocationID, String, 
ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction = 
NOOP_REQUEST_SLOT_FUNCTION;
+       private BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> freeSlotFunction = NOOP_FREE_SLOT_FUNCTION;
 
        public TestingTaskExecutorGatewayBuilder setAddress(String address) {
                this.address = address;
@@ -81,7 +83,12 @@ public TestingTaskExecutorGatewayBuilder 
setRequestSlotFunction(Function<Tuple5<
                return this;
        }
 
+       public TestingTaskExecutorGatewayBuilder 
setFreeSlotFunction(BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> freeSlotFunction) {
+               this.freeSlotFunction = freeSlotFunction;
+               return this;
+       }
+
        public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
-               return new TestingTaskExecutorGateway(address, hostname, 
heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, 
requestSlotFunction);
+               return new TestingTaskExecutorGateway(address, hostname, 
heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, 
requestSlotFunction, freeSlotFunction);
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Release TaskExecutors from SlotPool if all slots have been removed
> ------------------------------------------------------------------
>
>                 Key: FLINK-9912
>                 URL: https://issues.apache.org/jira/browse/FLINK-9912
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.1, 1.6.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Currently, it is possible to fail slot allocations in the {{SlotPool}}. 
> Failing an allocation means that the slot is removed from the {{SlotPool}}. 
> If we have removed all slots from a {{TaskExecutor}}, then we should also 
> release/close the connection to this {{TaskExecutor}}. At the moment, this 
> only happens via the heartbeats if the {{TaskExecutor}} has become 
> unreachable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to